Compare commits

7 Commits

Author SHA1 Message Date
Khalil Meftah
a0dc324b81 update close gripper button 2026-04-05 18:05:19 +02:00
Khalil Meftah
1d275e2021 change close gripper button 2026-04-05 18:00:43 +02:00
Khalil Meftah
24bb2cb0ff refactor: xbox gamepad buttons 2026-04-05 17:56:00 +02:00
Khalil Meftah
1d414c07e2 fix xbox gamepad 2026-04-01 10:59:40 +02:00
Khalil Meftah
e04e3399b9 fix normalizatiom 2026-03-25 19:26:41 +01:00
Jade Choghari
017ff73fbf chore(docs): add rename map and empty cam guide (#3065)
* add blog/guide

* add to tree

* chore(docs): rephrase rename_map docs for clarity and simplicity

---------

Co-authored-by: Steven Palma <steven.palma@huggingface.co>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-03-23 13:57:53 -07:00
Praedico
f90db58c15 docs(async): fix GitHub issues link (#3186) 2026-03-19 22:32:07 -07:00
7 changed files with 398 additions and 152 deletions

View File

@@ -19,6 +19,8 @@
       title: Multi GPU training
     - local: peft_training
       title: Training with PEFT (e.g., LoRA)
+    - local: rename_map
+      title: Using Rename Map and Empty Cameras
   title: "Tutorials"
 - sections:
     - local: lerobot-dataset-v3

View File

@@ -310,4 +310,4 @@ Asynchronous inference represents a significant advancement in real-time robotic
 - **Universal Compatibility**: Works with all LeRobot-supported policies, from lightweight ACT models to vision-language models like SmolVLA

 Start experimenting with the default parameters, monitor your action queue sizes, and iteratively refine your setup to achieve optimal performance for your specific use case.
-If you want to discuss this further, hop into our [Discord community](https://discord.gg/s3KuuzsPFb), or open an issue on our [GitHub repository](https://github.com/lerobot/lerobot/issues).
+If you want to discuss this further, hop into our [Discord community](https://discord.gg/s3KuuzsPFb), or open an issue on our [GitHub repository](https://github.com/huggingface/lerobot/issues).

114
docs/source/rename_map.mdx Normal file
View File

@@ -0,0 +1,114 @@
# Rename Map and Empty Cameras
When you train, evaluate, or record with a robot policy, your **dataset** or **environment** provides observations under one set of keys (e.g. `observation.images.front`, `observation.images.eagle`), while your **policy** expects another (e.g. `observation.images.image`, `observation.images.image2`). The **rename map** bridges that gap without changing the policy or data source.
> **Scope:** The rename map only renames **observation** keys (images and state). Action keys are not affected.
## Why observation keys don't always match
Policies have a fixed set of **input feature names** baked into their pretrained config. For example:
- [pi0fast-libero](https://huggingface.co/lerobot/pi0fast-libero) expects `observation.images.base_0_rgb` and `observation.images.left_wrist_0_rgb`.
- [xvla-base](https://huggingface.co/lerobot/xvla-base) expects `observation.images.image`, `observation.images.image2`, and `observation.images.image3`.
Your dataset might use different names entirely (e.g. `observation.images.front`, `observation.images.eagle`, `observation.images.glove`), and your eval environment might use yet another set. Rather than editing the policy config or renaming columns in the dataset, you pass a **rename map**: a JSON dictionary that maps source keys to the keys the policy expects. Renaming happens inside the preprocessor pipeline, so the policy always sees its expected keys.
## Using the rename map
Pass the mapping as a JSON string on the command line. The convention is always:
```
--rename_map='{"source_key": "policy_key", ...}'
```
where **source_key** is what the dataset or environment provides, and **policy_key** is what the policy expects.
Only listed keys are renamed; everything else passes through unchanged. Order of entries doesn't matter.
Supported policies: **PI0**, **PI05**, **PI0Fast**, **SmolVLA**, and **XVLA**.
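The renaming rules above (listed keys are renamed, everything else passes through) can be pictured with a short Python sketch. It is only an illustration of the semantics, not LeRobot's preprocessor code; `apply_rename_map` is a hypothetical helper introduced here.

```python
import json


def apply_rename_map(observation: dict, rename_map: dict) -> dict:
    """Return a copy of `observation` with the listed keys renamed.

    Hypothetical helper for illustration; not part of the LeRobot API.
    """
    # Keys missing from the map fall back to themselves, so they pass through.
    return {rename_map.get(key, key): value for key, value in observation.items()}


# The same kind of JSON string you would pass on the command line.
rename_map = json.loads(
    '{"observation.images.front": "observation.images.image",'
    ' "observation.images.eagle": "observation.images.image2"}'
)

observation = {
    "observation.images.front": "front-frame",
    "observation.images.eagle": "eagle-frame",
    "observation.state": [0.1, 0.2],  # not listed, so it keeps its name
}

renamed = apply_rename_map(observation, rename_map)
print(sorted(renamed))
```

The state key is untouched because it does not appear in the map, which matches the pass-through behavior described above.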
### Training
Suppose you fine-tune [lerobot/xvla-base](https://huggingface.co/lerobot/xvla-base) on a dataset with images under `observation.images.front`, `observation.images.eagle`, and `observation.images.glove`. XVLA expects `observation.images.image`, `observation.images.image2`, and `observation.images.image3`:
```bash
lerobot-train \
--dataset.repo_id=YOUR_DATASET \
--output_dir=./outputs/xvla_training \
--job_name=xvla_training \
--policy.path="lerobot/xvla-base" \
--policy.repo_id="HF_USER/xvla-your-robot" \
--policy.dtype=bfloat16 \
--policy.action_mode=auto \
--steps=20000 \
--policy.device=cuda \
--policy.freeze_vision_encoder=false \
--policy.freeze_language_encoder=false \
--policy.train_policy_transformer=true \
--policy.train_soft_prompts=true \
--rename_map='{"observation.images.front": "observation.images.image", "observation.images.eagle": "observation.images.image2", "observation.images.glove": "observation.images.image3"}'
```
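Long JSON strings are easy to mis-quote by hand. One option, sketched below with only the Python standard library, is to build the flag from a dictionary and let `json.dumps` and `shlex.quote` handle the quoting. The variable names are illustrative and nothing here is a LeRobot API.

```python
import json
import shlex

rename_map = {
    "observation.images.front": "observation.images.image",
    "observation.images.eagle": "observation.images.image2",
    "observation.images.glove": "observation.images.image3",
}

# json.dumps always emits double-quoted keys and values, as JSON requires.
rename_map_json = json.dumps(rename_map)

# shlex.quote wraps the whole argument in single quotes so the shell
# passes the double quotes through unchanged.
flag = shlex.quote(f"--rename_map={rename_map_json}")
print(flag)
```

The printed flag can be pasted into the `lerobot-train` command in place of the hand-written `--rename_map=...` argument.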
### Evaluation
Suppose a policy expects `observation.images.base_0_rgb` and `observation.images.left_wrist_0_rgb` (e.g. [pi0fast-libero](https://huggingface.co/lerobot/pi0fast-libero)), but the LIBERO environment returns `observation.images.image` and `observation.images.image2`:
```bash
lerobot-eval \
--policy.path=lerobot/pi0fast-libero \
--env.type=libero \
... \
--rename_map='{"observation.images.image": "observation.images.base_0_rgb", "observation.images.image2": "observation.images.left_wrist_0_rgb"}'
```
### Recording
`lerobot-record` also supports rename maps, nested under the dataset config:
```bash
# When running inference
lerobot-record \
--policy.path="<user>/smolVLA_finetuned" \
... \
--dataset.rename_map='{"observation.images.glove2": "observation.images.image"}'
```
## Alternative: edit the policy config directly
If you always use the same dataset or environment, you can **edit the policy's `config.json`** so its observation keys match your data source. Then no rename map is needed.
The tradeoff: modifying the policy config ties it to one data source. A rename map keeps one policy usable across many datasets and environments.
## Empty cameras: fewer views than the policy expects
Some policies are built for a fixed number of image inputs. If your dataset has fewer cameras, you can set **`empty_cameras`** in the policy config instead of modifying the model architecture.
### How it works
Setting `empty_cameras=N` adds N placeholder image features to the policy config, named:
```
observation.images.empty_camera_0
observation.images.empty_camera_1
...
```
At runtime, these keys have no corresponding data in the batch. The policy fills them with masked dummy tensors (padded with `-1` for SigLIP-based vision encoders, with a zero attention mask), so the extra image slots are effectively ignored during training and inference.
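As a rough picture of the mechanism described above, the sketch below generates the placeholder key names and fills each slot with `-1` values plus a mask entry marking it as ignored. It uses plain nested lists instead of tensors, and `empty_camera_keys` and `add_empty_cameras` are hypothetical helpers, so treat it as an illustration rather than what the policies actually run.

```python
def empty_camera_keys(n: int) -> list[str]:
    """Names of the N placeholder image features."""
    return [f"observation.images.empty_camera_{i}" for i in range(n)]


def add_empty_cameras(images: dict, n: int, shape=(3, 2, 2), fill=-1.0):
    """Return (padded_images, masks) with N masked placeholder slots.

    Hypothetical helper: real policies build tensors, not nested lists.
    """
    channels, height, width = shape
    padded = dict(images)
    masks = {key: True for key in images}  # real cameras are attended to
    for key in empty_camera_keys(n):
        padded[key] = [[[fill] * width for _ in range(height)] for _ in range(channels)]
        masks[key] = False  # placeholder slot is masked out
    return padded, masks


images = {"observation.images.image": "real-frame", "observation.images.image2": "real-frame"}
padded, masks = add_empty_cameras(images, n=1)
print(sorted(padded))
print(masks["observation.images.empty_camera_0"])
```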
### Example
XVLA-base has three visual inputs and `empty_cameras=0` by default. Your dataset only has two cameras:
1. Set `--policy.empty_cameras=1`.
2. The config adds a third key: `observation.images.empty_camera_0`.
3. Use the rename map for your two real cameras as usual.
4. The third slot is masked out — no fake images needed in your dataset.
## Quick reference
| Goal | What to do |
| ----------------------------------------- | --------------------------------------------------------------------------- |
| Dataset keys ≠ policy keys | `--rename_map='{"dataset_key": "policy_key", ...}'` |
| Env keys ≠ policy keys (eval) | `--rename_map='{"env_key": "policy_key", ...}'` |
| Recording with different keys (inference) | `--dataset.rename_map='{"source_key": "policy_key", ...}'` |
| Fewer cameras than policy expects | `--policy.empty_cameras=N` (supported by PI0, PI05, PI0Fast, SmolVLA, XVLA) |
| Avoid passing a rename map | Edit the policy's `config.json` so its keys match your data source |

View File

@@ -131,6 +131,15 @@ class _NormalizationMixin:
         if self.dtype is None:
             self.dtype = torch.float32
         self._tensor_stats = to_tensor(self.stats, device=self.device, dtype=self.dtype)
+        self._reshape_visual_stats()
+
+    def _reshape_visual_stats(self) -> None:
+        """Reshape visual stats from ``[C]`` to ``[C, 1, 1]`` for image broadcasting."""
+        for key, feature in self.features.items():
+            if feature.type == FeatureType.VISUAL and key in self._tensor_stats:
+                for stat_name, stat_tensor in self._tensor_stats[key].items():
+                    if isinstance(stat_tensor, Tensor) and stat_tensor.ndim == 1:
+                        self._tensor_stats[key][stat_name] = stat_tensor.reshape(-1, 1, 1)

     def to(
         self, device: torch.device | str | None = None, dtype: torch.dtype | None = None
@@ -149,6 +158,7 @@ class _NormalizationMixin:
         if dtype is not None:
             self.dtype = dtype
         self._tensor_stats = to_tensor(self.stats, device=self.device, dtype=self.dtype)
+        self._reshape_visual_stats()
         return self

     def state_dict(self) -> dict[str, Tensor]:
@@ -198,6 +208,7 @@ class _NormalizationMixin:
             # Don't load from state_dict, keep the explicitly provided stats
             # But ensure _tensor_stats is properly initialized
             self._tensor_stats = to_tensor(self.stats, device=self.device, dtype=self.dtype)  # type: ignore[assignment]
+            self._reshape_visual_stats()
             return

         # Normal behavior: load stats from state_dict
@@ -209,6 +220,8 @@ class _NormalizationMixin:
                 dtype=torch.float32, device=self.device
             )

+        self._reshape_visual_stats()
+
         # Reconstruct the original stats dict from tensor stats for compatibility with to() method
         # and other functions that rely on self.stats
         self.stats = {}
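Why the reshape from `[C]` to `[C, 1, 1]` matters can be checked with the standard right-aligned broadcasting rule that NumPy and PyTorch share. The sketch below reimplements that rule on plain shape tuples so it runs without either library; `broadcast_shape` is a helper written for this illustration, and the image shape is an arbitrary example.

```python
def broadcast_shape(a: tuple, b: tuple) -> tuple:
    """Broadcast two shapes using the NumPy/PyTorch right-aligned rule."""
    result = []
    for i in range(1, max(len(a), len(b)) + 1):
        # Missing leading dimensions behave like size 1.
        dim_a = a[-i] if i <= len(a) else 1
        dim_b = b[-i] if i <= len(b) else 1
        if dim_a != dim_b and 1 not in (dim_a, dim_b):
            raise ValueError(f"cannot broadcast {a} with {b}")
        result.append(max(dim_a, dim_b))
    return tuple(reversed(result))


image = (3, 96, 128)  # [C, H, W]

# Stats reshaped to [C, 1, 1] line up with the channel dimension.
print(broadcast_shape((3, 1, 1), image))

# Flat [C] stats are aligned with the last dimension (width) instead.
try:
    broadcast_shape((3,), image)
except ValueError as error:
    print(error)
```

Note that when the image width happens to equal the channel count, flat `[C]` stats would broadcast without an error but along the wrong axis, which is a quieter failure than the exception shown here.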

View File

@@ -62,6 +62,7 @@ from lerobot.configs import parser
 from lerobot.configs.train import TrainRLServerPipelineConfig
 from lerobot.policies.factory import make_policy
 from lerobot.policies.sac.modeling_sac import SACPolicy
+from lerobot.policies.sac.processor_sac import make_sac_pre_post_processors
 from lerobot.rl.process import ProcessSignalHandler
 from lerobot.rl.queue import get_last_item_from_queue
 from lerobot.robots import so_follower  # noqa: F401
@@ -258,6 +259,11 @@ def act_with_policy(
     policy = policy.eval()
     assert isinstance(policy, nn.Module)

+    preprocessor, postprocessor = make_sac_pre_post_processors(
+        config=cfg.policy,
+        dataset_stats=cfg.policy.dataset_stats,
+    )
+
     obs, info = online_env.reset()
     env_processor.reset()
     action_processor.reset()
@@ -289,7 +295,9 @@ def act_with_policy(
         # Time policy inference and check if it meets FPS requirement
         with policy_timer:
             # Extract observation from transition for policy
-            action = policy.select_action(batch=observation)
+            normalized_observation = preprocessor.process_observation(observation)
+            action = policy.select_action(batch=normalized_observation)
+            # action = postprocessor.process_action(action)
         policy_fps = policy_timer.fps_last

         log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)

View File

@@ -66,6 +66,7 @@ from lerobot.datasets.factory import make_dataset
 from lerobot.datasets.lerobot_dataset import LeRobotDataset
 from lerobot.policies.factory import make_policy
 from lerobot.policies.sac.modeling_sac import SACPolicy
+from lerobot.policies.sac.processor_sac import make_sac_pre_post_processors
 from lerobot.rl.buffer import ReplayBuffer, concatenate_batch_transitions
 from lerobot.rl.process import ProcessSignalHandler
 from lerobot.rl.wandb_utils import WandBLogger
@@ -313,6 +314,11 @@ def add_actor_information_and_train(

     assert isinstance(policy, nn.Module)

+    preprocessor, _ = make_sac_pre_post_processors(
+        config=cfg.policy,
+        dataset_stats=cfg.policy.dataset_stats,
+    )
+
     policy.train()

     push_actor_policy_to_queue(parameters_queue=parameters_queue, policy=policy)
@@ -408,6 +414,9 @@ def add_actor_information_and_train(
             done = batch["done"]
             check_nan_in_transition(observations=observations, actions=actions, next_state=next_observations)

+            observations = preprocessor.process_observation(observations)
+            next_observations = preprocessor.process_observation(next_observations)
+
             observation_features, next_observation_features = get_observation_features(
                 policy=policy, observations=observations, next_observations=next_observations
             )
@@ -467,6 +476,9 @@ def add_actor_information_and_train(

         check_nan_in_transition(observations=observations, actions=actions, next_state=next_observations)

+        observations = preprocessor.process_observation(observations)
+        next_observations = preprocessor.process_observation(next_observations)
+
         observation_features, next_observation_features = get_observation_features(
             policy=policy, observations=observations, next_observations=next_observations
         )
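Both the actor and the learner now pass observations through `preprocessor.process_observation` before the policy sees them. The sketch below illustrates why sharing one normalization step matters. It assumes a simple mean/std scheme on a flat state vector; the real normalization mode comes from the policy config, and `normalize_observation` is a hypothetical stand-in, not the SAC preprocessor.

```python
def normalize_observation(observation: dict, stats: dict) -> dict:
    """Mean/std-normalize keys that have stats; pass the rest through.

    Hypothetical stand-in for the preprocessor, assuming mean/std stats.
    """
    normalized = {}
    for key, values in observation.items():
        if key in stats:
            mean, std = stats[key]["mean"], stats[key]["std"]
            normalized[key] = [(v - m) / s for v, m, s in zip(values, mean, std)]
        else:
            normalized[key] = values
    return normalized


stats = {"observation.state": {"mean": [1.0, 2.0], "std": [2.0, 4.0]}}

# Actor side: normalize the live observation before selecting an action.
actor_input = normalize_observation({"observation.state": [3.0, 10.0]}, stats)

# Learner side: normalize replayed observations with the same stats,
# so the policy is trained on the same input scale it acts on.
learner_input = normalize_observation({"observation.state": [3.0, 10.0]}, stats)

print(actor_input, actor_input == learner_input)
```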

View File

@@ -23,65 +23,46 @@ class InputController:
"""Base class for input controllers that generate motion deltas.""" """Base class for input controllers that generate motion deltas."""
def __init__(self, x_step_size=1.0, y_step_size=1.0, z_step_size=1.0): def __init__(self, x_step_size=1.0, y_step_size=1.0, z_step_size=1.0):
"""
Initialize the controller.
Args:
x_step_size: Base movement step size in meters
y_step_size: Base movement step size in meters
z_step_size: Base movement step size in meters
"""
self.x_step_size = x_step_size self.x_step_size = x_step_size
self.y_step_size = y_step_size self.y_step_size = y_step_size
self.z_step_size = z_step_size self.z_step_size = z_step_size
self.running = True self.running = True
self.episode_end_status = None # None, "success", or "failure" self.episode_end_status = None
self.intervention_flag = False self.intervention_flag = False
self.open_gripper_command = False self.open_gripper_command = False
self.close_gripper_command = False self.close_gripper_command = False
def start(self): def start(self):
"""Start the controller and initialize resources."""
pass pass
def stop(self): def stop(self):
"""Stop the controller and release resources.""" pass
def reset(self):
pass pass
def get_deltas(self): def get_deltas(self):
"""Get the current movement deltas (dx, dy, dz) in meters."""
return 0.0, 0.0, 0.0 return 0.0, 0.0, 0.0
def update(self): def update(self):
"""Update controller state - call this once per frame."""
pass pass
def __enter__(self): def __enter__(self):
"""Support for use in 'with' statements."""
self.start() self.start()
return self return self
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
"""Ensure resources are released when exiting 'with' block."""
self.stop() self.stop()
def get_episode_end_status(self): def get_episode_end_status(self):
"""
Get the current episode end status.
Returns:
None if episode should continue, "success" or "failure" otherwise
"""
status = self.episode_end_status status = self.episode_end_status
self.episode_end_status = None # Reset after reading self.episode_end_status = None
return status return status
def should_intervene(self): def should_intervene(self):
"""Return True if intervention flag was set."""
return self.intervention_flag return self.intervention_flag
def gripper_command(self): def gripper_command(self):
"""Return the current gripper command."""
if self.open_gripper_command == self.close_gripper_command: if self.open_gripper_command == self.close_gripper_command:
return "stay" return "stay"
elif self.open_gripper_command: elif self.open_gripper_command:
@@ -102,14 +83,14 @@ class KeyboardController(InputController):
"backward_y": False, "backward_y": False,
"forward_z": False, "forward_z": False,
"backward_z": False, "backward_z": False,
"quit": False,
"success": False, "success": False,
"failure": False, "failure": False,
"intervention": False,
"rerecord": False,
} }
self.listener = None self.listener = None
def start(self): def start(self):
"""Start the keyboard listener."""
from pynput import keyboard from pynput import keyboard
def on_press(key): def on_press(key):
@@ -126,16 +107,21 @@ class KeyboardController(InputController):
self.key_states["backward_z"] = True self.key_states["backward_z"] = True
elif key == keyboard.Key.shift_r: elif key == keyboard.Key.shift_r:
self.key_states["forward_z"] = True self.key_states["forward_z"] = True
elif key == keyboard.Key.esc: elif key == keyboard.Key.ctrl_r:
self.key_states["quit"] = True self.open_gripper_command = True
self.running = False elif key == keyboard.Key.ctrl_l:
return False self.close_gripper_command = True
elif key == keyboard.Key.enter: elif key == keyboard.Key.enter:
self.key_states["success"] = True self.key_states["success"] = True
self.episode_end_status = TeleopEvents.SUCCESS self.episode_end_status = TeleopEvents.SUCCESS
elif key == keyboard.Key.backspace: elif key == keyboard.Key.esc:
self.key_states["failure"] = True self.key_states["failure"] = True
self.episode_end_status = TeleopEvents.FAILURE self.episode_end_status = TeleopEvents.FAILURE
elif key == keyboard.Key.space:
self.key_states["intervention"] = not self.key_states["intervention"]
elif hasattr(key, "char") and key.char == "r":
self.key_states["rerecord"] = True
self.episode_end_status = TeleopEvents.RERECORD_EPISODE
except AttributeError: except AttributeError:
pass pass
@@ -153,10 +139,10 @@ class KeyboardController(InputController):
self.key_states["backward_z"] = False self.key_states["backward_z"] = False
elif key == keyboard.Key.shift_r: elif key == keyboard.Key.shift_r:
self.key_states["forward_z"] = False self.key_states["forward_z"] = False
elif key == keyboard.Key.enter: elif key == keyboard.Key.ctrl_r:
self.key_states["success"] = False self.open_gripper_command = False
elif key == keyboard.Key.backspace: elif key == keyboard.Key.ctrl_l:
self.key_states["failure"] = False self.close_gripper_command = False
except AttributeError: except AttributeError:
pass pass
@@ -165,18 +151,18 @@ class KeyboardController(InputController):
print("Keyboard controls:") print("Keyboard controls:")
print(" Arrow keys: Move in X-Y plane") print(" Arrow keys: Move in X-Y plane")
print(" Shift and Shift_R: Move in Z axis") print(" Shift / Shift_R: Move in Z axis")
print(" Ctrl_R / Ctrl_L: Open / Close gripper")
print(" Space: Toggle intervention")
print(" Enter: End episode with SUCCESS") print(" Enter: End episode with SUCCESS")
print(" Backspace: End episode with FAILURE") print(" Esc: End episode with FAILURE")
print(" ESC: Exit") print(" R: Rerecord episode")
def stop(self): def stop(self):
"""Stop the keyboard listener."""
if self.listener and self.listener.is_alive(): if self.listener and self.listener.is_alive():
self.listener.stop() self.listener.stop()
def get_deltas(self): def get_deltas(self):
"""Get the current movement deltas from keyboard state."""
delta_x = delta_y = delta_z = 0.0 delta_x = delta_y = delta_z = 0.0
if self.key_states["forward_x"]: if self.key_states["forward_x"]:
@@ -194,18 +180,58 @@ class KeyboardController(InputController):
return delta_x, delta_y, delta_z return delta_x, delta_y, delta_z
def should_intervene(self):
return self.key_states["intervention"]
def reset(self):
for key in self.key_states:
self.key_states[key] = False
class GamepadController(InputController): class GamepadController(InputController):
"""Generate motion deltas from gamepad input.""" """Generate motion deltas from gamepad input using pygame.
Matches gym-hil button/axis conventions for Linux gamepads, including
Xbox mappings.
"""
# Face buttons (same across most controllers on Linux)
BUTTON_A = 0
BUTTON_B = 1
BUTTON_X = 2
BUTTON_Y = 3
BUTTON_LB = 4
BUTTON_RB = 5
# Stick axes
AXIS_LEFT_X = 0
AXIS_LEFT_Y = 1
AXIS_RIGHT_X = 2
AXIS_RIGHT_Y = 3
# Default trigger buttons
BUTTON_LT = 6
BUTTON_RT = 7
# Xbox (gym-hil mapping on Linux)
XBOX_BUTTON_LT = 9
XBOX_BUTTON_RT = 10
def __init__(self, x_step_size=1.0, y_step_size=1.0, z_step_size=1.0, deadzone=0.1): def __init__(self, x_step_size=1.0, y_step_size=1.0, z_step_size=1.0, deadzone=0.1):
super().__init__(x_step_size, y_step_size, z_step_size) super().__init__(x_step_size, y_step_size, z_step_size)
self.deadzone = deadzone self.deadzone = deadzone
self.joystick = None self.joystick = None
self.intervention_flag = False self.intervention_flag = False
self.is_xbox = False
self._xbox360_profile = False
self._invert_left_x = False
self._invert_left_y = True
self._invert_right_y = True
def _detect_xbox(self, name):
name_lower = name.lower()
return any(tag in name_lower for tag in ["xbox", "microsoft", "x-box"])
def start(self): def start(self):
"""Initialize pygame and the gamepad."""
import pygame import pygame
pygame.init() pygame.init()
@@ -218,18 +244,35 @@ class GamepadController(InputController):
self.joystick = pygame.joystick.Joystick(0) self.joystick = pygame.joystick.Joystick(0)
self.joystick.init() self.joystick.init()
logging.info(f"Initialized gamepad: {self.joystick.get_name()}") joystick_name = self.joystick.get_name()
self.is_xbox = self._detect_xbox(joystick_name)
self._xbox360_profile = joystick_name == "Xbox 360 Controller"
if self._xbox360_profile:
# gym-hil "Xbox 360 Controller" profile
self.AXIS_RIGHT_X = 3
self.AXIS_RIGHT_Y = 4
self.BUTTON_LT = self.XBOX_BUTTON_LT
self.BUTTON_RT = self.XBOX_BUTTON_RT
self._invert_left_x = True
else:
# gym-hil default profile
self.AXIS_RIGHT_X = 2
self.AXIS_RIGHT_Y = 3
self.BUTTON_LT = 6
self.BUTTON_RT = 7
self._invert_left_x = False
logging.info(f"Initialized gamepad: {joystick_name} (xbox={self.is_xbox})")
print("Gamepad controls:") print("Gamepad controls:")
print(" Left analog stick: Move in X-Y plane") print(" Left analog stick: Move in X-Y plane")
print(" Right analog stick (vertical): Move in Z axis") print(" Right analog stick (vertical): Move in Z axis")
print(" B/Circle button: Exit") print(" RB: Intervention toggle")
print(" Y/Triangle button: End episode with SUCCESS") print(" LT / RT: Close / Open gripper")
print(" A/Cross button: End episode with FAILURE") print(" Y: End episode with SUCCESS")
print(" X/Square button: Rerecord episode") print(" A: End episode with FAILURE")
print(" X: Rerecord episode")
def stop(self): def stop(self):
"""Clean up pygame resources."""
import pygame import pygame
if pygame.joystick.get_init(): if pygame.joystick.get_init():
@@ -239,67 +282,56 @@ class GamepadController(InputController):
pygame.quit() pygame.quit()
def update(self): def update(self):
"""Process pygame events to get fresh gamepad readings."""
import pygame import pygame
for event in pygame.event.get(): for event in pygame.event.get():
if event.type == pygame.JOYBUTTONDOWN: if event.type == pygame.JOYBUTTONDOWN:
if event.button == 3: if event.button == self.BUTTON_Y:
self.episode_end_status = TeleopEvents.SUCCESS self.episode_end_status = TeleopEvents.SUCCESS
# A button (1) for failure elif event.button == self.BUTTON_A:
elif event.button == 1:
self.episode_end_status = TeleopEvents.FAILURE self.episode_end_status = TeleopEvents.FAILURE
# X button (0) for rerecord elif event.button == self.BUTTON_X:
elif event.button == 0:
self.episode_end_status = TeleopEvents.RERECORD_EPISODE self.episode_end_status = TeleopEvents.RERECORD_EPISODE
elif event.button == self.BUTTON_LT:
# RB button (6) for closing gripper
elif event.button == 6:
self.close_gripper_command = True self.close_gripper_command = True
elif event.button == self.BUTTON_RT:
# LT button (7) for opening gripper
elif event.button == 7:
self.open_gripper_command = True self.open_gripper_command = True
# Reset episode status on button release
elif event.type == pygame.JOYBUTTONUP: elif event.type == pygame.JOYBUTTONUP:
if event.button in [0, 2, 3]: if event.button in [self.BUTTON_Y, self.BUTTON_A, self.BUTTON_X]:
self.episode_end_status = None self.episode_end_status = None
elif event.button == self.BUTTON_LT:
elif event.button == 6:
self.close_gripper_command = False self.close_gripper_command = False
elif event.button == self.BUTTON_RT:
elif event.button == 7:
self.open_gripper_command = False self.open_gripper_command = False
# Check for RB button (typically button 5) for intervention flag if self.joystick.get_button(self.BUTTON_RB):
if self.joystick.get_button(5):
self.intervention_flag = True self.intervention_flag = True
else: else:
self.intervention_flag = False self.intervention_flag = False
def get_deltas(self): def get_deltas(self):
"""Get the current movement deltas from gamepad state."""
import pygame import pygame
try: try:
# Read joystick axes x_input = self.joystick.get_axis(self.AXIS_LEFT_X)
# Left stick X and Y (typically axes 0 and 1) y_input = self.joystick.get_axis(self.AXIS_LEFT_Y)
y_input = self.joystick.get_axis(0) # Up/Down (often inverted) z_input = self.joystick.get_axis(self.AXIS_RIGHT_Y)
x_input = self.joystick.get_axis(1) # Left/Right
# Right stick Y (typically axis 3 or 4)
z_input = self.joystick.get_axis(3) # Up/Down for Z
# Apply deadzone to avoid drift
x_input = 0 if abs(x_input) < self.deadzone else x_input x_input = 0 if abs(x_input) < self.deadzone else x_input
y_input = 0 if abs(y_input) < self.deadzone else y_input y_input = 0 if abs(y_input) < self.deadzone else y_input
z_input = 0 if abs(z_input) < self.deadzone else z_input z_input = 0 if abs(z_input) < self.deadzone else z_input
# Calculate deltas (note: may need to invert axes depending on controller) if self._invert_left_x:
delta_x = -x_input * self.x_step_size # Forward/backward x_input = -x_input
delta_y = -y_input * self.y_step_size # Left/right if self._invert_left_y:
delta_z = -z_input * self.z_step_size # Up/down y_input = -y_input
if self._invert_right_y:
z_input = -z_input
delta_x = y_input * self.y_step_size
delta_y = x_input * self.x_step_size
delta_z = z_input * self.z_step_size
return delta_x, delta_y, delta_z return delta_x, delta_y, delta_z
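The deadzone and inversion steps in the new `get_deltas` can be exercised without pygame by pulling them into a pure function. The sketch below mirrors the order of operations in the diff (deadzone, then inversion, then scaling), including its pairing of `y_input` with `y_step_size` for `delta_x`. The function `axes_to_deltas` and its keyword names are hypothetical and exist only for this illustration.

```python
def axes_to_deltas(
    left_x: float,
    left_y: float,
    right_y: float,
    *,
    deadzone: float = 0.1,
    x_step: float = 1.0,
    y_step: float = 1.0,
    z_step: float = 1.0,
    invert_left_x: bool = False,
    invert_left_y: bool = True,
    invert_right_y: bool = True,
) -> tuple[float, float, float]:
    """Map raw stick axes in [-1, 1] to (delta_x, delta_y, delta_z)."""

    def apply_deadzone(value: float) -> float:
        # Small readings near the stick's rest position are treated as zero.
        return 0.0 if abs(value) < deadzone else value

    x_input = apply_deadzone(left_x)
    y_input = apply_deadzone(left_y)
    z_input = apply_deadzone(right_y)

    if invert_left_x:
        x_input = -x_input
    if invert_left_y:
        y_input = -y_input
    if invert_right_y:
        z_input = -z_input

    # Same pairing as the diff: the stick's Y axis drives delta_x
    # and the stick's X axis drives delta_y.
    delta_x = y_input * y_step
    delta_y = x_input * x_step
    delta_z = z_input * z_step
    return delta_x, delta_y, delta_z


# Left stick pushed fully up with slight X drift, right stick half down.
print(axes_to_deltas(0.05, -1.0, 0.5))  # (1.0, 0.0, -0.5)
```

Keeping this logic in a pure function makes it straightforward to unit-test a new controller profile by changing only the inversion flags.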
@@ -309,7 +341,15 @@ class GamepadController(InputController):
class GamepadControllerHID(InputController): class GamepadControllerHID(InputController):
"""Generate motion deltas from gamepad input using HIDAPI.""" """Generate motion deltas from gamepad input using HIDAPI.
Supports auto-detection of controller type for correct HID report parsing.
Currently supported: Logitech RumblePad 2, 8BitDo Ultimate 2C Wireless.
"""
CONTROLLER_LOGITECH = "logitech"
CONTROLLER_8BITDO = "8bitdo"
CONTROLLER_UNKNOWN = "unknown"
def __init__( def __init__(
self, self,
@@ -318,36 +358,26 @@ class GamepadControllerHID(InputController):
z_step_size=1.0, z_step_size=1.0,
deadzone=0.1, deadzone=0.1,
): ):
"""
Initialize the HID gamepad controller.
Args:
step_size: Base movement step size in meters
z_scale: Scaling factor for Z-axis movement
deadzone: Joystick deadzone to prevent drift
"""
super().__init__(x_step_size, y_step_size, z_step_size) super().__init__(x_step_size, y_step_size, z_step_size)
self.deadzone = deadzone self.deadzone = deadzone
self.device = None self.device = None
self.device_info = None self.device_info = None
self.controller_type = self.CONTROLLER_UNKNOWN
# Movement values (normalized from -1.0 to 1.0)
self.left_x = 0.0 self.left_x = 0.0
self.left_y = 0.0 self.left_y = 0.0
self.right_x = 0.0 self.right_x = 0.0
self.right_y = 0.0 self.right_y = 0.0
# Button states
self.buttons = {} self.buttons = {}
def find_device(self): def find_device(self):
"""Look for the gamepad device by vendor and product ID."""
import hid import hid
devices = hid.enumerate() devices = hid.enumerate()
for device in devices: for device in devices:
device_name = device["product_string"] device_name = device["product_string"]
if any(controller in device_name for controller in ["Logitech", "Xbox", "PS4", "PS5"]): if any(controller in device_name for controller in ["Logitech", "Xbox", "PS4", "PS5", "8BitDo"]):
return device return device
logging.error( logging.error(
@@ -355,8 +385,15 @@ class GamepadControllerHID(InputController):
) )
return None return None
def _detect_controller_type(self, product_string):
product = product_string.lower() if product_string else ""
if "8bitdo" in product:
return self.CONTROLLER_8BITDO
elif "logitech" in product:
return self.CONTROLLER_LOGITECH
return self.CONTROLLER_UNKNOWN
def start(self): def start(self):
"""Connect to the gamepad using HIDAPI."""
import hid import hid
self.device_info = self.find_device() self.device_info = self.find_device()
@@ -374,12 +411,22 @@ class GamepadControllerHID(InputController):
             product = self.device.get_product_string()
             logging.info(f"Connected to {manufacturer} {product}")
 
-            logging.info("Gamepad controls (HID mode):")
-            logging.info(" Left analog stick: Move in X-Y plane")
-            logging.info(" Right analog stick: Move in Z axis (vertical)")
-            logging.info(" Button 1/B/Circle: Exit")
-            logging.info(" Button 2/A/Cross: End episode with SUCCESS")
-            logging.info(" Button 3/X/Square: End episode with FAILURE")
+            self.controller_type = self._detect_controller_type(product)
+            logging.info(f"Detected controller type: {self.controller_type}")
+
+            print("Gamepad controls (HID mode):")
+            print(" Left analog stick: Move in X-Y plane")
+            print(" Right analog stick: Move in Z axis (vertical)")
+            print(" RB: Intervention toggle")
+            if self.controller_type == self.CONTROLLER_8BITDO:
+                print(" L3 (left stick click): Close gripper")
+                print(" R3 (right stick click): Open gripper")
+            else:
+                print(" LT: Close gripper")
+                print(" RT: Open gripper")
+            print(" Y: End episode with SUCCESS")
+            print(" X: End episode with FAILURE")
+            print(" A: Rerecord episode")
 
         except OSError as e:
             logging.error(f"Error opening gamepad: {e}")
@@ -387,74 +434,124 @@ class GamepadControllerHID(InputController):
             self.running = False
 
     def stop(self):
-        """Close the HID device connection."""
         if self.device:
             self.device.close()
             self.device = None
 
     def update(self):
-        """
-        Read and process the latest gamepad data.
-        Due to an issue with the HIDAPI, we need to read the read the device several times in order to get a stable reading
-        """
+        """Read the device several times to drain the HID buffer and get a stable reading."""
         for _ in range(10):
             self._update()
 
     def _update(self):
-        """Read and process the latest gamepad data."""
         if not self.device or not self.running:
             return
 
         try:
-            # Read data from the gamepad
             data = self.device.read(64)
-            # Interpret gamepad data - this will vary by controller model
-            # These offsets are for the Logitech RumblePad 2
-            if data and len(data) >= 8:
-                # Normalize joystick values from 0-255 to -1.0-1.0
-                self.left_y = (data[1] - 128) / 128.0
-                self.left_x = (data[2] - 128) / 128.0
-                self.right_x = (data[3] - 128) / 128.0
-                self.right_y = (data[4] - 128) / 128.0
+            if not data:
+                return
 
-                # Apply deadzone
-                self.left_y = 0 if abs(self.left_y) < self.deadzone else self.left_y
-                self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x
-                self.right_x = 0 if abs(self.right_x) < self.deadzone else self.right_x
-                self.right_y = 0 if abs(self.right_y) < self.deadzone else self.right_y
-
-                # Parse button states (byte 5 in the Logitech RumblePad 2)
-                buttons = data[5]
-
-                # Check if RB is pressed then the intervention flag should be set
-                self.intervention_flag = data[6] in [2, 6, 10, 14]
-
-                # Check if RT is pressed
-                self.open_gripper_command = data[6] in [8, 10, 12]
-
-                # Check if LT is pressed
-                self.close_gripper_command = data[6] in [4, 6, 12]
-
-                # Check if Y/Triangle button (bit 7) is pressed for saving
-                # Check if X/Square button (bit 5) is pressed for failure
-                # Check if A/Cross button (bit 4) is pressed for rerecording
-                if buttons & 1 << 7:
-                    self.episode_end_status = TeleopEvents.SUCCESS
-                elif buttons & 1 << 5:
-                    self.episode_end_status = TeleopEvents.FAILURE
-                elif buttons & 1 << 4:
-                    self.episode_end_status = TeleopEvents.RERECORD_EPISODE
-                else:
-                    self.episode_end_status = None
+            if self.controller_type == self.CONTROLLER_8BITDO:
+                self._parse_8bitdo(data)
+            else:
+                self._parse_logitech(data)
 
         except OSError as e:
             logging.error(f"Error reading from gamepad: {e}")
 
+    def _apply_deadzone(self):
+        self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x
+        self.left_y = 0 if abs(self.left_y) < self.deadzone else self.left_y
+        self.right_x = 0 if abs(self.right_x) < self.deadzone else self.right_x
+        self.right_y = 0 if abs(self.right_y) < self.deadzone else self.right_y
+
+    def _parse_8bitdo(self, data):
+        """Parse HID report from 8BitDo Ultimate 2C Wireless (Bluetooth on macOS).
+
+        11-byte report layout:
+            byte[0]: Report ID (0x01)
+            byte[1]: D-pad hat switch (0=N, 2=E, 5=S, 6=W, 15=neutral)
+            byte[2]: Left Stick X (0=left, 127=center, 255=right)
+            byte[3]: Left Stick Y (0=up, 127=center, 255=down)
+            byte[4]: Right Stick X (inverted: 255=left, 0=right)
+            byte[5]: Right Stick Y (0=up, 127=center, 255=down)
+            byte[6]: RT analog trigger (0-255)
+            byte[7]: LT analog trigger (0-255)
+            byte[8]: Buttons -- bit0=A, bit1=B, bit3=X, bit4=Y, bit6=LB, bit7=RB
+            byte[9]: System -- bit0=LT(digital), bit1=RT(digital), bit3=Select,
+                     bit4=Start, bit5=L3, bit6=R3
+            byte[10]: Unused
+        """
+
+        if len(data) < 11:
+            return
+
+        self.left_x = (data[2] - 127) / 128.0
+        self.left_y = (data[3] - 127) / 128.0
+        self.right_x = -(data[4] - 127) / 128.0
+        self.right_y = (data[5] - 127) / 128.0
+        self._apply_deadzone()
+
+        buttons = data[8]
+
+        # RB (bit 7) = intervention
+        self.intervention_flag = bool(buttons & 0x80)
+
+        # Stick clicks for gripper: R3 (byte[9] bit6) = open, L3 (byte[9] bit5) = close
+        system = data[9]
+        self.open_gripper_command = bool(system & 0x40)  # R3
+        self.close_gripper_command = bool(system & 0x20)  # L3
+
+        # Y (bit 4) = success, X (bit 3) = failure, A (bit 0) = rerecord
+        if buttons & 0x10:
+            self.episode_end_status = TeleopEvents.SUCCESS
+        elif buttons & 0x08:
+            self.episode_end_status = TeleopEvents.FAILURE
+        elif buttons & 0x01:
+            self.episode_end_status = TeleopEvents.RERECORD_EPISODE
+        else:
+            self.episode_end_status = None
+
+    def _parse_logitech(self, data):
+        """Parse HID report from Logitech RumblePad 2 (and similar Logitech gamepads).
+
+        Report layout (8+ bytes):
+            byte[1]: Left Stick X (0-255, center=128)
+            byte[2]: Left Stick Y (0-255, center=128)
+            byte[3]: Right Stick X (0-255, center=128)
+            byte[4]: Right Stick Y (0-255, center=128)
+            byte[5]: Face buttons bitmask
+            byte[6]: Shoulder/trigger buttons bitmask
+        """
+
+        if len(data) < 8:
+            return
+
+        self.left_x = (data[1] - 128) / 128.0
+        self.left_y = (data[2] - 128) / 128.0
+        self.right_x = (data[3] - 128) / 128.0
+        self.right_y = (data[4] - 128) / 128.0
+        self._apply_deadzone()
+
+        buttons = data[5]
+
+        self.intervention_flag = data[6] in [2, 6, 10, 14]
+        self.open_gripper_command = data[6] in [8, 10, 12]
+        self.close_gripper_command = data[6] in [4, 6, 12]
+
+        if buttons & 1 << 7:
+            self.episode_end_status = TeleopEvents.SUCCESS
+        elif buttons & 1 << 5:
+            self.episode_end_status = TeleopEvents.FAILURE
+        elif buttons & 1 << 4:
+            self.episode_end_status = TeleopEvents.RERECORD_EPISODE
+        else:
+            self.episode_end_status = None
+
     def get_deltas(self):
-        """Get the current movement deltas from gamepad state."""
-        # Calculate deltas - invert as needed based on controller orientation
-        delta_x = -self.left_x * self.x_step_size  # Forward/backward
-        delta_y = -self.left_y * self.y_step_size  # Left/right
-        delta_z = -self.right_y * self.z_step_size  # Up/down
+        delta_x = -self.left_y * self.x_step_size
+        delta_y = -self.left_x * self.y_step_size
+        delta_z = -self.right_y * self.z_step_size
 
         return delta_x, delta_y, delta_z
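
Review note: the 8BitDo decoding described in the `_parse_8bitdo` docstring can be checked without a device attached. The sketch below is a standalone approximation of that logic, not code from this change. `GamepadState`, `parse_8bitdo_report`, `_axis`, and the 0.1 deadzone are hypothetical names and values picked for illustration; the byte offsets and bit masks are taken from the docstring in the diff.

```python
from dataclasses import dataclass
from typing import Optional

# Assumed value for illustration; the diff only references self.deadzone.
DEADZONE = 0.1


@dataclass
class GamepadState:
    """Hypothetical container for the fields the parser sets on self."""

    left_x: float
    left_y: float
    right_x: float
    right_y: float
    intervention: bool
    open_gripper: bool
    close_gripper: bool


def _axis(raw: int, deadzone: float = DEADZONE) -> float:
    """Map a 0-255 axis byte (center 127) to about [-1, 1]; zero small values."""
    value = (raw - 127) / 128.0
    return 0.0 if abs(value) < deadzone else value


def parse_8bitdo_report(data: bytes) -> Optional[GamepadState]:
    """Decode an 11-byte report using the layout given in the docstring."""
    if len(data) < 11:
        return None  # short report: nothing to decode

    buttons, system = data[8], data[9]
    return GamepadState(
        left_x=_axis(data[2]),
        left_y=_axis(data[3]),
        right_x=-_axis(data[4]),  # right stick X is reported inverted
        right_y=_axis(data[5]),
        intervention=bool(buttons & 0x80),  # RB, byte[8] bit 7
        open_gripper=bool(system & 0x40),   # R3, byte[9] bit 6
        close_gripper=bool(system & 0x20),  # L3, byte[9] bit 5
    )


# Synthetic report: left stick fully right, RB held, R3 clicked.
report = bytes([0x01, 15, 255, 127, 127, 127, 0, 0, 0x80, 0x40, 0])
state = parse_8bitdo_report(report)
print(state)
```

Feeding synthetic reports like this one through the parser is a cheap way to confirm the masks before testing on hardware, since a wrong offset shows up as a stuck axis or a swapped gripper command.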