From 7b82a5c38199e240f77e979a8e17d70e1424f595 Mon Sep 17 00:00:00 2001 From: Khalil Meftah Date: Mon, 27 Apr 2026 18:31:13 +0200 Subject: [PATCH] feat(rl): add bus-control primitives and smooth move functionality for leader intervention --- .../so_leader/so_leader_follower.py | 120 +++++++++++++++--- 1 file changed, 100 insertions(+), 20 deletions(-) diff --git a/src/lerobot/teleoperators/so_leader/so_leader_follower.py b/src/lerobot/teleoperators/so_leader/so_leader_follower.py index c3e8348dd..fa99070fc 100644 --- a/src/lerobot/teleoperators/so_leader/so_leader_follower.py +++ b/src/lerobot/teleoperators/so_leader/so_leader_follower.py @@ -36,6 +36,12 @@ intact (so :func:`get_action` returns ``{".pos": float}``) while adding: toggled on, leader torque is disabled so the user can move it freely. * Lower position-loop gains on :func:`connect` (``P=16, I=0, D=16``) so the haptic follow does not jerk the user's hand when grabbing the leader. +* Bus-control primitives (:func:`enable_torque`, :func:`disable_torque`, + :func:`write_goal_positions`) and a :func:`smooth_move_to` helper. These + satisfy the ``teleop_has_motor_control`` capability gate in + ``examples/hil/hil_utils.py``, so the BC-style HIL data collector + (``examples/hil/hil_data_collection.py``) can drive an SO leader the same way + it drives the OpenArm Mini -- pause / smooth-mirror to follower / take over. The joint-to-EE-delta conversion does **not** happen here; it is performed by :class:`LeaderArmInterventionStep` in the action processor pipeline so the @@ -48,6 +54,7 @@ import contextlib import logging import os import sys +import time from typing import Any import numpy as np @@ -170,16 +177,101 @@ class SOLeaderFollower(SOLeader): self._listener.daemon = True self._listener.start() + def enable_torque(self) -> None: + """Enable position-loop torque on every motor. + + Exposed alongside :func:`disable_torque` and :func:`write_goal_positions` + so this teleop satisfies the ``teleop_has_motor_control`` gate used by + ``examples/hil/hil_data_collection.py`` (mirrors the OpenArm Mini API). + Errors are logged and swallowed -- the loop must keep ticking even if a + single haptic update fails. + """ + if not self.is_connected: + return + try: + self.bus.sync_write("Torque_Enable", 1) + self._leader_torque_enabled = True + except Exception as e: # pragma: no cover - hardware path + logger.warning(f"[SOLeaderFollower] could not enable leader torque: {e}") + + def disable_torque(self) -> None: + """Disable position-loop torque so the user can move the leader freely.""" + if not self.is_connected: + return + try: + self.bus.sync_write("Torque_Enable", 0) + self._leader_torque_enabled = False + except Exception as e: # pragma: no cover - hardware path + logger.warning(f"[SOLeaderFollower] could not disable leader torque: {e}") + + def write_goal_positions(self, positions: dict[str, float]) -> None: + """Push goal positions to the leader bus (no torque toggling). + + Accepts dataset-style keys ``{".pos": deg}`` (matches what + :func:`get_action` produces and what :func:`smooth_move_to` and + :func:`send_action` consume) -- bare motor names are also tolerated + for parity with :class:`OpenArmMini.write_goal_positions`. + """ + if not self.is_connected: + return + goal_pos: dict[str, float] = {} + for motor in self.bus.motors: + if f"{motor}.pos" in positions: + goal_pos[motor] = float(positions[f"{motor}.pos"]) + elif motor in positions: + goal_pos[motor] = float(positions[motor]) + if not goal_pos: + return + try: + self.bus.sync_write("Goal_Position", goal_pos) + except Exception as e: # pragma: no cover - hardware path + logger.warning(f"[SOLeaderFollower] could not push goal position to leader: {e}") + + def smooth_move_to( + self, + target_pos: dict[str, float], + duration_s: float = 2.0, + fps: int = 50, + ) -> None: + """Linearly ramp the leader from its current pose to ``target_pos``. + + Mirrors the ``teleop_smooth_move_to`` helper from + ``examples/hil/hil_utils.py`` so the leader can be safely re-engaged + without yanking the user's hand -- typical use is right after + :func:`connect` (or whenever the leader and follower drift apart, e.g. + on episode reset). Blocks for ``duration_s`` seconds. + """ + if not self.is_connected: + return + + steps = max(int(duration_s * fps), 1) + try: + current = self.get_action() + except Exception as e: # pragma: no cover - hardware path + logger.warning(f"[SOLeaderFollower] smooth_move_to could not read current pose: {e}") + return + + self.enable_torque() + if not self._leader_torque_enabled: + return + + for step in range(steps + 1): + t = step / steps + interp = {} + for key, current_val in current.items(): + if key in target_pos: + interp[key] = current_val * (1.0 - t) + float(target_pos[key]) * t + else: + interp[key] = current_val + self.write_goal_positions(interp) + time.sleep(1.0 / fps) + @check_if_not_connected def get_action(self) -> RobotAction: # When the user has just toggled into intervention, make sure leader # torque is OFF so they can move it without fighting the position loop. if self._is_intervention and self._leader_torque_enabled: - try: - self.bus.sync_write("Torque_Enable", 0) - self._leader_torque_enabled = False - except Exception as e: # pragma: no cover - hardware path - logger.warning(f"[SOLeaderFollower] could not disable leader torque: {e}") + self.disable_torque() return super().get_action() def send_action(self, action: dict[str, float]) -> None: # type: ignore[override] @@ -193,9 +285,6 @@ class SOLeaderFollower(SOLeader): as the user toggles intervention on (SPACE), torque is dropped in :func:`get_action` so the human can move the leader freely. - Errors talking to the bus are logged and swallowed -- the policy / - learner loop must keep ticking even if a single haptic update fails. - Args: action: Dictionary of follower motor positions, ``{motor.pos: deg}``. """ @@ -215,20 +304,11 @@ class SOLeaderFollower(SOLeader): return if not self._leader_torque_enabled: - try: - self.bus.sync_write("Torque_Enable", 1) - self._leader_torque_enabled = True - except Exception as e: # pragma: no cover - hardware path - logger.warning(f"[SOLeaderFollower] could not enable leader torque: {e}") + self.enable_torque() + if not self._leader_torque_enabled: return - goal_pos = {m: float(action[f"{m}.pos"]) for m in self.bus.motors if f"{m}.pos" in action} - if not goal_pos: - return - try: - self.bus.sync_write("Goal_Position", goal_pos) - except Exception as e: # pragma: no cover - hardware path - logger.warning(f"[SOLeaderFollower] could not push goal position to leader: {e}") + self.write_goal_positions(action) def get_teleop_events(self) -> dict[TeleopEvents, bool]: events = {