refactor(robotwin): drop defensive dict guards, cache black fallback frame

_get_obs was guarding every dict access with isinstance(..., dict) in case
RoboTwin's get_obs returned something else — but the API contract
(envs/_base_task.py:437) always returns a dict, so the guards were silently
masking real failures behind plausible-looking zero observations. Drop them.

Also:
- Cache a single black fallback frame in __init__ instead of allocating
  a fresh np.zeros((H, W, 3), uint8) for every missing camera on every
  step — the "camera not exposed" set is static per env.
- Only allocate the zero joint_state on the fallback path (not unconditionally
  before the real value overwrites it).
- Replace .flatten() with .ravel() (no copy when already 1-D).
- Fold the nested-dict schema comment and two identical torch.enable_grad()
  rationales into a single Autograd section in the class docstring.
- Fix stale `left_wrist` camera name in the observation docstring.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-04-14 18:39:40 +02:00
parent f40b30202b
commit c2160ca86e

View File

@@ -206,12 +206,18 @@ class RoboTwinEnv(gym.Env):
Observations
------------
The ``pixels`` dict uses the raw RoboTwin camera names as keys (e.g.
``"head_camera"``, ``"left_wrist"``). ``preprocess_observation`` in
``"head_camera"``, ``"left_camera"``). ``preprocess_observation`` in
``envs/utils.py`` then converts these to ``observation.images.<cam>``.
Actions
-------
14-dim float32 array in ``[-1, 1]`` (joint-space, 7 DOF per arm).
Autograd
--------
``setup_demo`` and ``take_action`` drive CuRobo's Newton trajectory
optimizer, which calls ``cost.backward()`` internally. lerobot_eval wraps
the rollout in ``torch.no_grad()``, so both call sites re-enable grad.
"""
metadata = {"render_modes": ["rgb_array"], "render_fps": 25}
@@ -241,6 +247,7 @@ class RoboTwinEnv(gym.Env):
self._env: Any | None = None # deferred — created on first reset() inside worker
self._step_count: int = 0
self._black_frame = np.zeros((observation_height, observation_width, 3), dtype=np.uint8)
image_spaces = {
cam: spaces.Box(
@@ -276,36 +283,31 @@ class RoboTwinEnv(gym.Env):
def _get_obs(self) -> RobotObservation:
assert self._env is not None, "_get_obs called before _ensure_env()"
raw = self._env.get_obs()
cameras_raw = raw.get("observation", {})
# RoboTwin's get_obs returns a nested dict:
# {"observation": {cam_name: {"rgb": img, "intrinsic_matrix": ...}, ...},
# "joint_action": {"left_arm": [...], "left_gripper": ..., "vector": np.ndarray},
# "endpose": {...}}
cameras_raw = raw.get("observation", {}) if isinstance(raw, dict) else {}
images: dict[str, np.ndarray] = {}
for cam in self.camera_names:
cam_data = cameras_raw.get(cam)
img = cam_data.get("rgb") if isinstance(cam_data, dict) else None
if img is not None:
img = np.asarray(img, dtype=np.uint8)
if img.ndim == 2:
img = np.stack([img, img, img], axis=-1)
elif img.shape[-1] != 3:
img = img[..., :3]
images[cam] = img
else:
# Camera not exposed by this task — return a black frame so the
# observation shape stays consistent across all tasks.
images[cam] = np.zeros((self.observation_height, self.observation_width, 3), dtype=np.uint8)
img = cam_data.get("rgb") if cam_data else None
if img is None:
images[cam] = self._black_frame
continue
img = np.asarray(img, dtype=np.uint8)
if img.ndim == 2:
img = np.stack([img, img, img], axis=-1)
elif img.shape[-1] != 3:
img = img[..., :3]
images[cam] = img
# Joint state: RoboTwin exposes it as raw["joint_action"]["vector"]
# (flat array of left_arm+left_gripper+right_arm+right_gripper).
joint_state = np.zeros(ACTION_DIM, dtype=np.float64)
ja = raw.get("joint_action") if isinstance(raw, dict) else None
if isinstance(ja, dict) and "vector" in ja:
arr = np.asarray(ja["vector"], dtype=np.float64).flatten()
if arr.size >= ACTION_DIM:
joint_state = arr[:ACTION_DIM]
ja = raw.get("joint_action") or {}
vec = ja.get("vector")
if vec is not None:
arr = np.asarray(vec, dtype=np.float64).ravel()
joint_state = (
arr[:ACTION_DIM] if arr.size >= ACTION_DIM else np.zeros(ACTION_DIM, dtype=np.float64)
)
else:
joint_state = np.zeros(ACTION_DIM, dtype=np.float64)
return {"pixels": images, "agent_pos": joint_state}
@@ -317,9 +319,6 @@ class RoboTwinEnv(gym.Env):
actual_seed = self.episode_index if seed is None else seed
setup_kwargs = _load_robotwin_setup_kwargs(self.task_name)
setup_kwargs.update(seed=actual_seed, is_test=True)
# setup_demo → load_robot → CuroboPlanner.warmup() runs Newton's-method
# trajectory optimization, which requires autograd. lerobot_eval wraps
# the whole rollout in torch.no_grad(), so re-enable grad here.
with torch.enable_grad():
self._env.setup_demo(**setup_kwargs)
self.episode_index += self._reset_stride
@@ -333,9 +332,6 @@ class RoboTwinEnv(gym.Env):
if action.ndim != 1 or action.shape[0] != ACTION_DIM:
raise ValueError(f"Expected 1-D action of shape ({ACTION_DIM},), got {action.shape}")
# RoboTwin 2.0 uses take_action(); fall back to step() for older forks.
# take_action() invokes the CuRobo planner, which needs autograd —
# lerobot_eval wraps the rollout in torch.no_grad().
with torch.enable_grad():
if hasattr(self._env, "take_action"):
self._env.take_action(action)