diff --git a/src/lerobot/configs/default.py b/src/lerobot/configs/default.py index 58ed64420..38039a7bf 100644 --- a/src/lerobot/configs/default.py +++ b/src/lerobot/configs/default.py @@ -67,7 +67,8 @@ class EvalConfig: # `batch_size` specifies the number of environments to use in a gym.vector.VectorEnv. batch_size: int = 50 # `use_async_envs` specifies whether to use asynchronous environments (multiprocessing). - use_async_envs: bool = False + # Defaults to True; automatically downgraded to SyncVectorEnv when batch_size=1. + use_async_envs: bool = True def __post_init__(self) -> None: if self.batch_size > self.n_episodes: diff --git a/src/lerobot/envs/configs.py b/src/lerobot/envs/configs.py index 750187c05..d89b829ae 100644 --- a/src/lerobot/envs/configs.py +++ b/src/lerobot/envs/configs.py @@ -75,13 +75,14 @@ class EnvConfig(draccus.ChoiceRegistry, abc.ABC): def create_envs( self, n_envs: int, - use_async_envs: bool = False, + use_async_envs: bool = True, ) -> dict[str, dict[int, gym.vector.VectorEnv]]: """Create {suite: {task_id: VectorEnv}}. Default: single-task env via gym.make(). Multi-task benchmarks override. + AsyncVectorEnv is the default for n_envs > 1; auto-downgraded to Sync for n_envs=1. """ - env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv + env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv if self.gym_id not in gym_registry: print(f"gym id '{self.gym_id}' not found, attempting to import '{self.package_name}'...") @@ -399,12 +400,12 @@ class LiberoEnv(EnvConfig): kwargs["task_ids"] = self.task_ids return kwargs - def create_envs(self, n_envs: int, use_async_envs: bool = False): + def create_envs(self, n_envs: int, use_async_envs: bool = True): from lerobot.envs.libero import create_libero_envs if self.task is None: raise ValueError("LiberoEnv requires a task to be specified") - env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv + env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv return create_libero_envs( task=self.task, n_envs=n_envs, @@ -468,12 +469,12 @@ class MetaworldEnv(EnvConfig): "render_mode": self.render_mode, } - def create_envs(self, n_envs: int, use_async_envs: bool = False): + def create_envs(self, n_envs: int, use_async_envs: bool = True): from lerobot.envs.metaworld import create_metaworld_envs if self.task is None: raise ValueError("MetaWorld requires a task to be specified") - env_cls = gym.vector.AsyncVectorEnv if use_async_envs else gym.vector.SyncVectorEnv + env_cls = gym.vector.AsyncVectorEnv if (use_async_envs and n_envs > 1) else gym.vector.SyncVectorEnv return create_metaworld_envs( task=self.task, n_envs=n_envs, diff --git a/src/lerobot/envs/factory.py b/src/lerobot/envs/factory.py index 40d5425cc..d349fef49 100644 --- a/src/lerobot/envs/factory.py +++ b/src/lerobot/envs/factory.py @@ -58,7 +58,7 @@ def make_env_pre_post_processors( def make_env( cfg: EnvConfig | str, n_envs: int = 1, - use_async_envs: bool = False, + use_async_envs: bool = True, hub_cache_dir: str | None = None, trust_remote_code: bool = False, ) -> dict[str, dict[int, gym.vector.VectorEnv]]: diff --git a/src/lerobot/envs/libero.py b/src/lerobot/envs/libero.py index 8ddb4b68c..634dac799 100644 --- a/src/lerobot/envs/libero.py +++ b/src/lerobot/envs/libero.py @@ -150,7 +150,17 @@ class LiberoEnv(gym.Env): self.init_state_id = self.episode_index # tie each sub-env to a fixed init state - self._env = self._make_envs_task(task_suite, self.task_id) + # Extract task metadata without allocating GPU resources (safe before fork). + task = task_suite.get_task(task_id) + self.task = task.name + self.task_description = task.language + self._task_bddl_file = os.path.join( + get_libero_path("bddl_files"), task.problem_folder, task.bddl_file + ) + self._env: OffScreenRenderEnv | None = ( + None # deferred — created on first reset() inside the worker subprocess + ) + default_steps = 500 self._max_episode_steps = ( TASK_SUITE_MAX_STEPS.get(task_suite_name, default_steps) @@ -221,29 +231,33 @@ class LiberoEnv(gym.Env): low=ACTION_LOW, high=ACTION_HIGH, shape=(ACTION_DIM,), dtype=np.float32 ) + def _ensure_env(self) -> None: + """Create the underlying OffScreenRenderEnv on first use. + + Called inside the worker subprocess after fork(), so each worker gets + its own clean EGL context rather than inheriting a stale one from the + parent process (which causes EGL_BAD_CONTEXT crashes with AsyncVectorEnv). + """ + if self._env is not None: + return + env = OffScreenRenderEnv( + bddl_file_name=self._task_bddl_file, + camera_heights=self.observation_height, + camera_widths=self.observation_width, + ) + env.reset() + self._env = env + def render(self): + self._ensure_env() raw_obs = self._env.env._get_observations() pixels = self._format_raw_obs(raw_obs)["pixels"] image = next(iter(pixels.values())) image = image[::-1, ::-1] # flip both H and W for visualization return image - def _make_envs_task(self, task_suite: Any, task_id: int = 0): - task = task_suite.get_task(task_id) - self.task = task.name - self.task_description = task.language - task_bddl_file = os.path.join(get_libero_path("bddl_files"), task.problem_folder, task.bddl_file) - - env_args = { - "bddl_file_name": task_bddl_file, - "camera_heights": self.observation_height, - "camera_widths": self.observation_width, - } - env = OffScreenRenderEnv(**env_args) - env.reset() - return env - def _format_raw_obs(self, raw_obs: RobotObservation) -> RobotObservation: + assert self._env is not None, "_format_raw_obs called before _ensure_env()" images = {} for camera_name in self.camera_name: image = raw_obs[camera_name] @@ -295,6 +309,7 @@ class LiberoEnv(gym.Env): ) def reset(self, seed=None, **kwargs): + self._ensure_env() super().reset(seed=seed) self._env.seed(seed) raw_obs = self._env.reset() @@ -321,6 +336,8 @@ class LiberoEnv(gym.Env): return observation, info def step(self, action: np.ndarray) -> tuple[RobotObservation, float, bool, bool, dict[str, Any]]: + self._ensure_env() + assert self._env is not None if action.ndim != 1: raise ValueError( f"Expected action to be 1-D (shape (action_dim,)), " @@ -345,7 +362,8 @@ class LiberoEnv(gym.Env): return observation, reward, terminated, truncated, info def close(self): - self._env.close() + if self._env is not None: + self._env.close() def _make_env_fns( diff --git a/src/lerobot/envs/metaworld.py b/src/lerobot/envs/metaworld.py index e9e29f304..273251312 100644 --- a/src/lerobot/envs/metaworld.py +++ b/src/lerobot/envs/metaworld.py @@ -97,8 +97,9 @@ class MetaworldEnv(gym.Env): self.visualization_height = visualization_height self.camera_name = camera_name - self._env = self._make_envs_task(self.task) - self._max_episode_steps = self._env.max_path_length + self._env_name = self.task # already stripped of "metaworld-" prefix above + self._env = None # deferred — created on first reset() inside the worker subprocess + self._max_episode_steps = 500 # MT1 environments always have max_path_length=500 self.task_description = TASK_DESCRIPTIONS[self.task] self.expert_policy = TASK_POLICY_MAPPING[self.task]() @@ -136,6 +137,24 @@ class MetaworldEnv(gym.Env): self.action_space = spaces.Box(low=-1, high=1, shape=(ACTION_DIM,), dtype=np.float32) + def _ensure_env(self) -> None: + """Create the underlying MetaWorld env on first use. + + Called inside the worker subprocess after fork(), so each worker gets + its own clean rendering context rather than inheriting a stale one from + the parent process (which causes crashes with AsyncVectorEnv). + """ + if self._env is not None: + return + mt1 = metaworld.MT1(self._env_name, seed=42) + env = mt1.train_classes[self._env_name](render_mode="rgb_array", camera_name=self.camera_name) + env.set_task(mt1.train_tasks[0]) + if self.camera_name == "corner2": + env.model.cam_pos[2] = [0.75, 0.075, 0.7] + env.reset() + env._freeze_rand_vec = False # otherwise no randomization + self._env = env + def render(self) -> np.ndarray: """ Render the current environment frame. @@ -143,26 +162,13 @@ class MetaworldEnv(gym.Env): Returns: np.ndarray: The rendered RGB image from the environment. """ + self._ensure_env() image = self._env.render() if self.camera_name == "corner2": # Images from this camera are flipped — correct them image = np.flip(image, (0, 1)) return image - def _make_envs_task(self, env_name: str): - mt1 = metaworld.MT1(env_name, seed=42) - env = mt1.train_classes[env_name](render_mode="rgb_array", camera_name=self.camera_name) - env.set_task(mt1.train_tasks[0]) - if self.camera_name == "corner2": - env.model.cam_pos[2] = [ - 0.75, - 0.075, - 0.7, - ] # corner2 position, similar to https://arxiv.org/pdf/2206.14244 - env.reset() - env._freeze_rand_vec = False # otherwise no randomization - return env - def _format_raw_obs(self, raw_obs: np.ndarray) -> RobotObservation: image = None if self._env is not None: @@ -209,6 +215,7 @@ class MetaworldEnv(gym.Env): observation (RobotObservation): The initial formatted observation. info (Dict[str, Any]): Additional info about the reset state. """ + self._ensure_env() super().reset(seed=seed) raw_obs, info = self._env.reset(seed=seed) @@ -232,6 +239,7 @@ class MetaworldEnv(gym.Env): truncated (bool): Whether the episode was truncated due to a time limit. info (Dict[str, Any]): Additional environment info. """ + self._ensure_env() if action.ndim != 1: raise ValueError( f"Expected action to be 1-D (shape (action_dim,)), " @@ -263,7 +271,8 @@ class MetaworldEnv(gym.Env): return observation, reward, terminated, truncated, info def close(self): - self._env.close() + if self._env is not None: + self._env.close() # ---- Main API ---------------------------------------------------------------- diff --git a/src/lerobot/scripts/lerobot_eval.py b/src/lerobot/scripts/lerobot_eval.py index e2c23ab39..71f74dc2a 100644 --- a/src/lerobot/scripts/lerobot_eval.py +++ b/src/lerobot/scripts/lerobot_eval.py @@ -73,7 +73,6 @@ from lerobot.configs import parser from lerobot.configs.eval import EvalPipelineConfig from lerobot.envs.factory import make_env, make_env_pre_post_processors from lerobot.envs.utils import ( - add_envs_task, check_env_attributes_and_types, close_envs, preprocess_observation, @@ -166,9 +165,9 @@ def rollout( if return_observations: all_observations.append(deepcopy(observation)) - # Infer "task" from attributes of environments. - # TODO: works with SyncVectorEnv but not AsyncVectorEnv - observation = add_envs_task(env, observation) + # Infer "task" from sub-environments. + # env.call() works with both SyncVectorEnv and AsyncVectorEnv. + observation["task"] = env.call("task") # Apply environment-specific preprocessing (e.g., LiberoProcessorStep for LIBERO) observation = env_preprocessor(observation)