diff --git a/src/lerobot/teleoperators/so101_leader_double/__init__.py b/src/lerobot/teleoperators/so101_leader_double/__init__.py new file mode 100644 index 0000000..6c4eeb2 --- /dev/null +++ b/src/lerobot/teleoperators/so101_leader_double/__init__.py @@ -0,0 +1,2 @@ +from .config_so101_leader_double import SO101LeaderDoubleConfig +from .so101_leader_double import SO101LeaderDouble diff --git a/src/lerobot/teleoperators/so101_leader_double/config_so101_leader_double.py b/src/lerobot/teleoperators/so101_leader_double/config_so101_leader_double.py new file mode 100644 index 0000000..507fa92 --- /dev/null +++ b/src/lerobot/teleoperators/so101_leader_double/config_so101_leader_double.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass + +from ..config import TeleoperatorConfig + +@TeleoperatorConfig.register_subclass("so101_leader_double") +@dataclass +class SO101LeaderDoubleConfig(TeleoperatorConfig): + # Port to connect to the arm + port: str + + use_degrees: bool = False diff --git a/src/lerobot/teleoperators/so101_leader_double/so101_leader_double.py b/src/lerobot/teleoperators/so101_leader_double/so101_leader_double.py new file mode 100644 index 0000000..39e0956 --- /dev/null +++ b/src/lerobot/teleoperators/so101_leader_double/so101_leader_double.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from typing import Any + +from lerobot.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.motors.feetech import FeetechMotorsBus, OperatingMode +from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError + +from ..teleoperator import Teleoperator +from .config_so101_leader_double import SO101LeaderDoubleConfig + +logger = logging.getLogger(__name__) + + +class SO101LeaderDouble(Teleoperator): + """ + SO-101 Leader Arm (double) designed by TheRobotStudio and Hugging Face. + Provides two arms (left and right) with the same motor layout as the + original leader, but each side is addressed with a `left_` / `right_` + prefix. + """ + + config_class = SO101LeaderDoubleConfig + name = "so101_leader_double" + + def __init__(self, config: SO101LeaderDoubleConfig): + super().__init__(config) + self.config = config + norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100 + self.bus = FeetechMotorsBus( + port=self.config.port, + motors={ + # left arm + "left_shoulder_pan": Motor(1, "sts3215", norm_mode_body), + "left_shoulder_lift": Motor(2, "sts3215", norm_mode_body), + "left_elbow_flex": Motor(3, "sts3215", norm_mode_body), + "left_wrist_flex": Motor(4, "sts3215", norm_mode_body), + "left_wrist_roll": Motor(5, "sts3215", norm_mode_body), + "left_gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), + # right arm + "right_shoulder_pan": Motor(7, "sts3215", norm_mode_body), + "right_shoulder_lift": Motor(8, "sts3215", norm_mode_body), + "right_elbow_flex": Motor(9, "sts3215", norm_mode_body), + "right_wrist_flex": Motor(10, "sts3215", norm_mode_body), + "right_wrist_roll": Motor(11, "sts3215", norm_mode_body), + "right_gripper": Motor(12, "sts3215", MotorNormMode.RANGE_0_100), + }, + calibration=self.calibration, + ) + + @property + def action_features(self) -> dict[str, type]: + # expose all motor positions (left and right) + return {f"{motor}.pos": float for motor in self.bus.motors} + + @property + def feedback_features(self) -> dict[str, type]: + # No feedback channels for the leader teleoperator + return {} + + @property + def is_connected(self) -> bool: + return self.bus.is_connected + + def connect(self, calibrate: bool = True) -> None: + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + self.bus.connect() + if not self.is_calibrated and calibrate: + logger.info( + "Mismatch between calibration values in the motor and the calibration file or no calibration file found" + ) + self.calibrate() + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.bus.is_calibrated + + def calibrate(self) -> None: + if self.calibration: + user_input = input( + f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: " + ) + if user_input.strip().lower() != "c": + logger.info(f"Writing calibration file associated with the id {self.id} to the motors") + self.bus.write_calibration(self.calibration) + return + + logger.info(f"\\nRunning calibration of {self}") + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + input(f"Move {self} to the middle of its range of motion and press ENTER....") + homing_offsets = self.bus.set_half_turn_homings() + + print( + "Move all joints sequentially through their entire ranges " + "of motion.\\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.bus.record_ranges_of_motion() + + self.calibration = {} + for motor, m in self.bus.motors.items(): + self.calibration[motor] = MotorCalibration( + id=m.id, + drive_mode=0, + homing_offset=homing_offsets[motor], + range_min=range_mins[motor], + range_max=range_maxes[motor], + ) + + self.bus.write_calibration(self.calibration) + self._save_calibration() + print(f"Calibration saved to {self.calibration_fpath}") + + def configure(self) -> None: + self.bus.disable_torque() + self.bus.configure_motors() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + def setup_motors(self) -> None: + for motor in reversed(self.bus.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") + + def get_action(self) -> dict[str, float]: + start = time.perf_counter() + action = self.bus.sync_read("Present_Position") + action = {f"{motor}.pos": val for motor, val in action.items()} + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read action: {dt_ms:.1f}ms") + return action + + def send_feedback(self, feedback: dict[str, float]) -> None: + # Not implemented for the leader teleoperator + raise NotImplementedError + + def disconnect(self) -> None: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + self.bus.disconnect() + logger.info(f"{self} disconnected.")