Compare commits

...

7 Commits

Author SHA1 Message Date
Khalil Meftah
a0dc324b81 update close gripper button 2026-04-05 18:05:19 +02:00
Khalil Meftah
1d275e2021 change close gripper button 2026-04-05 18:00:43 +02:00
Khalil Meftah
24bb2cb0ff refactor: xbox gamepad buttons 2026-04-05 17:56:00 +02:00
Khalil Meftah
1d414c07e2 fix xbox gamepad 2026-04-01 10:59:40 +02:00
Khalil Meftah
e04e3399b9 fix normalizatiom 2026-03-25 19:26:41 +01:00
Jade Choghari
017ff73fbf chore(docs): add rename map and empty cam guide (#3065)
* add blog/guide

* add to tree

* chore(docs): rephrase rename_map docs for clarity and simplicity

---------

Co-authored-by: Steven Palma <steven.palma@huggingface.co>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-03-23 13:57:53 -07:00
Praedico
f90db58c15 docs(async): fix GitHub issues link (#3186) 2026-03-19 22:32:07 -07:00
7 changed files with 398 additions and 152 deletions

View File

@@ -19,6 +19,8 @@
title: Multi GPU training
- local: peft_training
title: Training with PEFT (e.g., LoRA)
- local: rename_map
title: Using Rename Map and Empty Cameras
title: "Tutorials"
- sections:
- local: lerobot-dataset-v3

View File

@@ -310,4 +310,4 @@ Asynchronous inference represents a significant advancement in real-time robotic
- **Universal Compatibility**: Works with all LeRobot-supported policies, from lightweight ACT models to vision-language models like SmolVLA
Start experimenting with the default parameters, monitor your action queue sizes, and iteratively refine your setup to achieve optimal performance for your specific use case.
If you want to discuss this further, hop into our [Discord community](https://discord.gg/s3KuuzsPFb), or open an issue on our [GitHub repository](https://github.com/lerobot/lerobot/issues).
If you want to discuss this further, hop into our [Discord community](https://discord.gg/s3KuuzsPFb), or open an issue on our [GitHub repository](https://github.com/huggingface/lerobot/issues).

114
docs/source/rename_map.mdx Normal file
View File

@@ -0,0 +1,114 @@
# Rename Map and Empty Cameras
When you train, evaluate, or record with a robot policy, your **dataset** or **environment** provides observations under one set of keys (e.g. `observation.images.front`, `observation.images.eagle`), while your **policy** expects another (e.g. `observation.images.image`, `observation.images.image2`). The **rename map** bridges that gap without changing the policy or data source.
> **Scope:** The rename map only renames **observation** keys (images and state). Action keys are not affected.
## Why observation keys don't always match
Policies have a fixed set of **input feature names** baked into their pretrained config. For example:
- [pi0fast-libero](https://huggingface.co/lerobot/pi0fast-libero) expects `observation.images.base_0_rgb` and `observation.images.left_wrist_0_rgb`.
- [xvla-base](https://huggingface.co/lerobot/xvla-base) expects `observation.images.image`, `observation.images.image2`, and `observation.images.image3`.
Your dataset might use different names entirely (e.g. `observation.images.front`, `observation.images.eagle`, `observation.images.glove`), and your eval environment might use yet another set. Rather than editing the policy config or renaming columns in the dataset, you pass a **rename map**: a JSON dictionary that maps source keys to the keys the policy expects. Renaming happens inside the preprocessor pipeline, so the policy always sees its expected keys.
## Using the rename map
Pass the mapping as a JSON string on the command line. The convention is always:
```
--rename_map='{"source_key": "policy_key", ...}'
```
where **source_key** is what the dataset or environment provides, and **policy_key** is what the policy expects.
Only listed keys are renamed; everything else passes through unchanged. Order of entries doesn't matter.
Supported policies: **PI0**, **PI05**, **PI0Fast**, **SmolVLA**, and **XVLA**.
### Training
Suppose you fine-tune [lerobot/xvla-base](https://huggingface.co/lerobot/xvla-base) on a dataset with images under `observation.images.front`, `observation.images.eagle`, and `observation.images.glove`. XVLA expects `observation.images.image`, `observation.images.image2`, and `observation.images.image3`:
```bash
lerobot-train \
--dataset.repo_id=YOUR_DATASET \
--output_dir=./outputs/xvla_training \
--job_name=xvla_training \
--policy.path="lerobot/xvla-base" \
--policy.repo_id="HF_USER/xvla-your-robot" \
--policy.dtype=bfloat16 \
--policy.action_mode=auto \
--steps=20000 \
--policy.device=cuda \
--policy.freeze_vision_encoder=false \
--policy.freeze_language_encoder=false \
--policy.train_policy_transformer=true \
--policy.train_soft_prompts=true \
--rename_map='{"observation.images.front": "observation.images.image", "observation.images.eagle": "observation.images.image2", "observation.images.glove": "observation.images.image3"}'
```
### Evaluation
A policy that expects `observation.images.base_0_rgb` and `observation.images.left_wrist_0_rgb` (e.g. [pi0fast-libero](https://huggingface.co/lerobot/pi0fast-libero)), but the LIBERO environment returns `observation.images.image` and `observation.images.image2`:
```bash
lerobot-eval \
--policy.path=lerobot/pi0fast-libero \
--env.type=libero \
... \
--rename_map='{"observation.images.image": "observation.images.base_0_rgb", "observation.images.image2": "observation.images.left_wrist_0_rgb"}'
```
### Recording
`lerobot-record` also supports rename maps, nested under the dataset config:
```bash
lerobot-record \ # When running inference
--policy.path="<user>/smolVLA_finetuned" \
... \
--dataset.rename_map='{"observation.images.glove2": "observation.images.image"}'
```
## Alternative: edit the policy config directly
If you always use the same dataset or environment, you can **edit the policy's `config.json`** so its observation keys match your data source. Then no rename map is needed.
The tradeoff: modifying the policy config ties it to one data source. A rename map keeps one policy usable across many datasets and environments.
## Empty cameras: fewer views than the policy expects
Some policies are built for a fixed number of image inputs. If your dataset has fewer cameras, you can set **`empty_cameras`** in the policy config instead of modifying the model architecture.
### How it works
Setting `empty_cameras=N` adds N placeholder image features to the policy config, named:
```
observation.images.empty_camera_0
observation.images.empty_camera_1
...
```
At runtime, these keys have no corresponding data in the batch. The policy fills them with masked dummy tensors (padded with `-1` for SigLIP-based vision encoders, with a zero attention mask), so the extra image slots are effectively ignored during training and inference.
### Example
XVLA-base has three visual inputs and `empty_cameras=0` by default. Your dataset only has two cameras:
1. Set `--policy.empty_cameras=1`.
2. The config adds a third key: `observation.images.empty_camera_0`.
3. Use the rename map for your two real cameras as usual.
4. The third slot is masked out — no fake images needed in your dataset.
## Quick reference
| Goal | What to do |
| ----------------------------------------- | --------------------------------------------------------------------------- |
| Dataset keys ≠ policy keys | `--rename_map='{"dataset_key": "policy_key", ...}'` |
| Env keys ≠ policy keys (eval) | `--rename_map='{"env_key": "policy_key", ...}'` |
| Recording with different keys (inference) | `--dataset.rename_map='{"source_key": "policy_key", ...}'`. |
| Fewer cameras than policy expects | `--policy.empty_cameras=N` (supported by PI0, PI05, PI0Fast, SmolVLA, XVLA) |
| Avoid passing a rename map | Edit the policy's `config.json` so its keys match your data source |

View File

@@ -131,6 +131,15 @@ class _NormalizationMixin:
if self.dtype is None:
self.dtype = torch.float32
self._tensor_stats = to_tensor(self.stats, device=self.device, dtype=self.dtype)
self._reshape_visual_stats()
def _reshape_visual_stats(self) -> None:
"""Reshape visual stats from ``[C]`` to ``[C, 1, 1]`` for image broadcasting."""
for key, feature in self.features.items():
if feature.type == FeatureType.VISUAL and key in self._tensor_stats:
for stat_name, stat_tensor in self._tensor_stats[key].items():
if isinstance(stat_tensor, Tensor) and stat_tensor.ndim == 1:
self._tensor_stats[key][stat_name] = stat_tensor.reshape(-1, 1, 1)
def to(
self, device: torch.device | str | None = None, dtype: torch.dtype | None = None
@@ -149,6 +158,7 @@ class _NormalizationMixin:
if dtype is not None:
self.dtype = dtype
self._tensor_stats = to_tensor(self.stats, device=self.device, dtype=self.dtype)
self._reshape_visual_stats()
return self
def state_dict(self) -> dict[str, Tensor]:
@@ -198,6 +208,7 @@ class _NormalizationMixin:
# Don't load from state_dict, keep the explicitly provided stats
# But ensure _tensor_stats is properly initialized
self._tensor_stats = to_tensor(self.stats, device=self.device, dtype=self.dtype) # type: ignore[assignment]
self._reshape_visual_stats()
return
# Normal behavior: load stats from state_dict
@@ -209,6 +220,8 @@ class _NormalizationMixin:
dtype=torch.float32, device=self.device
)
self._reshape_visual_stats()
# Reconstruct the original stats dict from tensor stats for compatibility with to() method
# and other functions that rely on self.stats
self.stats = {}

View File

@@ -62,6 +62,7 @@ from lerobot.configs import parser
from lerobot.configs.train import TrainRLServerPipelineConfig
from lerobot.policies.factory import make_policy
from lerobot.policies.sac.modeling_sac import SACPolicy
from lerobot.policies.sac.processor_sac import make_sac_pre_post_processors
from lerobot.rl.process import ProcessSignalHandler
from lerobot.rl.queue import get_last_item_from_queue
from lerobot.robots import so_follower # noqa: F401
@@ -258,6 +259,11 @@ def act_with_policy(
policy = policy.eval()
assert isinstance(policy, nn.Module)
preprocessor, postprocessor = make_sac_pre_post_processors(
config=cfg.policy,
dataset_stats=cfg.policy.dataset_stats,
)
obs, info = online_env.reset()
env_processor.reset()
action_processor.reset()
@@ -289,7 +295,9 @@ def act_with_policy(
# Time policy inference and check if it meets FPS requirement
with policy_timer:
# Extract observation from transition for policy
action = policy.select_action(batch=observation)
normalized_observation = preprocessor.process_observation(observation)
action = policy.select_action(batch=normalized_observation)
# action = postprocessor.process_action(action)
policy_fps = policy_timer.fps_last
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)

View File

@@ -66,6 +66,7 @@ from lerobot.datasets.factory import make_dataset
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.policies.factory import make_policy
from lerobot.policies.sac.modeling_sac import SACPolicy
from lerobot.policies.sac.processor_sac import make_sac_pre_post_processors
from lerobot.rl.buffer import ReplayBuffer, concatenate_batch_transitions
from lerobot.rl.process import ProcessSignalHandler
from lerobot.rl.wandb_utils import WandBLogger
@@ -313,6 +314,11 @@ def add_actor_information_and_train(
assert isinstance(policy, nn.Module)
preprocessor, _ = make_sac_pre_post_processors(
config=cfg.policy,
dataset_stats=cfg.policy.dataset_stats,
)
policy.train()
push_actor_policy_to_queue(parameters_queue=parameters_queue, policy=policy)
@@ -408,6 +414,9 @@ def add_actor_information_and_train(
done = batch["done"]
check_nan_in_transition(observations=observations, actions=actions, next_state=next_observations)
observations = preprocessor.process_observation(observations)
next_observations = preprocessor.process_observation(next_observations)
observation_features, next_observation_features = get_observation_features(
policy=policy, observations=observations, next_observations=next_observations
)
@@ -467,6 +476,9 @@ def add_actor_information_and_train(
check_nan_in_transition(observations=observations, actions=actions, next_state=next_observations)
observations = preprocessor.process_observation(observations)
next_observations = preprocessor.process_observation(next_observations)
observation_features, next_observation_features = get_observation_features(
policy=policy, observations=observations, next_observations=next_observations
)

View File

@@ -23,65 +23,46 @@ class InputController:
"""Base class for input controllers that generate motion deltas."""
def __init__(self, x_step_size=1.0, y_step_size=1.0, z_step_size=1.0):
"""
Initialize the controller.
Args:
x_step_size: Base movement step size in meters
y_step_size: Base movement step size in meters
z_step_size: Base movement step size in meters
"""
self.x_step_size = x_step_size
self.y_step_size = y_step_size
self.z_step_size = z_step_size
self.running = True
self.episode_end_status = None # None, "success", or "failure"
self.episode_end_status = None
self.intervention_flag = False
self.open_gripper_command = False
self.close_gripper_command = False
def start(self):
"""Start the controller and initialize resources."""
pass
def stop(self):
"""Stop the controller and release resources."""
pass
def reset(self):
pass
def get_deltas(self):
"""Get the current movement deltas (dx, dy, dz) in meters."""
return 0.0, 0.0, 0.0
def update(self):
"""Update controller state - call this once per frame."""
pass
def __enter__(self):
"""Support for use in 'with' statements."""
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Ensure resources are released when exiting 'with' block."""
self.stop()
def get_episode_end_status(self):
"""
Get the current episode end status.
Returns:
None if episode should continue, "success" or "failure" otherwise
"""
status = self.episode_end_status
self.episode_end_status = None # Reset after reading
self.episode_end_status = None
return status
def should_intervene(self):
"""Return True if intervention flag was set."""
return self.intervention_flag
def gripper_command(self):
"""Return the current gripper command."""
if self.open_gripper_command == self.close_gripper_command:
return "stay"
elif self.open_gripper_command:
@@ -102,14 +83,14 @@ class KeyboardController(InputController):
"backward_y": False,
"forward_z": False,
"backward_z": False,
"quit": False,
"success": False,
"failure": False,
"intervention": False,
"rerecord": False,
}
self.listener = None
def start(self):
"""Start the keyboard listener."""
from pynput import keyboard
def on_press(key):
@@ -126,16 +107,21 @@ class KeyboardController(InputController):
self.key_states["backward_z"] = True
elif key == keyboard.Key.shift_r:
self.key_states["forward_z"] = True
elif key == keyboard.Key.esc:
self.key_states["quit"] = True
self.running = False
return False
elif key == keyboard.Key.ctrl_r:
self.open_gripper_command = True
elif key == keyboard.Key.ctrl_l:
self.close_gripper_command = True
elif key == keyboard.Key.enter:
self.key_states["success"] = True
self.episode_end_status = TeleopEvents.SUCCESS
elif key == keyboard.Key.backspace:
elif key == keyboard.Key.esc:
self.key_states["failure"] = True
self.episode_end_status = TeleopEvents.FAILURE
elif key == keyboard.Key.space:
self.key_states["intervention"] = not self.key_states["intervention"]
elif hasattr(key, "char") and key.char == "r":
self.key_states["rerecord"] = True
self.episode_end_status = TeleopEvents.RERECORD_EPISODE
except AttributeError:
pass
@@ -153,10 +139,10 @@ class KeyboardController(InputController):
self.key_states["backward_z"] = False
elif key == keyboard.Key.shift_r:
self.key_states["forward_z"] = False
elif key == keyboard.Key.enter:
self.key_states["success"] = False
elif key == keyboard.Key.backspace:
self.key_states["failure"] = False
elif key == keyboard.Key.ctrl_r:
self.open_gripper_command = False
elif key == keyboard.Key.ctrl_l:
self.close_gripper_command = False
except AttributeError:
pass
@@ -165,18 +151,18 @@ class KeyboardController(InputController):
print("Keyboard controls:")
print(" Arrow keys: Move in X-Y plane")
print(" Shift and Shift_R: Move in Z axis")
print(" Shift / Shift_R: Move in Z axis")
print(" Ctrl_R / Ctrl_L: Open / Close gripper")
print(" Space: Toggle intervention")
print(" Enter: End episode with SUCCESS")
print(" Backspace: End episode with FAILURE")
print(" ESC: Exit")
print(" Esc: End episode with FAILURE")
print(" R: Rerecord episode")
def stop(self):
"""Stop the keyboard listener."""
if self.listener and self.listener.is_alive():
self.listener.stop()
def get_deltas(self):
"""Get the current movement deltas from keyboard state."""
delta_x = delta_y = delta_z = 0.0
if self.key_states["forward_x"]:
@@ -194,18 +180,58 @@ class KeyboardController(InputController):
return delta_x, delta_y, delta_z
def should_intervene(self):
return self.key_states["intervention"]
def reset(self):
for key in self.key_states:
self.key_states[key] = False
class GamepadController(InputController):
"""Generate motion deltas from gamepad input."""
"""Generate motion deltas from gamepad input using pygame.
Matches gym-hil button/axis conventions for Linux gamepads, including
Xbox mappings.
"""
# Face buttons (same across most controllers on Linux)
BUTTON_A = 0
BUTTON_B = 1
BUTTON_X = 2
BUTTON_Y = 3
BUTTON_LB = 4
BUTTON_RB = 5
# Stick axes
AXIS_LEFT_X = 0
AXIS_LEFT_Y = 1
AXIS_RIGHT_X = 2
AXIS_RIGHT_Y = 3
# Default trigger buttons
BUTTON_LT = 6
BUTTON_RT = 7
# Xbox (gym-hil mapping on Linux)
XBOX_BUTTON_LT = 9
XBOX_BUTTON_RT = 10
def __init__(self, x_step_size=1.0, y_step_size=1.0, z_step_size=1.0, deadzone=0.1):
super().__init__(x_step_size, y_step_size, z_step_size)
self.deadzone = deadzone
self.joystick = None
self.intervention_flag = False
self.is_xbox = False
self._xbox360_profile = False
self._invert_left_x = False
self._invert_left_y = True
self._invert_right_y = True
def _detect_xbox(self, name):
name_lower = name.lower()
return any(tag in name_lower for tag in ["xbox", "microsoft", "x-box"])
def start(self):
"""Initialize pygame and the gamepad."""
import pygame
pygame.init()
@@ -218,18 +244,35 @@ class GamepadController(InputController):
self.joystick = pygame.joystick.Joystick(0)
self.joystick.init()
logging.info(f"Initialized gamepad: {self.joystick.get_name()}")
joystick_name = self.joystick.get_name()
self.is_xbox = self._detect_xbox(joystick_name)
self._xbox360_profile = joystick_name == "Xbox 360 Controller"
if self._xbox360_profile:
# gym-hil "Xbox 360 Controller" profile
self.AXIS_RIGHT_X = 3
self.AXIS_RIGHT_Y = 4
self.BUTTON_LT = self.XBOX_BUTTON_LT
self.BUTTON_RT = self.XBOX_BUTTON_RT
self._invert_left_x = True
else:
# gym-hil default profile
self.AXIS_RIGHT_X = 2
self.AXIS_RIGHT_Y = 3
self.BUTTON_LT = 6
self.BUTTON_RT = 7
self._invert_left_x = False
logging.info(f"Initialized gamepad: {joystick_name} (xbox={self.is_xbox})")
print("Gamepad controls:")
print(" Left analog stick: Move in X-Y plane")
print(" Right analog stick (vertical): Move in Z axis")
print(" B/Circle button: Exit")
print(" Y/Triangle button: End episode with SUCCESS")
print(" A/Cross button: End episode with FAILURE")
print(" X/Square button: Rerecord episode")
print(" RB: Intervention toggle")
print(" LT / RT: Close / Open gripper")
print(" Y: End episode with SUCCESS")
print(" A: End episode with FAILURE")
print(" X: Rerecord episode")
def stop(self):
"""Clean up pygame resources."""
import pygame
if pygame.joystick.get_init():
@@ -239,67 +282,56 @@ class GamepadController(InputController):
pygame.quit()
def update(self):
"""Process pygame events to get fresh gamepad readings."""
import pygame
for event in pygame.event.get():
if event.type == pygame.JOYBUTTONDOWN:
if event.button == 3:
if event.button == self.BUTTON_Y:
self.episode_end_status = TeleopEvents.SUCCESS
# A button (1) for failure
elif event.button == 1:
elif event.button == self.BUTTON_A:
self.episode_end_status = TeleopEvents.FAILURE
# X button (0) for rerecord
elif event.button == 0:
elif event.button == self.BUTTON_X:
self.episode_end_status = TeleopEvents.RERECORD_EPISODE
# RB button (6) for closing gripper
elif event.button == 6:
elif event.button == self.BUTTON_LT:
self.close_gripper_command = True
# LT button (7) for opening gripper
elif event.button == 7:
elif event.button == self.BUTTON_RT:
self.open_gripper_command = True
# Reset episode status on button release
elif event.type == pygame.JOYBUTTONUP:
if event.button in [0, 2, 3]:
if event.button in [self.BUTTON_Y, self.BUTTON_A, self.BUTTON_X]:
self.episode_end_status = None
elif event.button == 6:
elif event.button == self.BUTTON_LT:
self.close_gripper_command = False
elif event.button == 7:
elif event.button == self.BUTTON_RT:
self.open_gripper_command = False
# Check for RB button (typically button 5) for intervention flag
if self.joystick.get_button(5):
if self.joystick.get_button(self.BUTTON_RB):
self.intervention_flag = True
else:
self.intervention_flag = False
def get_deltas(self):
"""Get the current movement deltas from gamepad state."""
import pygame
try:
# Read joystick axes
# Left stick X and Y (typically axes 0 and 1)
y_input = self.joystick.get_axis(0) # Up/Down (often inverted)
x_input = self.joystick.get_axis(1) # Left/Right
x_input = self.joystick.get_axis(self.AXIS_LEFT_X)
y_input = self.joystick.get_axis(self.AXIS_LEFT_Y)
z_input = self.joystick.get_axis(self.AXIS_RIGHT_Y)
# Right stick Y (typically axis 3 or 4)
z_input = self.joystick.get_axis(3) # Up/Down for Z
# Apply deadzone to avoid drift
x_input = 0 if abs(x_input) < self.deadzone else x_input
y_input = 0 if abs(y_input) < self.deadzone else y_input
z_input = 0 if abs(z_input) < self.deadzone else z_input
# Calculate deltas (note: may need to invert axes depending on controller)
delta_x = -x_input * self.x_step_size # Forward/backward
delta_y = -y_input * self.y_step_size # Left/right
delta_z = -z_input * self.z_step_size # Up/down
if self._invert_left_x:
x_input = -x_input
if self._invert_left_y:
y_input = -y_input
if self._invert_right_y:
z_input = -z_input
delta_x = y_input * self.y_step_size
delta_y = x_input * self.x_step_size
delta_z = z_input * self.z_step_size
return delta_x, delta_y, delta_z
@@ -309,7 +341,15 @@ class GamepadController(InputController):
class GamepadControllerHID(InputController):
"""Generate motion deltas from gamepad input using HIDAPI."""
"""Generate motion deltas from gamepad input using HIDAPI.
Supports auto-detection of controller type for correct HID report parsing.
Currently supported: Logitech RumblePad 2, 8BitDo Ultimate 2C Wireless.
"""
CONTROLLER_LOGITECH = "logitech"
CONTROLLER_8BITDO = "8bitdo"
CONTROLLER_UNKNOWN = "unknown"
def __init__(
self,
@@ -318,36 +358,26 @@ class GamepadControllerHID(InputController):
z_step_size=1.0,
deadzone=0.1,
):
"""
Initialize the HID gamepad controller.
Args:
step_size: Base movement step size in meters
z_scale: Scaling factor for Z-axis movement
deadzone: Joystick deadzone to prevent drift
"""
super().__init__(x_step_size, y_step_size, z_step_size)
self.deadzone = deadzone
self.device = None
self.device_info = None
self.controller_type = self.CONTROLLER_UNKNOWN
# Movement values (normalized from -1.0 to 1.0)
self.left_x = 0.0
self.left_y = 0.0
self.right_x = 0.0
self.right_y = 0.0
# Button states
self.buttons = {}
def find_device(self):
"""Look for the gamepad device by vendor and product ID."""
import hid
devices = hid.enumerate()
for device in devices:
device_name = device["product_string"]
if any(controller in device_name for controller in ["Logitech", "Xbox", "PS4", "PS5"]):
if any(controller in device_name for controller in ["Logitech", "Xbox", "PS4", "PS5", "8BitDo"]):
return device
logging.error(
@@ -355,8 +385,15 @@ class GamepadControllerHID(InputController):
)
return None
def _detect_controller_type(self, product_string):
product = product_string.lower() if product_string else ""
if "8bitdo" in product:
return self.CONTROLLER_8BITDO
elif "logitech" in product:
return self.CONTROLLER_LOGITECH
return self.CONTROLLER_UNKNOWN
def start(self):
"""Connect to the gamepad using HIDAPI."""
import hid
self.device_info = self.find_device()
@@ -374,12 +411,22 @@ class GamepadControllerHID(InputController):
product = self.device.get_product_string()
logging.info(f"Connected to {manufacturer} {product}")
logging.info("Gamepad controls (HID mode):")
logging.info(" Left analog stick: Move in X-Y plane")
logging.info(" Right analog stick: Move in Z axis (vertical)")
logging.info(" Button 1/B/Circle: Exit")
logging.info(" Button 2/A/Cross: End episode with SUCCESS")
logging.info(" Button 3/X/Square: End episode with FAILURE")
self.controller_type = self._detect_controller_type(product)
logging.info(f"Detected controller type: {self.controller_type}")
print("Gamepad controls (HID mode):")
print(" Left analog stick: Move in X-Y plane")
print(" Right analog stick: Move in Z axis (vertical)")
print(" RB: Intervention toggle")
if self.controller_type == self.CONTROLLER_8BITDO:
print(" L3 (left stick click): Close gripper")
print(" R3 (right stick click): Open gripper")
else:
print(" LT: Close gripper")
print(" RT: Open gripper")
print(" Y: End episode with SUCCESS")
print(" X: End episode with FAILURE")
print(" A: Rerecord episode")
except OSError as e:
logging.error(f"Error opening gamepad: {e}")
@@ -387,74 +434,124 @@ class GamepadControllerHID(InputController):
self.running = False
def stop(self):
"""Close the HID device connection."""
if self.device:
self.device.close()
self.device = None
def update(self):
"""
Read and process the latest gamepad data.
Due to an issue with the HIDAPI, we need to read the read the device several times in order to get a stable reading
"""
"""Read the device several times to drain the HID buffer and get a stable reading."""
for _ in range(10):
self._update()
def _update(self):
"""Read and process the latest gamepad data."""
if not self.device or not self.running:
return
try:
# Read data from the gamepad
data = self.device.read(64)
# Interpret gamepad data - this will vary by controller model
# These offsets are for the Logitech RumblePad 2
if data and len(data) >= 8:
# Normalize joystick values from 0-255 to -1.0-1.0
self.left_y = (data[1] - 128) / 128.0
self.left_x = (data[2] - 128) / 128.0
self.right_x = (data[3] - 128) / 128.0
self.right_y = (data[4] - 128) / 128.0
if not data:
return
# Apply deadzone
self.left_y = 0 if abs(self.left_y) < self.deadzone else self.left_y
self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x
self.right_x = 0 if abs(self.right_x) < self.deadzone else self.right_x
self.right_y = 0 if abs(self.right_y) < self.deadzone else self.right_y
# Parse button states (byte 5 in the Logitech RumblePad 2)
buttons = data[5]
# Check if RB is pressed then the intervention flag should be set
self.intervention_flag = data[6] in [2, 6, 10, 14]
# Check if RT is pressed
self.open_gripper_command = data[6] in [8, 10, 12]
# Check if LT is pressed
self.close_gripper_command = data[6] in [4, 6, 12]
# Check if Y/Triangle button (bit 7) is pressed for saving
# Check if X/Square button (bit 5) is pressed for failure
# Check if A/Cross button (bit 4) is pressed for rerecording
if buttons & 1 << 7:
self.episode_end_status = TeleopEvents.SUCCESS
elif buttons & 1 << 5:
self.episode_end_status = TeleopEvents.FAILURE
elif buttons & 1 << 4:
self.episode_end_status = TeleopEvents.RERECORD_EPISODE
else:
self.episode_end_status = None
if self.controller_type == self.CONTROLLER_8BITDO:
self._parse_8bitdo(data)
else:
self._parse_logitech(data)
except OSError as e:
logging.error(f"Error reading from gamepad: {e}")
def _apply_deadzone(self):
self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x
self.left_y = 0 if abs(self.left_y) < self.deadzone else self.left_y
self.right_x = 0 if abs(self.right_x) < self.deadzone else self.right_x
self.right_y = 0 if abs(self.right_y) < self.deadzone else self.right_y
def _parse_8bitdo(self, data):
"""Parse HID report from 8BitDo Ultimate 2C Wireless (Bluetooth on macOS).
11-byte report layout:
byte[0]: Report ID (0x01)
byte[1]: D-pad hat switch (0=N, 2=E, 5=S, 6=W, 15=neutral)
byte[2]: Left Stick X (0=left, 127=center, 255=right)
byte[3]: Left Stick Y (0=up, 127=center, 255=down)
byte[4]: Right Stick X (inverted: 255=left, 0=right)
byte[5]: Right Stick Y (0=up, 127=center, 255=down)
byte[6]: RT analog trigger (0-255)
byte[7]: LT analog trigger (0-255)
byte[8]: Buttons -- bit0=A, bit1=B, bit3=X, bit4=Y, bit6=LB, bit7=RB
byte[9]: System -- bit0=LT(digital), bit1=RT(digital), bit3=Select,
bit4=Start, bit5=L3, bit6=R3
byte[10]: Unused
"""
if len(data) < 11:
return
self.left_x = (data[2] - 127) / 128.0
self.left_y = (data[3] - 127) / 128.0
self.right_x = -(data[4] - 127) / 128.0
self.right_y = (data[5] - 127) / 128.0
self._apply_deadzone()
buttons = data[8]
# RB (bit 7) = intervention
self.intervention_flag = bool(buttons & 0x80)
# Stick clicks for gripper: R3 (byte[9] bit6) = open, L3 (byte[9] bit5) = close
system = data[9]
self.open_gripper_command = bool(system & 0x40) # R3
self.close_gripper_command = bool(system & 0x20) # L3
# Y (bit 4) = success, X (bit 3) = failure, A (bit 0) = rerecord
if buttons & 0x10:
self.episode_end_status = TeleopEvents.SUCCESS
elif buttons & 0x08:
self.episode_end_status = TeleopEvents.FAILURE
elif buttons & 0x01:
self.episode_end_status = TeleopEvents.RERECORD_EPISODE
else:
self.episode_end_status = None
def _parse_logitech(self, data):
"""Parse HID report from Logitech RumblePad 2 (and similar Logitech gamepads).
Report layout (8+ bytes):
byte[1]: Left Stick X (0-255, center=128)
byte[2]: Left Stick Y (0-255, center=128)
byte[3]: Right Stick X (0-255, center=128)
byte[4]: Right Stick Y (0-255, center=128)
byte[5]: Face buttons bitmask
byte[6]: Shoulder/trigger buttons bitmask
"""
if len(data) < 8:
return
self.left_x = (data[1] - 128) / 128.0
self.left_y = (data[2] - 128) / 128.0
self.right_x = (data[3] - 128) / 128.0
self.right_y = (data[4] - 128) / 128.0
self._apply_deadzone()
buttons = data[5]
self.intervention_flag = data[6] in [2, 6, 10, 14]
self.open_gripper_command = data[6] in [8, 10, 12]
self.close_gripper_command = data[6] in [4, 6, 12]
if buttons & 1 << 7:
self.episode_end_status = TeleopEvents.SUCCESS
elif buttons & 1 << 5:
self.episode_end_status = TeleopEvents.FAILURE
elif buttons & 1 << 4:
self.episode_end_status = TeleopEvents.RERECORD_EPISODE
else:
self.episode_end_status = None
def get_deltas(self):
"""Get the current movement deltas from gamepad state."""
# Calculate deltas - invert as needed based on controller orientation
delta_x = -self.left_x * self.x_step_size # Forward/backward
delta_y = -self.left_y * self.y_step_size # Left/right
delta_z = -self.right_y * self.z_step_size # Up/down
delta_x = -self.left_y * self.x_step_size
delta_y = -self.left_x * self.y_step_size
delta_z = -self.right_y * self.z_step_size
return delta_x, delta_y, delta_z