mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-31 19:01:28 +00:00
Several fixes to move the actor_server and learner_server code from the maniskill environment to the real robot environment.
Co-authored-by: Adil Zouitine <adilzouitinegm@gmail.com>
This commit is contained in:
@@ -31,16 +31,21 @@ from omegaconf import DictConfig
|
||||
from torch import nn
|
||||
|
||||
# TODO: Remove the import of maniskill
|
||||
from lerobot.common.envs.factory import make_maniskill_env
|
||||
from lerobot.common.envs.utils import preprocess_maniskill_observation
|
||||
# from lerobot.common.envs.factory import make_maniskill_env
|
||||
# from lerobot.common.envs.utils import preprocess_maniskill_observation
|
||||
from lerobot.common.policies.factory import make_policy
|
||||
from lerobot.common.policies.sac.modeling_sac import SACPolicy
|
||||
from lerobot.common.robot_devices.control_utils import busy_wait
|
||||
from lerobot.common.robot_devices.robots.factory import make_robot
|
||||
from lerobot.common.robot_devices.robots.utils import Robot
|
||||
from lerobot.common.utils.utils import (
|
||||
TimerManager,
|
||||
get_safe_torch_device,
|
||||
set_global_seed,
|
||||
)
|
||||
from lerobot.scripts.server import hilserl_pb2, hilserl_pb2_grpc
|
||||
from lerobot.scripts.server.buffer import Transition, move_state_dict_to_device, move_transition_to_device
|
||||
from lerobot.scripts.server.gym_manipulator import get_classifier, make_robot_env
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
@@ -152,7 +157,15 @@ def serve_actor_service(port=50052):
|
||||
server.wait_for_termination()
|
||||
|
||||
|
||||
def act_with_policy(cfg: DictConfig):
|
||||
def update_policy_parameters(policy: SACPolicy, parameters_queue: queue.Queue, device):
|
||||
if not parameters_queue.empty():
|
||||
logging.debug("[ACTOR] Load new parameters from Learner.")
|
||||
state_dict = parameters_queue.get()
|
||||
state_dict = move_state_dict_to_device(state_dict, device=device)
|
||||
policy.load_state_dict(state_dict)
|
||||
|
||||
|
||||
def act_with_policy(cfg: DictConfig, robot: Robot, reward_classifier: nn.Module):
|
||||
"""
|
||||
Executes policy interaction within the environment.
|
||||
|
||||
@@ -165,9 +178,7 @@ def act_with_policy(cfg: DictConfig):
|
||||
|
||||
logging.info("make_env online")
|
||||
|
||||
# online_env = make_env(cfg, n_envs=1)
|
||||
# TODO: Remove the import of maniskill and unifiy with make env
|
||||
online_env = make_maniskill_env(cfg, n_envs=1)
|
||||
online_env = make_robot_env(robot=robot, reward_classifier=reward_classifier, cfg=cfg.env)
|
||||
|
||||
set_global_seed(cfg.seed)
|
||||
device = get_safe_torch_device(cfg.device, log=True)
|
||||
@@ -177,6 +188,16 @@ def act_with_policy(cfg: DictConfig):
|
||||
|
||||
logging.info("make_policy")
|
||||
|
||||
# HACK: This is an ugly hack to pass the normalization parameters to the policy
|
||||
# Because the action space is dynamic so we override the output normalization parameters
|
||||
# it's ugly, we know ... and we will fix it
|
||||
min_action_space: list = online_env.action_space.spaces[0].low.tolist()
|
||||
max_action_space: list = online_env.action_space.spaces[0].high.tolist()
|
||||
output_normalization_params: dict[dict[str, list]] = {
|
||||
"action": {"min": min_action_space, "max": max_action_space}
|
||||
}
|
||||
cfg.policy.output_normalization_params = output_normalization_params
|
||||
|
||||
### Instantiate the policy in both the actor and learner processes
|
||||
### To avoid sending a SACPolicy object through the port, we create a policy intance
|
||||
### on both sides, the learner sends the updated parameters every n steps to update the actor's parameters
|
||||
@@ -187,92 +208,41 @@ def act_with_policy(cfg: DictConfig):
|
||||
# Hack: But if we do online training, we do not need dataset_stats
|
||||
dataset_stats=None,
|
||||
# TODO: Handle resume training
|
||||
device=device,
|
||||
)
|
||||
# pretrained_policy_name_or_path=None,
|
||||
# device=device,
|
||||
# )
|
||||
policy = torch.compile(policy)
|
||||
assert isinstance(policy, nn.Module)
|
||||
|
||||
# HACK for maniskill
|
||||
obs, info = online_env.reset()
|
||||
|
||||
# obs = preprocess_observation(obs)
|
||||
obs = preprocess_maniskill_observation(obs)
|
||||
obs = {key: obs[key].to(device, non_blocking=True) for key in obs}
|
||||
|
||||
# NOTE: For the moment we will solely handle the case of a single environment
|
||||
sum_reward_episode = 0
|
||||
list_transition_to_send_to_learner = []
|
||||
list_policy_fps = []
|
||||
list_policy_time = []
|
||||
|
||||
for interaction_step in range(cfg.training.online_steps):
|
||||
if interaction_step >= cfg.training.online_step_before_learning:
|
||||
start = time.perf_counter()
|
||||
action = policy.select_action(batch=obs)
|
||||
list_policy_fps.append(1.0 / (time.perf_counter() - start + 1e-9))
|
||||
if list_policy_fps[-1] < cfg.fps:
|
||||
logging.warning(
|
||||
f"[ACTOR] policy frame rate {list_policy_fps[-1]} during interaction step {interaction_step} is below the required control frame rate {cfg.fps}"
|
||||
)
|
||||
# Time policy inference and check if it meets FPS requirement
|
||||
with TimerManager(
|
||||
elapsed_time_list=list_policy_time, label="Policy inference time", log=False
|
||||
) as timer: # noqa: F841
|
||||
action = policy.select_action(batch=obs) * 0.0
|
||||
policy_fps = 1.0 / (list_policy_time[-1] + 1e-9)
|
||||
|
||||
next_obs, reward, done, truncated, info = online_env.step(action.cpu().numpy())
|
||||
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)
|
||||
|
||||
next_obs, reward, done, truncated, info = online_env.step(action.squeeze(dim=0).cpu().numpy())
|
||||
else:
|
||||
# TODO (azouitine): Make a custom space for torch tensor
|
||||
action = online_env.action_space.sample()
|
||||
next_obs, reward, done, truncated, info = online_env.step(action)
|
||||
# HACK
|
||||
action = torch.tensor(action, dtype=torch.float32).to(device, non_blocking=True)
|
||||
|
||||
# HACK: For maniskill
|
||||
# next_obs = preprocess_observation(next_obs)
|
||||
next_obs = preprocess_maniskill_observation(next_obs)
|
||||
next_obs = {key: next_obs[key].to(device, non_blocking=True) for key in obs}
|
||||
sum_reward_episode += float(reward[0])
|
||||
# HACK: We have only one env but we want to batch it, it will be resolved with the torch box
|
||||
action = torch.from_numpy(action[0]).to(device, non_blocking=True).unsqueeze(dim=0)
|
||||
|
||||
# Because we are using a single environment we can index at zero
|
||||
if done[0].item() or truncated[0].item():
|
||||
# TODO: Handle logging for episode information
|
||||
logging.info(f"[ACTOR] Global step {interaction_step}: Episode reward: {sum_reward_episode}")
|
||||
sum_reward_episode += float(reward)
|
||||
|
||||
if not parameters_queue.empty():
|
||||
logging.debug("[ACTOR] Load new parameters from Learner.")
|
||||
state_dict = parameters_queue.get()
|
||||
state_dict = move_state_dict_to_device(state_dict, device=device)
|
||||
# strict=False for the case when the image encoder is frozen and not sent through
|
||||
# the network. Becareful might cause issues if the wrong keys are passed
|
||||
policy.actor.load_state_dict(state_dict, strict=False)
|
||||
|
||||
if len(list_transition_to_send_to_learner) > 0:
|
||||
logging.debug(
|
||||
f"[ACTOR] Sending {len(list_transition_to_send_to_learner)} transitions to Learner."
|
||||
)
|
||||
message_queue.put(ActorInformation(transition=list_transition_to_send_to_learner))
|
||||
list_transition_to_send_to_learner = []
|
||||
|
||||
stats = {}
|
||||
if len(list_policy_fps) > 0:
|
||||
policy_fps = mean(list_policy_fps)
|
||||
quantiles_90 = quantiles(list_policy_fps, n=10)[-1]
|
||||
logging.debug(f"[ACTOR] Average policy frame rate: {policy_fps}")
|
||||
logging.debug(f"[ACTOR] Policy frame rate 90th percentile: {quantiles_90}")
|
||||
stats = {"Policy frequency [Hz]": policy_fps, "Policy frequency 90th-p [Hz]": quantiles_90}
|
||||
list_policy_fps = []
|
||||
|
||||
# Send episodic reward to the learner
|
||||
message_queue.put(
|
||||
ActorInformation(
|
||||
interaction_message={
|
||||
"Episodic reward": sum_reward_episode,
|
||||
"Interaction step": interaction_step,
|
||||
**stats,
|
||||
}
|
||||
)
|
||||
)
|
||||
sum_reward_episode = 0.0
|
||||
|
||||
# TODO (michel-aractingi): Label the reward
|
||||
# if config.label_reward_on_actor:
|
||||
# reward = reward_classifier(obs)
|
||||
# NOTE: We overide the action if the intervention is True, because the action applied is the intervention action
|
||||
if info["is_intervention"]:
|
||||
# TODO: Check the shape
|
||||
action = info["action_intervention"]
|
||||
@@ -291,17 +261,85 @@ def act_with_policy(cfg: DictConfig):
|
||||
# assign obs to the next obs and continue the rollout
|
||||
obs = next_obs
|
||||
|
||||
# HACK: We have only one env but we want to batch it, it will be resolved with the torch box
|
||||
# Because we are using a single environment we can index at zero
|
||||
if done or truncated:
|
||||
# TODO: Handle logging for episode information
|
||||
logging.info(f"[ACTOR] Global step {interaction_step}: Episode reward: {sum_reward_episode}")
|
||||
|
||||
# update_policy_parameters(policy=policy, parameters_queue=parameters_queue, device=device)
|
||||
|
||||
if len(list_transition_to_send_to_learner) > 0:
|
||||
send_transitions_in_chunks(
|
||||
transitions=list_transition_to_send_to_learner, message_queue=message_queue, chunk_size=4
|
||||
)
|
||||
list_transition_to_send_to_learner = []
|
||||
|
||||
stats = get_frequency_stats(list_policy_time)
|
||||
list_policy_time.clear()
|
||||
|
||||
# Send episodic reward to the learner
|
||||
message_queue.put(
|
||||
ActorInformation(
|
||||
interaction_message={
|
||||
"Episodic reward": sum_reward_episode,
|
||||
"Interaction step": interaction_step,
|
||||
**stats,
|
||||
}
|
||||
)
|
||||
)
|
||||
sum_reward_episode = 0.0
|
||||
obs, info = online_env.reset()
|
||||
|
||||
|
||||
def send_transitions_in_chunks(transitions: list, message_queue, chunk_size: int = 100):
|
||||
"""Send transitions to learner in smaller chunks to avoid network issues.
|
||||
|
||||
Args:
|
||||
transitions: List of transitions to send
|
||||
message_queue: Queue to send messages to learner
|
||||
chunk_size: Size of each chunk to send
|
||||
"""
|
||||
for i in range(0, len(transitions), chunk_size):
|
||||
chunk = transitions[i : i + chunk_size]
|
||||
logging.debug(f"[ACTOR] Sending chunk of {len(chunk)} transitions to Learner.")
|
||||
message_queue.put(ActorInformation(transition=chunk))
|
||||
|
||||
|
||||
def get_frequency_stats(list_policy_time: list[float]) -> dict[str, float]:
|
||||
stats = {}
|
||||
list_policy_fps = [1.0 / t for t in list_policy_time]
|
||||
if len(list_policy_fps) > 0:
|
||||
policy_fps = mean(list_policy_fps)
|
||||
quantiles_90 = quantiles(list_policy_fps, n=10)[-1]
|
||||
logging.debug(f"[ACTOR] Average policy frame rate: {policy_fps}")
|
||||
logging.debug(f"[ACTOR] Policy frame rate 90th percentile: {quantiles_90}")
|
||||
stats = {"Policy frequency [Hz]": policy_fps, "Policy frequency 90th-p [Hz]": quantiles_90}
|
||||
return stats
|
||||
|
||||
|
||||
def log_policy_frequency_issue(policy_fps: float, cfg: DictConfig, interaction_step: int):
|
||||
if policy_fps < cfg.fps:
|
||||
logging.warning(
|
||||
f"[ACTOR] Policy FPS {policy_fps:.1f} below required {cfg.fps} at step {interaction_step}"
|
||||
)
|
||||
|
||||
|
||||
@hydra.main(version_base="1.2", config_name="default", config_path="../../configs")
|
||||
def actor_cli(cfg: dict):
|
||||
port = cfg.actor_learner_config.port
|
||||
server_thread = Thread(target=serve_actor_service, args=(port,), daemon=True)
|
||||
server_thread.start()
|
||||
robot = make_robot(cfg=cfg.robot)
|
||||
|
||||
server_thread = Thread(target=serve_actor_service, args=(cfg.actor_learner_config.port,), daemon=True)
|
||||
reward_classifier = get_classifier(
|
||||
pretrained_path=cfg.env.reward_classifier.pretrained_path,
|
||||
config_path=cfg.env.reward_classifier.config_path,
|
||||
)
|
||||
policy_thread = Thread(
|
||||
target=act_with_policy,
|
||||
daemon=True,
|
||||
args=(cfg,),
|
||||
args=(cfg, robot, reward_classifier),
|
||||
)
|
||||
server_thread.start()
|
||||
policy_thread.start()
|
||||
policy_thread.join()
|
||||
server_thread.join()
|
||||
|
||||
Reference in New Issue
Block a user