diff --git a/examples/so100_to_so100_EE_deltas/teleoperate.py b/examples/so100_to_so100_EE_deltas/teleoperate.py index c3834307f..4432ae636 100644 --- a/examples/so100_to_so100_EE_deltas/teleoperate.py +++ b/examples/so100_to_so100_EE_deltas/teleoperate.py @@ -1,6 +1,6 @@ -# !/usr/bin/env python +#!/usr/bin/env python -# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,391 +14,178 @@ # See the License for the specific language governing permissions and # limitations under the License. +"""SO100 leader / follower teleop with HIL-SERL-style intervention toggle. + +Position-only standalone demo of the leader-arm intervention pattern used by +PR #2596's HIL-SERL training stack (see +``lerobot.processor.LeaderFollowerProcessor`` and +``lerobot.teleoperators.so_leader.SO101LeaderFollower``). Compared to the +verbatim PR #2596 example (which builds the full ``EEReferenceAndDelta`` -> +``EEBoundsAndSafety`` -> ``GripperVelocityToJoint`` -> ``InverseKinematicsRLStep`` +pipeline), this version computes the EE delta and the IK target inline against +the follower's *measured* pose every tick. That removes the latched-reference +feedback loop and produces noticeably smoother haptic teleop. + +Behaviour: + * **Following mode** (default): the follower is idle, the leader is + torque-enabled with low PID gains and haptically tracks the follower. + The user can grab the leader at any time without fighting the position + loop. + * **Intervention mode** (toggled by pressing SPACE): the leader's torque + is released, the user moves the leader freely, and the follower mirrors + the leader's end-effector position via ``[delta_x, delta_y, delta_z]`` + deltas, plus a direct gripper passthrough. This matches the action + space recorded by ``LeaderFollowerProcessor`` during HIL-SERL recording. + +Keyboard: + * ``SPACE`` -- toggle intervention on/off. + * ``ESC`` -- terminate (treated as failure event). + * ``s`` -- mark current intervention as success. + * ``r`` -- request re-record of current episode. +""" + +from __future__ import annotations + import time -from dataclasses import dataclass import numpy as np -import torch -from lerobot.configs.types import PipelineFeatureType, PolicyFeature from lerobot.model.kinematics import RobotKinematics -from lerobot.processor import ( - ProcessorStepRegistry, - RobotAction, - RobotActionProcessorStep, - RobotObservation, - RobotProcessorPipeline, - TransitionKey, -) -from lerobot.processor.converters import ( - create_transition, - identity_transition, -) -from lerobot.robots.robot import Robot -from lerobot.robots.so_follower.config_so_follower import SO101FollowerConfig -from lerobot.robots.so_follower.robot_kinematic_processor import ( - EEBoundsAndSafety, - EEReferenceAndDelta, - GripperVelocityToJoint, - InverseKinematicsRLStep, -) -from lerobot.robots.so_follower.so_follower import SO101Follower -from lerobot.teleoperators.so_leader.config_so_leader import SO101LeaderConfig -from lerobot.teleoperators.so_leader.so101_leader_follower import SO101LeaderFollower +from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig +from lerobot.teleoperators.so_leader import SO101LeaderConfig, SO101LeaderFollower +from lerobot.teleoperators.utils import TeleopEvents from lerobot.utils.robot_utils import precise_sleep -from lerobot.utils.rotation import Rotation +FPS = 30 -def reset_follower_position(robot_arm: Robot, target_position: np.ndarray) -> None: - """Reset robot arm to target position using smooth trajectory.""" - current_position_dict = robot_arm.bus.sync_read("Present_Position") - current_position = np.array( - [current_position_dict[name] for name in current_position_dict], - dtype=np.float32, - ) - trajectory = torch.from_numpy( - np.linspace(current_position, target_position, 50) - ) # NOTE: 30 is just an arbitrary number - for pose in trajectory: - action_dict = dict(zip(current_position_dict, pose, strict=False)) - robot_arm.bus.sync_write("Goal_Position", action_dict) - precise_sleep(0.015) +# Per-axis EE-delta normalisation (metres). The clamped delta is +# ``clip((p_leader - p_follower) / step, -1, 1) * step``, so a single tick is +# bounded by ``step`` in metres. Keep small for safe motion. +EE_STEP_SIZES = {"x": 0.010, "y": 0.010, "z": 0.010} - -@dataclass -class LogRobotAction(RobotActionProcessorStep): - def action(self, action: RobotAction) -> RobotAction: - print(f"Robot action: {action}") - return action - - def transform_features(self, features): - # features[PipelineFeatureType.ACTION][ACTION] = PolicyFeature( - # type=FeatureType.ACTION, shape=(len(self.motor_names),) - # ) - return features - - -@ProcessorStepRegistry.register("forward_kinematics_joints_to_ee_target_action") -@dataclass -class ForwardKinematicsJointsToEETargetAction(RobotActionProcessorStep): - """ - Computes the end-effector pose from joint positions using forward kinematics (FK). - - This step is typically used to add the robot's Cartesian pose to the observation space, - which can be useful for visualization or as an input to a policy. - - Attributes: - kinematics: The robot's kinematic model. - """ - - kinematics: RobotKinematics - motor_names: list[str] - end_effector_step_sizes: dict - max_gripper_pos: float - use_ik_solution: bool = False - - def action(self, action: RobotAction) -> RobotAction: - # return compute_forward_kinematics_joints_to_ee(action, self.kinematics, self.motor_names) - teleop_action = action - raw_joint_pos = self.transition.get(TransitionKey.OBSERVATION) - - leader_pos = np.array([teleop_action[f"{motor}.pos"] for motor in self.motor_names]) - - leader_ee = self.kinematics.forward_kinematics(leader_pos) - - if self.use_ik_solution and "IK_solution" in self.transition.get(TransitionKey.COMPLEMENTARY_DATA): - follower_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA)["IK_solution"] - else: - follower_pos = np.array([raw_joint_pos[f"{motor}.pos"] for motor in self.motor_names]) - - follower_ee = self.kinematics.forward_kinematics(follower_pos) - - follower_ee_pos = follower_ee[:3, 3] - follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec() - # follower_gripper_pos = raw_joint_pos["gripper.pos"] - follower_gripper_pos = follower_pos[-1] # assuming gripper is the last motor - - leader_ee_pos = leader_ee[:3, 3] - leader_ee_rvec = Rotation.from_matrix(leader_ee[:3, :3]).as_rotvec() - leader_gripper_pos = np.clip( - teleop_action["gripper.pos"], -self.max_gripper_pos, self.max_gripper_pos - ) - - print("f pos:", follower_ee_pos) - print("l pos:", leader_ee_pos) - - print("f rvec:", follower_ee_rvec) - print("l rvec:", leader_ee_rvec) - - # follower_ee_pos = follower_ee[:3, 3] - # follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec() - - delta_pos = leader_ee_pos - follower_ee_pos - - # For rotation: compute relative rotation from follower to leader - # R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader - r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3] - delta_rvec = Rotation.from_matrix(r_delta).as_rotvec() - delta_gripper = leader_gripper_pos - follower_gripper_pos - - desired = np.eye(4, dtype=float) - desired[:3, :3] = follower_ee[:3, :3] @ r_delta - desired[:3, 3] = follower_ee[:3, 3] + delta_pos - - pos = desired[:3, 3] - tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec() - - assert np.allclose(pos, leader_ee_pos), "Position delta computation error" - assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error" - assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), ( - "Gripper delta computation error" - ) - - # Normalize the action to the range [-1, 1] - delta_pos = delta_pos / np.array( - [ - self.end_effector_step_sizes["x"], - self.end_effector_step_sizes["y"], - self.end_effector_step_sizes["z"], - ] - ) - delta_rvec = delta_rvec / np.array( - [ - self.end_effector_step_sizes["wx"], - self.end_effector_step_sizes["wy"], - self.end_effector_step_sizes["wz"], - ] - ) - - # Deadband: zero out tiny deltas so encoder noise on the leader does not - # twitch the follower when the operator is holding the leader still. - # 5 % of step size = ~0.2 mm / ~0.25 deg with the current step sizes. - if float(np.linalg.norm(delta_pos)) < 0.05 and float(np.linalg.norm(delta_rvec)) < 0.05: - delta_pos = np.zeros_like(delta_pos) - delta_rvec = np.zeros_like(delta_rvec) - - # Check if any of the normalized deltas exceed 1.0 - - max_normalized_pos = max( - abs(delta_pos[0]), - abs(delta_pos[1]), - abs(delta_pos[2]), - ) - - max_normalized_rot = max( - abs(delta_rvec[0]), - abs(delta_rvec[1]), - abs(delta_rvec[2]), - ) - - # Use the same scaling factor for both position and rotation - max_normalized = max(max_normalized_pos, max_normalized_rot) - if max_normalized > 1.0: - print(f"Warning: EE delta too large, scaling. Max normalized delta: {max_normalized_pos}") - print(f"Original delta_pos: {delta_pos}, delta_rvec: {delta_rvec}") - # Scale proportionally - delta_pos = delta_pos / max_normalized - delta_rvec = delta_rvec / max_normalized - - new_action = {} - new_action["enabled"] = True - new_action["target_x"] = float(delta_pos[0]) - new_action["target_y"] = float(delta_pos[1]) - new_action["target_z"] = float(delta_pos[2]) - new_action["target_wx"] = float(delta_rvec[0]) - new_action["target_wy"] = float(delta_rvec[1]) - new_action["target_wz"] = float(delta_rvec[2]) - new_action["gripper_vel"] = float( - np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) / self.max_gripper_pos - ) - return new_action - - def transform_features( - self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]] - ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]: - # TODO: implement feature transformation - return features - - -FPS = 20 - -# Initialize the robot and teleoperator config -follower_config = SO101FollowerConfig(port="/dev/usb_follower_arm_a", id="follower_arm_a", use_degrees=True) -leader_config = SO101LeaderConfig( - port="/dev/usb_leader_arm_a", - id="leader_arm_a", - use_degrees=True, - leader_follower_mode=True, - use_gripper=True, -) - -# Initialize the robot and teleoperator -follower = SO101Follower(follower_config) -leader = SO101LeaderFollower(leader_config) - -# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf -follower_kinematics_solver = RobotKinematics( - urdf_path="../SO-ARM100/Simulation/SO101/so101_new_calib.urdf", - target_frame_name="gripper_frame_link", - joint_names=list(follower.bus.motors.keys()), -) - -# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf -leader_kinematics_solver = RobotKinematics( - urdf_path="../SO-ARM100/Simulation/SO101/so101_new_calib.urdf", - target_frame_name="gripper_frame_link", - joint_names=list(leader.bus.motors.keys()), -) - -end_effector_step_sizes = { - "x": 0.004, - "y": 0.004, - "z": 0.004, - "wx": 5 * np.pi / 180, - "wy": 5 * np.pi / 180, - "wz": 5 * np.pi / 180, +# Workspace bounds (metres) - tight box around the rest pose to keep the +# follower from running into joint limits during the demo. Adjust to your +# workspace. +EE_BOUNDS = { + "min": np.array([-0.20, -0.30, 0.02]), + "max": np.array([0.30, 0.30, 0.40]), } +# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: +# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf +URDF_PATH = "./SO101/so101_new_calib.urdf" +TARGET_FRAME = "gripper_frame_link" -# Build pipeline to convert teleop joints to EE action -leader_to_ee = RobotProcessorPipeline[RobotAction, RobotAction]( - steps=[ - LogRobotAction(), - ForwardKinematicsJointsToEETargetAction( - kinematics=leader_kinematics_solver, - motor_names=list(leader.bus.motors.keys()), - end_effector_step_sizes=end_effector_step_sizes, - max_gripper_pos=30.0, - use_ik_solution=True, - ), - LogRobotAction(), - ], - to_transition=identity_transition, - to_output=identity_transition, -) +# Set these to the actual ports on your machine. +FOLLOWER_PORT = "/dev/usb_follower_arm_a" +LEADER_PORT = "/dev/usb_leader_arm_a" -# build pipeline to convert EE action to robot joints -ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]( - [ - LogRobotAction(), - EEReferenceAndDelta( - kinematics=follower_kinematics_solver, - # end_effector_step_sizes={"x": 0.006, "y": 0.01, "z": 0.005}, - end_effector_step_sizes=end_effector_step_sizes, - motor_names=list(follower.bus.motors.keys()), - # Latch the reference so the follower stops drifting when the leader - # is held still. With ``False`` the reference is recomputed every - # step from the previous IK solution, which combined with motor lag - # and encoder noise creates a positive feedback loop (follower keeps - # moving even when leader is stationary). - use_latched_reference=True, - use_ik_solution=True, - ), - LogRobotAction(), - EEBoundsAndSafety( - end_effector_bounds={ - "min": [-0.05, -0.55, -0.0075], - "max": [0.55, 0.55, 0.55], - }, - # end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]}, - max_ee_step_m=0.05, - ), - LogRobotAction(), - GripperVelocityToJoint( - clip_max=30.0, - speed_factor=0.2, - discrete_gripper=False, - scale_velocity=True, - use_ik_solution=True, - ), - LogRobotAction(), - InverseKinematicsRLStep( - kinematics=follower_kinematics_solver, - motor_names=list(follower.bus.motors.keys()), - # Seed IK from the follower's measured joints (not the previous IK - # output) so any residual drift in the commanded pose does not feed - # back into the next IK solve. - initial_guess_current_joints=True, - ), - LogRobotAction(), - ], - to_transition=identity_transition, - to_output=identity_transition, -) -# Connect to the robot and teleoperator -follower.connect() -leader.connect() +def _joints_dict_to_array(joints: dict[str, float], motor_names: list[str]) -> np.ndarray: + return np.array([joints[f"{m}.pos"] for m in motor_names], dtype=float) -reset_pose = [0.0, 10, 20, 60.00, 90.00, 10.00] -start_time = time.perf_counter() -reset_follower_position(follower, np.array(reset_pose)) -reset_follower_position(leader, np.array(reset_pose)) -precise_sleep(5.0 - (time.perf_counter() - start_time)) +def _array_to_joints_dict(arr: np.ndarray, motor_names: list[str]) -> dict[str, float]: + return {f"{m}.pos": float(v) for m, v in zip(motor_names, arr, strict=True)} -# Init rerun viewer -# init_rerun(session_name="so100_so100_EE_teleop") -transition = None -prev_intervening = False - -print("Starting teleop loop...") -print(" - Press SPACE to toggle intervention (leader controls follower)") -print(" - Press 's' for success, ESC for failure, 'r' to re-record") - -while True: - t0 = time.perf_counter() - - # Get robot observation - robot_obs = follower.get_observation() - - # Haptic follow: when not intervening, push follower joints onto the - # leader so the leader tracks the follower (torque on, low PID gains). - # When intervening, this call is a no-op inside SO101LeaderFollower. - leader.send_action(robot_obs) - - # Re-latch the EE reference each time the user starts a new intervention - # so the follower does not jump from a stale reference pose. - if leader.is_intervening and not prev_intervening: - ee_to_follower_joints.reset() - transition = None - prev_intervening = leader.is_intervening - - if not leader.is_intervening: - # Idle: follower holds its current pose; do not command anything. - precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0)) - continue - - # Get teleop observation - leader_joints_obs = leader.get_action() - - # teleop joints -> teleop EE action - if transition is None: - transition = create_transition(action=leader_joints_obs, observation=robot_obs) - else: - transition = create_transition( - action=leader_joints_obs, - observation=robot_obs, - complementary_data=transition.get(TransitionKey.COMPLEMENTARY_DATA), - ) - - transition = leader_to_ee(transition) - leader_ee_act = transition[TransitionKey.ACTION] - - # teleop EE -> robot joints - transition = create_transition( - action=leader_ee_act, - observation=robot_obs, - complementary_data=transition.get(TransitionKey.COMPLEMENTARY_DATA), +def main() -> None: + follower_config = SO100FollowerConfig(port=FOLLOWER_PORT, id="my_follower_arm", use_degrees=True) + leader_config = SO101LeaderConfig( + port=LEADER_PORT, + id="my_leader_arm", + use_degrees=True, + leader_follower_mode=True, + use_gripper=True, ) - transition = ee_to_follower_joints(transition) - follower_joints_act = transition[TransitionKey.ACTION] - # Send action to robot - _ = follower.send_action(follower_joints_act) + follower = SO100Follower(follower_config) + leader = SO101LeaderFollower(leader_config) - # Visualize - # log_rerun_data(observation=leader_ee_act, action=follower_joints_act) + follower_motor_names = list(follower.bus.motors.keys()) + leader_motor_names = list(leader.bus.motors.keys()) - precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0)) + follower_kinematics = RobotKinematics( + urdf_path=URDF_PATH, target_frame_name=TARGET_FRAME, joint_names=follower_motor_names + ) + leader_kinematics = RobotKinematics( + urdf_path=URDF_PATH, target_frame_name=TARGET_FRAME, joint_names=leader_motor_names + ) + + follower.connect() + leader.connect() + + print("Starting leader-follower intervention demo...") + print(" - Press SPACE to toggle intervention.") + print(" - Press ESC to terminate, 's' for success, 'r' to re-record.") + + try: + while True: + t0 = time.perf_counter() + + # 1. Read both arms. + follower_obs = follower.get_observation() + follower_joints_dict = {f"{m}.pos": float(follower_obs[f"{m}.pos"]) for m in follower_motor_names} + leader_joints_dict = leader.get_action() + + # 2. Haptic follow: push follower joints back to the leader. The + # leader's ``send_action`` gates motor writes on its intervention + # state internally (torque on while following, off while + # intervening), so this call is safe in both modes. + leader.send_action(follower_joints_dict) + + # 3. Pull teleop events from the leader's keyboard listener. + events = leader.get_teleop_events() + if events.get(TeleopEvents.TERMINATE_EPISODE): + print("Termination requested -- exiting.") + break + + is_intervention = events.get(TeleopEvents.IS_INTERVENTION, False) + + if is_intervention: + # 4a. INTERVENTION: take normalised position-only delta against + # the follower's *measured* pose every tick (no latched + # reference, no compounding lag), integrate onto the follower's + # current EE pose, clip to the workspace, then IK. + leader_arr = _joints_dict_to_array(leader_joints_dict, leader_motor_names) + follower_arr = _joints_dict_to_array(follower_joints_dict, follower_motor_names) + + p_leader = leader_kinematics.forward_kinematics(leader_arr)[:3, 3] + p_follower_mat = follower_kinematics.forward_kinematics(follower_arr) + p_follower = p_follower_mat[:3, 3] + + step_vec = np.array([EE_STEP_SIZES["x"], EE_STEP_SIZES["y"], EE_STEP_SIZES["z"]], dtype=float) + raw_delta = p_leader - p_follower + delta_norm = np.clip(raw_delta / step_vec, -1.0, 1.0) + delta_m = delta_norm * step_vec + + target_pose = p_follower_mat.copy() + target_pose[:3, 3] = np.clip(p_follower + delta_m, EE_BOUNDS["min"], EE_BOUNDS["max"]) + + # IK -> joint-space goal for the follower's arm chain. Position + # only (orientation_weight=0.0) keeps it stable under the + # rotation-noise that would otherwise come from leader FK. + target_joints = follower_kinematics.inverse_kinematics( + current_joint_pos=follower_arr, + desired_ee_pose=target_pose, + orientation_weight=0.0, + ) + follower_action = _array_to_joints_dict(target_joints, follower_motor_names) + # Gripper passthrough: leader gripper position drives follower + # gripper directly (no IK). + follower_action["gripper.pos"] = float(leader_joints_dict.get("gripper.pos", 50.0)) + follower.send_action(follower_action) + + # 4b. FOLLOWING: leave the follower alone -- the leader haptically + # tracks it via the ``leader.send_action`` call above. In real + # HIL-SERL training this is where the policy would step the + # follower forward. + + precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0)) + finally: + leader.disconnect() + follower.disconnect() + + +if __name__ == "__main__": + main() diff --git a/src/lerobot/envs/configs.py b/src/lerobot/envs/configs.py index 2a7c52d45..efa0d728a 100644 --- a/src/lerobot/envs/configs.py +++ b/src/lerobot/envs/configs.py @@ -299,6 +299,11 @@ class HILSerlProcessorConfig: inverse_kinematics: InverseKinematicsConfig | None = None reward_classifier: RewardClassifierConfig | None = None max_gripper_pos: float | None = 100.0 + # Only used when ``control_mode == "leader"``. ``False`` (default) emits a + # 4-D position+gripper action matching the gamepad path; ``True`` emits the + # PR #2596 7-D action with rotation deltas (requires ``wx/wy/wz`` step + # sizes in ``inverse_kinematics.end_effector_step_sizes``). + use_rotation: bool = False @EnvConfig.register_subclass(name="gym_manipulator") diff --git a/src/lerobot/processor/leader_follower_processor.py b/src/lerobot/processor/leader_follower_processor.py index df2f66235..c9afb828a 100644 --- a/src/lerobot/processor/leader_follower_processor.py +++ b/src/lerobot/processor/leader_follower_processor.py @@ -48,6 +48,11 @@ class LeaderFollowerProcessor(ProcessorStep): kinematics: RobotKinematics end_effector_step_sizes: np.ndarray | None = None use_gripper: bool = True + # When True (PR #2596 default), emit a 7-D action with rotation deltas; + # when False, emit a 4-D action ``[dx, dy, dz, gripper]`` matching the + # gamepad/keyboard EE convention. ``end_effector_step_sizes`` only needs + # ``wx/wy/wz`` keys when this is True. + use_rotation: bool = True # prev_leader_gripper: float | None = None max_gripper_pos: float = 100.0 use_ik_solution: bool = False @@ -99,28 +104,9 @@ class LeaderFollowerProcessor(ProcessorStep): # follower_ee_rvec = Rotation.from_matrix(follower_ee[:3, :3]).as_rotvec() delta_pos = leader_ee_pos - follower_ee_pos - - # For rotation: compute relative rotation from follower to leader - # R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader - r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3] - delta_rvec = Rotation.from_matrix(r_delta).as_rotvec() - delta_gripper = leader_gripper_pos - follower_gripper_pos - desired = np.eye(4, dtype=float) - desired[:3, :3] = follower_ee[:3, :3] @ r_delta - desired[:3, 3] = follower_ee[:3, 3] + delta_pos - - pos = desired[:3, 3] - tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec() - - assert np.allclose(pos, leader_ee_pos), "Position delta computation error" - assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error" - assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), ( - "Gripper delta computation error" - ) - - # Normalize the action to the range [-1, 1] + # Normalize the position deltas to [-1, 1] delta_pos = delta_pos / np.array( [ self.end_effector_step_sizes["x"], @@ -128,41 +114,81 @@ class LeaderFollowerProcessor(ProcessorStep): self.end_effector_step_sizes["z"], ] ) - delta_rvec = delta_rvec / np.array( - [ - self.end_effector_step_sizes["wx"], - self.end_effector_step_sizes["wy"], - self.end_effector_step_sizes["wz"], - ] - ) max_normalized_pos = max( abs(delta_pos[0]), abs(delta_pos[1]), abs(delta_pos[2]), ) - normalized_rot = max(abs(delta_rvec[0]), abs(delta_rvec[1]), abs(delta_rvec[2])) + if self.use_rotation: + # For rotation: compute relative rotation from follower to leader + # R_leader = R_follower * R_delta => R_delta = R_follower^T * R_leader + r_delta = follower_ee[:3, :3].T @ leader_ee[:3, :3] + delta_rvec = Rotation.from_matrix(r_delta).as_rotvec() - max_normalized = max(max_normalized_pos, normalized_rot) + desired = np.eye(4, dtype=float) + desired[:3, :3] = follower_ee[:3, :3] @ r_delta + desired[:3, 3] = follower_ee[:3, 3] + ( + delta_pos + * np.array( + [ + self.end_effector_step_sizes["x"], + self.end_effector_step_sizes["y"], + self.end_effector_step_sizes["z"], + ] + ) + ) - if max_normalized > 1.0: - # Scale proportionally - delta_pos = delta_pos / max_normalized - delta_rvec = delta_rvec / max_normalized + pos = desired[:3, 3] + tw = Rotation.from_matrix(desired[:3, :3]).as_rotvec() - intervention_action = np.array( - [ - delta_pos[0], - delta_pos[1], - delta_pos[2], - delta_rvec[0], - delta_rvec[1], - delta_rvec[2], - np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) - / self.max_gripper_pos, - ], - dtype=float, - ) + assert np.allclose(pos, leader_ee_pos), "Position delta computation error" + assert np.allclose(tw, leader_ee_rvec), "Orientation delta computation error" + assert np.isclose(follower_gripper_pos + delta_gripper, leader_gripper_pos), ( + "Gripper delta computation error" + ) + + delta_rvec = delta_rvec / np.array( + [ + self.end_effector_step_sizes["wx"], + self.end_effector_step_sizes["wy"], + self.end_effector_step_sizes["wz"], + ] + ) + normalized_rot = max(abs(delta_rvec[0]), abs(delta_rvec[1]), abs(delta_rvec[2])) + max_normalized = max(max_normalized_pos, normalized_rot) + if max_normalized > 1.0: + delta_pos = delta_pos / max_normalized + delta_rvec = delta_rvec / max_normalized + + intervention_action = np.array( + [ + delta_pos[0], + delta_pos[1], + delta_pos[2], + delta_rvec[0], + delta_rvec[1], + delta_rvec[2], + np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) + / self.max_gripper_pos, + ], + dtype=float, + ) + else: + # Position-only 4-D path: ``[dx, dy, dz, gripper]`` + if max_normalized_pos > 1.0: + delta_pos = delta_pos / max_normalized_pos + + intervention_action = np.array( + [ + delta_pos[0], + delta_pos[1], + delta_pos[2], + np.clip(delta_gripper, -self.max_gripper_pos, self.max_gripper_pos) + / self.max_gripper_pos, + ], + dtype=float, + ) # # Extract leader positions from teleop action dict # # leader_pos = np.array([teleop_action.get(f"{motor}.pos", 0) for motor in self.motor_names]) diff --git a/src/lerobot/rl/gym_manipulator.py b/src/lerobot/rl/gym_manipulator.py index c7c4eb6e4..1c697fac0 100644 --- a/src/lerobot/rl/gym_manipulator.py +++ b/src/lerobot/rl/gym_manipulator.py @@ -493,9 +493,11 @@ def make_processors( # Leader-follower control mode: leader haptically tracks follower until the # human toggles intervention with SPACE, at which point leader joints are - # converted into 7-D EE deltas (dx, dy, dz, dwx, dwy, dwz, gripper) by - # ``LeaderFollowerProcessor`` before being consumed by - # ``InterventionActionProcessorStep``. + # converted into EE deltas by ``LeaderFollowerProcessor`` and consumed by + # ``InterventionActionProcessorStep``. With ``use_rotation=False`` the + # action is the 4-D ``[dx, dy, dz, gripper]`` matching the gamepad path; + # with ``True`` it is the PR #2596 7-D ``[dx, dy, dz, dwx, dwy, dwz, gripper]``. + leader_use_rotation = bool(getattr(cfg.processor, "use_rotation", False)) if control_mode == "leader": if not isinstance(teleop_device, SO101LeaderFollower): raise ValueError( @@ -514,6 +516,7 @@ def make_processors( kinematics=kinematics_solver, end_effector_step_sizes=cfg.processor.inverse_kinematics.end_effector_step_sizes, use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False, + use_rotation=leader_use_rotation, max_gripper_pos=cfg.processor.max_gripper_pos if cfg.processor.max_gripper_pos is not None else 100.0, @@ -523,7 +526,7 @@ def make_processors( action_pipeline_steps.append( InterventionActionProcessorStep( use_gripper=cfg.processor.gripper.use_gripper if cfg.processor.gripper is not None else False, - use_rotation=(control_mode == "leader"), + use_rotation=(control_mode == "leader" and leader_use_rotation), terminate_on_success=terminate_on_success, ) )