diff --git a/docs/source/hilserl.mdx b/docs/source/hilserl.mdx index c70745d50..42add2677 100644 --- a/docs/source/hilserl.mdx +++ b/docs/source/hilserl.mdx @@ -320,6 +320,8 @@ python -m lerobot.scripts.find_joint_limits \ --teleop.id=blue ``` +Note: You can also use `so101_leader` as the teleop type if you have the SO101 leader arm with reduced gears for smoother teleoperation. + **Workflow** 1. Run the script and move the robot through the space that solves the task @@ -476,7 +478,7 @@ To setup the gamepad, you need to set the `control_mode` to `"gamepad"` and defi The SO101 leader arm has reduced gears that allows it to move and track the follower arm during exploration. Therefore, taking over is much smoother than the gearless SO100. -To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and define the `teleop` section in the configuration file. +To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and define the `teleop` section in the configuration file with `leader_follower_mode` enabled: ```json { @@ -484,7 +486,8 @@ To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and "teleop": { "type": "so101_leader", "port": "/dev/tty.usbmodem585A0077921", - "use_degrees": true + "use_degrees": true, + "leader_follower_mode": true }, "processor": { "control_mode": "leader", @@ -496,6 +499,10 @@ To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and } ``` +The `leader_follower_mode` enables the leader arm to automatically track the follower's position when you're not intervening. This creates a seamless teleoperation experience where: +- When not intervening: the leader arm follows the follower arm's position +- When intervening (press `space`): you control the leader arm, and the follower tracks it in end-effector space + In order to annotate the success/failure of the episode, **you will need** to use a keyboard to press `s` for success, `esc` for failure. During the online training, press `space` to take over the policy and `space` again to give the control back to the policy. diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index 813756490..e29ed2815 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -28,6 +28,7 @@ from .hil_processor import ( TimeLimitProcessor, ) from .joint_observations_processor import JointVelocityProcessor, MotorCurrentProcessor +from .leader_follower_processor import LeaderFollowerProcessor from .normalize_processor import NormalizerProcessor, UnnormalizerProcessor, hotswap_stats from .observation_processor import VanillaObservationProcessor from .pipeline import ( @@ -62,6 +63,7 @@ __all__ = [ "InfoProcessor", "InterventionActionProcessor", "JointVelocityProcessor", + "LeaderFollowerProcessor", "MapDeltaActionToRobotAction", "MotorCurrentProcessor", "NormalizerProcessor", diff --git a/src/lerobot/processor/leader_follower_processor.py b/src/lerobot/processor/leader_follower_processor.py new file mode 100644 index 000000000..489994db7 --- /dev/null +++ b/src/lerobot/processor/leader_follower_processor.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import torch + +from dataclasses import dataclass + +from lerobot.model.kinematics import RobotKinematics +from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey +from lerobot.teleoperators.utils import TeleopEvents +from lerobot.teleoperators import Teleoperator +from lerobot.robots import Robot + + +@ProcessorStepRegistry.register("leader_follower_processor") +@dataclass +class LeaderFollowerProcessor: + """ + Processor for leader-follower teleoperation mode. + + This processor: + 1. Sends follower positions to leader arm when not intervening + 2. Computes EE delta actions from leader when intervening + 3. Handles teleop events from the leader device + """ + + leader_device: Teleoperator + motor_names: list[str] + robot: Robot + kinematics: RobotKinematics + end_effector_step_sizes: np.ndarray | None = None + use_gripper: bool = True + prev_leader_gripper: float | None = None + max_gripper_pos: float = 100.0 + + def __call__(self, transition: EnvTransition) -> EnvTransition: + """Process transition with leader-follower logic.""" + # Get current follower position from complementary data + raw_joint_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get("raw_joint_positions") + if raw_joint_pos is not None: + # Send follower position to leader (for follow mode) + follower_action = { + f"{motor}.pos": float(raw_joint_pos[motor]) + for motor in self.motor_names + } + self.leader_device.send_action(follower_action) + + # Only compute EE action if intervention is active + # (AddTeleopEventsAsInfo already added IS_INTERVENTION to info) + info = transition.get(TransitionKey.INFO, {}) + if info.get(TeleopEvents.IS_INTERVENTION, False): + # Get leader joint positions from teleop_action + # (AddTeleopActionAsComplimentaryData already got the action) + complementary = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}) + teleop_action = complementary.get("teleop_action", {}) + + if isinstance(teleop_action, dict) and raw_joint_pos is not None: + # Extract leader positions from teleop action dict + leader_pos = np.array([teleop_action.get(f"{motor}.pos", 0) for motor in self.motor_names]) + follower_pos = np.array([raw_joint_pos[motor] for motor in self.motor_names]) + + # Compute EE positions + leader_ee = self.kinematics.forward_kinematics(leader_pos)[:3, 3] + follower_ee = self.kinematics.forward_kinematics(follower_pos)[:3, 3] + + # Compute normalized EE delta + if self.end_effector_step_sizes is not None: + ee_delta = np.clip( + leader_ee - follower_ee, + -self.end_effector_step_sizes, + self.end_effector_step_sizes + ) + ee_delta_normalized = ee_delta / self.end_effector_step_sizes + else: + ee_delta_normalized = leader_ee - follower_ee + + # Handle gripper + if self.use_gripper and len(leader_pos) > 3: + if self.prev_leader_gripper is None: + self.prev_leader_gripper = np.clip( + leader_pos[-1], 0, self.max_gripper_pos + ) + + leader_gripper = leader_pos[-1] + gripper_delta = leader_gripper - self.prev_leader_gripper + normalized_delta = gripper_delta / self.max_gripper_pos + + # Quantize gripper action + if normalized_delta >= 0.3: + gripper_action = 2 + elif normalized_delta <= -0.1: + gripper_action = 0 + else: + gripper_action = 1 + + self.prev_leader_gripper = leader_gripper + + # Create intervention action + intervention_action = np.append(ee_delta_normalized, gripper_action) + else: + intervention_action = ee_delta_normalized + + # Override teleop_action with computed EE action + complementary["teleop_action"] = torch.from_numpy(intervention_action).float() + transition[TransitionKey.COMPLEMENTARY_DATA] = complementary # type: ignore[misc] + + return transition + + def reset(self) -> None: + """Reset leader-follower state.""" + self.prev_leader_gripper = None + if hasattr(self.leader_device, "reset"): + self.leader_device.reset() \ No newline at end of file diff --git a/src/lerobot/scripts/rl/gym_manipulator.py b/src/lerobot/scripts/rl/gym_manipulator.py index a5f2c304c..4691c3599 100644 --- a/src/lerobot/scripts/rl/gym_manipulator.py +++ b/src/lerobot/scripts/rl/gym_manipulator.py @@ -36,6 +36,7 @@ from lerobot.processor import ( ImageCropResizeProcessor, InterventionActionProcessor, JointVelocityProcessor, + LeaderFollowerProcessor, MapDeltaActionToRobotAction, MapTensorToDeltaActionDict, MotorCurrentProcessor, @@ -52,6 +53,7 @@ from lerobot.robots import ( # noqa: F401 RobotConfig, make_robot_from_config, so100_follower, + so101_follower, ) from lerobot.robots.robot import Robot from lerobot.robots.so100_follower.robot_kinematic_processor import ( @@ -68,6 +70,7 @@ from lerobot.teleoperators import ( make_teleoperator_from_config, so101_leader, # noqa: F401 ) +from lerobot.teleoperators.so101_leader.so101_leader_follower import SO101LeaderFollower from lerobot.teleoperators.teleoperator import Teleoperator from lerobot.teleoperators.utils import TeleopEvents from lerobot.utils.robot_utils import busy_wait @@ -113,7 +116,7 @@ def create_transition( } -def reset_follower_position(robot_arm: Robot, target_position: np.ndarray) -> None: +def reset_follower_position(robot_arm: Robot | SO101LeaderFollower, target_position: np.ndarray) -> None: """Reset robot arm to target position using smooth trajectory.""" current_position_dict = robot_arm.bus.sync_read("Present_Position") current_position = np.array( @@ -458,15 +461,36 @@ def make_processors( env_pipeline_steps.append(ToBatchProcessor()) env_pipeline_steps.append(DeviceProcessor(device=device)) + # Get control mode + control_mode = cfg.processor.control_mode if cfg.processor is not None else "gamepad" + action_pipeline_steps = [ AddTeleopActionAsComplimentaryData(teleop_device=teleop_device), AddTeleopEventsAsInfo(teleop_device=teleop_device), - AddRobotObservationAsComplimentaryData(robot=env.robot), + AddRobotObservationAsComplimentaryData(robot=env.robot) + ] + # Check for leader control mode + if control_mode == "leader": + assert isinstance(teleop_device, SO101LeaderFollower), "Leader control mode requires SO101LeaderFollower teleop device" + + action_pipeline_steps.append( + LeaderFollowerProcessor( + leader_device=teleop_device, + motor_names=motor_names, + robot=env.robot, + kinematics=kinematics_solver, + end_effector_step_sizes=np.array(list(cfg.processor.inverse_kinematics.end_effector_step_sizes.values())), + use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False, + max_gripper_pos=cfg.processor.max_gripper_pos if cfg.processor.max_gripper_pos is not None else 100.0, + ) + ) + # Standard teleop mode (gamepad, keyboard, etc.) + action_pipeline_steps.append( InterventionActionProcessor( use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False, terminate_on_success=terminate_on_success, ), - ] + ) # Replace InverseKinematicsProcessor with new kinematic processors if cfg.processor.inverse_kinematics is not None and kinematics_solver is not None: @@ -572,11 +596,7 @@ def control_loop( dt = 1.0 / cfg.env.fps print(f"Starting control loop at {cfg.env.fps} FPS") - print("Controls:") - print("- Use gamepad/teleop device for intervention") - print("- When not intervening, robot will stay still") - print("- Press Ctrl+C to exit") - + # Reset environment and processors obs, info = env.reset() complementary_data = ( diff --git a/src/lerobot/teleoperators/so101_leader/__init__.py b/src/lerobot/teleoperators/so101_leader/__init__.py index 11e277c91..32450a766 100644 --- a/src/lerobot/teleoperators/so101_leader/__init__.py +++ b/src/lerobot/teleoperators/so101_leader/__init__.py @@ -16,3 +16,4 @@ from .config_so101_leader import SO101LeaderConfig from .so101_leader import SO101Leader +from .so101_leader_follower import SO101LeaderFollower diff --git a/src/lerobot/teleoperators/so101_leader/config_so101_leader.py b/src/lerobot/teleoperators/so101_leader/config_so101_leader.py index 8d91c32df..1b86e9033 100644 --- a/src/lerobot/teleoperators/so101_leader/config_so101_leader.py +++ b/src/lerobot/teleoperators/so101_leader/config_so101_leader.py @@ -26,3 +26,6 @@ class SO101LeaderConfig(TeleoperatorConfig): port: str use_degrees: bool = False + + # Enable leader-follower mode where leader can both lead and follow + leader_follower_mode: bool = False diff --git a/src/lerobot/teleoperators/so101_leader/so101_leader_follower.py b/src/lerobot/teleoperators/so101_leader/so101_leader_follower.py new file mode 100644 index 000000000..2356b59c6 --- /dev/null +++ b/src/lerobot/teleoperators/so101_leader/so101_leader_follower.py @@ -0,0 +1,212 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from collections import deque +from threading import Event, Thread + +import numpy as np +from pynput import keyboard + +from lerobot.teleoperators.so101_leader.so101_leader import SO101Leader +from lerobot.teleoperators.utils import TeleopEvents + +logger = logging.getLogger(__name__) + + +class SO101LeaderFollower(SO101Leader): + """ + Extended SO101 Leader that can both lead (human control) and follow (mimic follower). + + This class adds leader-follower functionality where: + - In follow mode: The leader arm mimics the follower's position (torque enabled) + - In lead mode: Human controls the leader (torque disabled) and provides actions + """ + + def __init__(self, config): + super().__init__(config) + + # Leader-follower state + self.is_intervening = False + self.leader_torque_enabled = True + + # Tracking error for automatic intervention detection + self.leader_tracking_error_queue = deque(maxlen=4) + + # Keyboard event handling + self.keyboard_events = { + "intervention": False, + "success": False, + "failure": False, + "rerecord": False, + } + self.keyboard_thread = None + self.stop_event = Event() + + # Store last follower position for action computation + self.last_follower_pos = None + + def connect(self, calibrate: bool = True) -> None: + """Connect and configure for leader-follower mode.""" + super().connect(calibrate) + + # Configure for leader-follower mode with lower gains + # Lower gains allow manual intervention without injury risk + self.bus.sync_write("Torque_Enable", 1) + for motor in self.bus.motors: + self.bus.write("P_Coefficient", motor, 16) + self.bus.write("I_Coefficient", motor, 0) + self.bus.write("D_Coefficient", motor, 16) + + # Start keyboard listener + self._start_keyboard_listener() + + print("- Leader-Follower Mode:") + print(" - Press SPACE to toggle intervention (leader control)") + print(" - When not intervening, leader follows follower position") + print(" - When intervening, follower follows leader in end-effector space") + print(" - Press 's' to mark episode as success") + print(" - Press ESC to end episode as failure") + print(" - Press 'r' to re-record episode") + + def _start_keyboard_listener(self): + """Start keyboard listener thread for intervention control.""" + def on_press(key): + try: + if key == keyboard.Key.space: + self.keyboard_events["intervention"] = not self.keyboard_events["intervention"] + self.is_intervening = self.keyboard_events["intervention"] + state = "INTERVENTION MODE" if self.is_intervening else "FOLLOWING MODE" + logger.info(f"Toggled to {state}") + elif key == keyboard.Key.esc: + self.keyboard_events["failure"] = True + elif hasattr(key, 'char'): + if key.char == 's': + self.keyboard_events["success"] = True + elif key.char == 'r': + self.keyboard_events["rerecord"] = True + except Exception as e: + logger.error(f"Error handling key press: {e}") + + def listen(): + with keyboard.Listener(on_press=on_press) as listener: + while not self.stop_event.is_set(): + time.sleep(0.1) + listener.stop() + + self.keyboard_thread = Thread(target=listen, daemon=True) + self.keyboard_thread.start() + + def send_action(self, action: dict[str, float]) -> None: + """ + Send position commands to leader arm (follow mode). + + Args: + action: Dictionary of motor positions to command + """ + # Store follower position for later use + self.last_follower_pos = np.array([action.get(f"{motor}.pos", 0) for motor in self.bus.motors]) + + if not self.is_intervening: + # Follow mode: enable torque and track follower + if not self.leader_torque_enabled: + self.bus.sync_write("Torque_Enable", 1) + self.leader_torque_enabled = True + + # Send follower positions to leader + goal_pos = {motor: action[f"{motor}.pos"] for motor in self.bus.motors} + self.bus.sync_write("Goal_Position", goal_pos) + + # Track error for automatic intervention detection + current_pos = self.bus.sync_read("Present_Position") + current_array = np.array([current_pos[motor] for motor in self.bus.motors]) + error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1]) + self.leader_tracking_error_queue.append(error) + + def get_action(self) -> dict[str, float]: + """ + Get action from leader arm. + + In follow mode: Returns neutral/current positions + In lead mode: Returns actual leader positions for follower to track + """ + start = time.perf_counter() + + if self.is_intervening: + # Lead mode: disable torque if needed and return leader positions + if self.leader_torque_enabled: + self.bus.sync_write("Torque_Enable", 0) + self.leader_torque_enabled = False + + # Get current leader position + action = self.bus.sync_read("Present_Position") + action = {f"{motor}.pos": val for motor, val in action.items()} + + # Track error + if self.last_follower_pos is not None: + current_array = np.array([action[f"{motor}.pos"] for motor in self.bus.motors]) + error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1]) + self.leader_tracking_error_queue.append(error) + else: + # Follow mode: return current/neutral positions + action = self.bus.sync_read("Present_Position") + action = {f"{motor}.pos": val for motor, val in action.items()} + + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read action: {dt_ms:.1f}ms") + return action + + def get_teleop_events(self) -> dict[TeleopEvents, bool]: + """Get current keyboard events.""" + events = {} + + # Map keyboard events to TeleopEvents + if self.keyboard_events["success"]: + events[TeleopEvents.SUCCESS] = True + self.keyboard_events["success"] = False + if self.keyboard_events["failure"]: + events[TeleopEvents.FAILURE] = True + events[TeleopEvents.TERMINATE_EPISODE] = True + self.keyboard_events["failure"] = False + if self.keyboard_events["rerecord"]: + events[TeleopEvents.RERECORD_EPISODE] = True + events[TeleopEvents.TERMINATE_EPISODE] = True + self.keyboard_events["rerecord"] = False + + # Always report intervention state + events[TeleopEvents.IS_INTERVENTION] = self.is_intervening + + return events + + def disconnect(self) -> None: + """Disconnect and cleanup.""" + self.stop_event.set() + if self.keyboard_thread: + self.keyboard_thread.join(timeout=1.0) + super().disconnect() + + def reset(self) -> None: + """Reset leader-follower state.""" + self.is_intervening = False + self.leader_torque_enabled = True + self.leader_tracking_error_queue.clear() + self.keyboard_events = { + "intervention": False, + "success": False, + "failure": False, + "rerecord": False + } \ No newline at end of file diff --git a/src/lerobot/teleoperators/utils.py b/src/lerobot/teleoperators/utils.py index bdfd41f02..09452bdc7 100644 --- a/src/lerobot/teleoperators/utils.py +++ b/src/lerobot/teleoperators/utils.py @@ -42,7 +42,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator: return SO100Leader(config) elif config.type == "so101_leader": - from .so101_leader import SO101Leader + from .so101_leader import SO101Leader, SO101LeaderFollower + + if getattr(config, "leader_follower_mode", False): + return SO101LeaderFollower(config) return SO101Leader(config) elif config.type == "stretch3":