diff --git a/lerobot/calibrate.py b/lerobot/calibrate.py index 6780577ff..a0d468e8e 100644 --- a/lerobot/calibrate.py +++ b/lerobot/calibrate.py @@ -41,6 +41,7 @@ from lerobot.common.robots import ( # noqa: F401 make_robot_from_config, so100_follower, so101_follower, + so101_follower_torque, ) from lerobot.common.teleoperators import ( # noqa: F401 Teleoperator, diff --git a/lerobot/common/motors/feetech/tables.py b/lerobot/common/motors/feetech/tables.py index 0a2f2659f..b6518945f 100644 --- a/lerobot/common/motors/feetech/tables.py +++ b/lerobot/common/motors/feetech/tables.py @@ -151,6 +151,32 @@ SCS_SERIES_CONTROL_TABLE = { "Acceleration_2": (83, 1), # don't know what that is } +HLS3625_CONTROL_TABLE = { + # EPROM --------------------------------------------------------- + "ID": (0x05, 1), + "Baud_Rate": (0x06, 1), + "Return_Delay_Time": (0x07, 1), + "Min_Position_Limit": (0x09, 2), + "Max_Position_Limit": (0x0B, 2), + "Operating_Mode": (0x21, 1), + "Homing_Offset": (0x1F, 2), + # SRAM control ----------------------------------------------- + "Torque_Enable": (0x28, 1), + "Acceleration": (0x29, 1), + "Phase": (0x12, 1), + "Target_Position": (0x2A, 2), + "Target_Torque": (0x2C, 2), + "Running_Speed": (0x2E, 2), + "Torque_Limit": (0x30, 2), + "Lock": (0x37, 1), + # SRAM feedback ---------------------------------------------- + "Present_Position": (0x38, 2), + "Present_Velocity": (0x3A, 2), + "Present_Current": (0x45, 2), + # Factory / limits ------------------------------------------- + "Maximum_Acceleration": (0x55, 1), +} + STS_SMS_SERIES_BAUDRATE_TABLE = { 1_000_000: 0, 500_000: 1, @@ -181,6 +207,7 @@ MODEL_CONTROL_TABLE = { "sts3250": STS_SMS_SERIES_CONTROL_TABLE, "scs0009": SCS_SERIES_CONTROL_TABLE, "sm8512bl": STS_SMS_SERIES_CONTROL_TABLE, + "hls3625": HLS3625_CONTROL_TABLE, } MODEL_RESOLUTION = { @@ -191,6 +218,7 @@ MODEL_RESOLUTION = { "sts3250": 4096, "sm8512bl": 65536, "scs0009": 1024, + "hls3625": 4096, } MODEL_BAUDRATE_TABLE = { @@ -201,6 +229,7 @@ MODEL_BAUDRATE_TABLE = { "sts3215": STS_SMS_SERIES_BAUDRATE_TABLE, "sts3250": STS_SMS_SERIES_BAUDRATE_TABLE, "scs0009": SCS_SERIES_BAUDRATE_TABLE, + "hls3625": STS_SMS_SERIES_BAUDRATE_TABLE, } # Sign-Magnitude encoding bits @@ -218,6 +247,7 @@ MODEL_ENCODING_TABLE = { "sts3250": STS_SMS_SERIES_ENCODINGS_TABLE, "sm8512bl": STS_SMS_SERIES_ENCODINGS_TABLE, "scs0009": {}, + "hls3625": {"Target_Torque": 15, "Homing_Offset": 15}, } SCAN_BAUDRATES = [ @@ -239,6 +269,7 @@ MODEL_NUMBER_TABLE = { "sts3250": 2825, "sm8512bl": 11272, "scs0009": 1284, + "hls3625": 3338, } MODEL_PROTOCOL = { @@ -249,4 +280,5 @@ MODEL_PROTOCOL = { "sts3250": 0, "sm8512bl": 0, "scs0009": 1, + "hls3625": 0, } diff --git a/lerobot/common/robots/so101_follower_torque/__init__.py b/lerobot/common/robots/so101_follower_torque/__init__.py new file mode 100644 index 000000000..f3bae0baf --- /dev/null +++ b/lerobot/common/robots/so101_follower_torque/__init__.py @@ -0,0 +1,2 @@ +from .config_so101_follower_t import SO101FollowerTConfig +from .so101_follower_t import SO101FollowerT diff --git a/lerobot/common/robots/so101_follower_torque/config_so101_follower_t.py b/lerobot/common/robots/so101_follower_torque/config_so101_follower_t.py new file mode 100644 index 000000000..7cffa6968 --- /dev/null +++ b/lerobot/common/robots/so101_follower_torque/config_so101_follower_t.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass, field + +from lerobot.common.cameras import CameraConfig + +from ..config import RobotConfig + + +@RobotConfig.register_subclass("so101_follower_t") +@dataclass +class SO101FollowerTConfig(RobotConfig): + # Port to connect to the arm + port: str + + disable_torque_on_disconnect: bool = True + + # `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes. + # Set this to a positive scalar to have the same value for all motors, or a list that is the same length as + # the number of motors in your follower arms. + max_relative_target: int | None = None + + # cameras + cameras: dict[str, CameraConfig] = field(default_factory=dict) diff --git a/lerobot/common/robots/so101_follower_torque/so101_follower_t.py b/lerobot/common/robots/so101_follower_torque/so101_follower_t.py new file mode 100644 index 000000000..22359f551 --- /dev/null +++ b/lerobot/common/robots/so101_follower_torque/so101_follower_t.py @@ -0,0 +1,213 @@ +#!/usr/bin/env python + +# Copyright 2025 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from functools import cached_property +from typing import Any + +from lerobot.common.cameras.utils import make_cameras_from_configs +from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.common.motors.feetech import ( + FeetechMotorsBus, + OperatingMode, +) + +from ..robot import Robot +from .config_so101_follower_t import SO101FollowerTConfig + +logger = logging.getLogger(__name__) + + +class SO101FollowerT(Robot): + """ + SO-101 Follower Arm designed by TheRobotStudio and Hugging Face. + """ + + config_class = SO101FollowerTConfig + name = "so101_follower" + + def __init__(self, config: SO101FollowerTConfig): + super().__init__(config) + self.config = config + self.bus = FeetechMotorsBus( + port=self.config.port, + motors={ + "shoulder_pan": Motor(1, "hls3625", MotorNormMode.RANGE_M100_100), + "shoulder_lift": Motor(2, "hls3625", MotorNormMode.RANGE_M100_100), + "elbow_flex": Motor(3, "hls3625", MotorNormMode.RANGE_M100_100), + "wrist_flex": Motor(4, "hls3625", MotorNormMode.RANGE_M100_100), + "wrist_roll": Motor(5, "hls3625", MotorNormMode.RANGE_M100_100), + "gripper": Motor(6, "hls3625", MotorNormMode.RANGE_0_100), + }, + calibration=self.calibration, + ) + self.cameras = make_cameras_from_configs(config.cameras) + + @property + def _motors_ft(self) -> dict[str, type]: + return {f"{motor}.pos": float for motor in self.bus.motors} + + @property + def _cameras_ft(self) -> dict[str, tuple]: + return { + cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras + } + + @cached_property + def observation_features(self) -> dict[str, type | tuple]: + return {**self._motors_ft, **self._cameras_ft} + + @cached_property + def action_features(self) -> dict[str, type]: + return { + **{f"{m}.pos": float for m in self.bus.motors}, + **{f"{m}.effort": int for m in self.bus.motors}, + } + + @property + def is_connected(self) -> bool: + return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) + + def connect(self, calibrate: bool = True) -> None: + """ + We assume that at connection time, arm is in a rest position, + and torque can be safely disabled to run calibration. + """ + if self.is_connected: + raise DeviceAlreadyConnectedError(f"{self} already connected") + + self.bus.connect() + if not self.is_calibrated and calibrate: + self.calibrate() + + for cam in self.cameras.values(): + cam.connect() + + self.configure() + logger.info(f"{self} connected.") + + @property + def is_calibrated(self) -> bool: + return self.bus.is_calibrated + + def calibrate(self) -> None: + logger.info(f"\nRunning calibration of {self}") + self.bus.disable_torque() + for motor in self.bus.motors: + self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value) + + input(f"Move {self} to the middle of its range of motion and press ENTER....") + homing_offsets = self.bus.set_half_turn_homings() + + print( + "Move all joints sequentially through their entire ranges " + "of motion.\nRecording positions. Press ENTER to stop..." + ) + range_mins, range_maxes = self.bus.record_ranges_of_motion() + + self.calibration = {} + for motor, m in self.bus.motors.items(): + self.calibration[motor] = MotorCalibration( + id=m.id, + drive_mode=0, + homing_offset=homing_offsets[motor], + range_min=range_mins[motor], + range_max=range_maxes[motor], + ) + + self.bus.write_calibration(self.calibration) + self._save_calibration() + print("Calibration saved to", self.calibration_fpath) + + def configure(self) -> None: + with self.bus.torque_disabled(): + self.bus.configure_motors() + for motor in self.bus.motors: + phase = self.bus.read("Phase", motor, normalize=False) + if phase & 0x10: # bit-4 set = multi-turn + new_phase = phase & ~0x10 + print(f"Switching {motor} to single-turn: 0x{phase:02X} → 0x{new_phase:02X}") + self.bus.write("Phase", motor, new_phase, normalize=False) + + self.bus.write("Operating_Mode", motor, 2) # Set to current mode + self.bus.write("Target_Torque", motor, 0) + self.bus.write("Torque_Limit", motor, 1000) + + def setup_motors(self) -> None: + for motor in reversed(self.bus.motors): + input(f"Connect the controller board to the '{motor}' motor only and press enter.") + self.bus.setup_motor(motor) + print(f"'{motor}' motor id set to {self.bus.motors[motor].id}") + + def get_observation(self) -> dict[str, Any]: + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Read arm position + start = time.perf_counter() + + pos = self.bus.sync_read("Present_Position", normalize=False, num_retry=5) + curr = self.bus.sync_read("Present_Current", normalize=False, num_retry=5) + pos_dict = {f"{motor}.pos": val for motor, val in pos.items()} + curr_dict = {f"{motor}.effort": val for motor, val in curr.items()} + obs_dict = {**pos_dict, **curr_dict} + + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read state: {dt_ms:.1f}ms") + + # Capture images from cameras + for cam_key, cam in self.cameras.items(): + start = time.perf_counter() + obs_dict[cam_key] = cam.async_read() + dt_ms = (time.perf_counter() - start) * 1e3 + logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms") + + return obs_dict + + def send_action(self, action: dict[str, Any]) -> dict[str, Any]: + """Command arm to move to a target joint configuration. + + The relative action magnitude may be clipped depending on the configuration parameter + `max_relative_target`. In this case, the action sent differs from original action. + Thus, this function always returns the action actually sent. + + Raises: + RobotDeviceNotConnectedError: if robot is not connected. + + Returns: + the action sent to the motors, potentially clipped. + """ + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + # Send goal position to the arm + effort = {k.removesuffix(".effort"): int(v) for k, v in action.items() if k.endswith(".effort")} + if effort: + # current (torque) mode write + self.bus.sync_write("Target_Torque", effort, normalize=False, num_retry=2) + return action + + def disconnect(self): + if not self.is_connected: + raise DeviceNotConnectedError(f"{self} is not connected.") + + self.bus.disconnect(self.config.disable_torque_on_disconnect) + for cam in self.cameras.values(): + cam.disconnect() + + logger.info(f"{self} disconnected.") diff --git a/lerobot/common/robots/utils.py b/lerobot/common/robots/utils.py index d100c8366..e111d792e 100644 --- a/lerobot/common/robots/utils.py +++ b/lerobot/common/robots/utils.py @@ -33,6 +33,10 @@ def make_robot_from_config(config: RobotConfig) -> Robot: from .so101_follower import SO101Follower return SO101Follower(config) + elif config.type == "so101_follower_t": + from .so101_follower_torque import SO101FollowerT + + return SO101FollowerT(config) elif config.type == "lekiwi": from .lekiwi import LeKiwi diff --git a/lerobot/set_motors.py b/lerobot/set_motors.py new file mode 100644 index 000000000..0d4ea2d16 --- /dev/null +++ b/lerobot/set_motors.py @@ -0,0 +1,9 @@ +from lerobot.common.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig +from lerobot.common.robots.so101_follower_torque.so101_follower_t import SO101FollowerT + +config = SO101FollowerTConfig( + port="/dev/tty.usbmodem58760428721", + id="my_awesome_follower_arm", +) +follower = SO101FollowerT(config) +follower.setup_motors() diff --git a/lerobot/setup_motors.py b/lerobot/setup_motors.py index 7909dc68d..e48dac1e2 100644 --- a/lerobot/setup_motors.py +++ b/lerobot/setup_motors.py @@ -35,6 +35,7 @@ from .common.robots import ( # noqa: F401 make_robot_from_config, so100_follower, so101_follower, + so101_follower_torque, ) from .common.teleoperators import ( # noqa: F401 TeleoperatorConfig, @@ -52,6 +53,7 @@ COMPATIBLE_DEVICES = [ "so101_follower", "so101_leader", "lekiwi", + "so101_follower_t", ] diff --git a/lerobot/teleoperate_t.py b/lerobot/teleoperate_t.py new file mode 100644 index 000000000..6b91072ce --- /dev/null +++ b/lerobot/teleoperate_t.py @@ -0,0 +1,107 @@ +import math +import time + +from lerobot.common.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig +from lerobot.common.robots.so101_follower_torque.so101_follower_t import SO101FollowerT + +F_C = 0.2 # Nm, coulomb (static) +F_V = 0.05 # Nm·s rad⁻¹, residual viscous +Kt = 1.4043083213990768 # Nm/A +AMP_PER_LSB = 0.0065 # 6.5 mA/LSB +MAX_LSB = 2047 +Kp, Kd, K_local = 15, 0.5, 0.01 +DT = 0.002 # 500 Hz +MOTORS = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll"] # "gripper" +TICKS_TO_RAD = 2.0 * math.pi / 4096.0 # Feetech HLS-3625: 4096 ticks = 1 rev + +STEP = 0 +SCALE_TRANSIENT = 0.2 +SCALE_NORMAL = 1.0 + + +def friction(omega: float) -> float: + return F_C * math.tanh(omega / 0.05) + F_V * omega + + +def nm_to_lsb(tau: float) -> int: + return int(max(-MAX_LSB, min(MAX_LSB, round((tau / Kt) / AMP_PER_LSB)))) + + +def ticks_to_rad(ticks: int) -> float: + return ticks * TICKS_TO_RAD + + +robot_config = SO101FollowerTConfig( + port="/dev/tty.usbmodem58760431551", + id="my_awesome_follower_arm", +) + +teleop_config = SO101FollowerTConfig( + port="/dev/tty.usbmodem58760428721", + id="my_awesome_leader_arm", +) + +robot = SO101FollowerT(robot_config) + +teleop_device = SO101FollowerT(teleop_config) + +robot.connect() +teleop_device.connect() + +print("Starting bilateral tele-op") +prev_l = dict.fromkeys(MOTORS, 0.0) +prev_f = dict.fromkeys(MOTORS, 0.0) + +last_print = time.time() +loops = 0 + +while True: + STEP += 1 + # first 1000 steps at 20%, then full effort + scale = SCALE_TRANSIENT if STEP <= 1000 else SCALE_NORMAL + + tic = time.perf_counter() + obs_l = robot.get_observation() + obs_f = teleop_device.get_observation() + + tgt_l, tgt_f = {}, {} + for j in MOTORS: + ticks_l = obs_l[f"{j}.pos"] + ticks_f = obs_f[f"{j}.pos"] + + th_l = ticks_to_rad(ticks_l) # → radians + th_f = ticks_to_rad(ticks_f) + + dth_l = (th_l - prev_l[j]) / DT + dth_f = (th_f - prev_f[j]) / DT + + prev_l[j], prev_f[j] = th_l, th_f + + tau_l = -(Kp * (th_f - th_l) + Kd * (dth_f - dth_l) - K_local * dth_l) - friction(dth_l) + tau_f = -(Kp * (th_l - th_f) + Kd * (dth_l - dth_f) - K_local * dth_f) - friction(dth_f) + + tau_l *= scale + tau_f *= scale + + tgt_l[f"{j}.effort"] = nm_to_lsb(tau_l) + tgt_f[f"{j}.effort"] = nm_to_lsb(tau_f) + + robot.send_action(tgt_l) + teleop_device.send_action(tgt_f) + + leader_str = " | ".join(f"{j}:{tgt_l[f'{j}.effort']:+5d}" for j in MOTORS) + follower_str = " | ".join(f"{j}:{tgt_f[f'{j}.effort']:+5d}" for j in MOTORS) + print(f"Leader: {leader_str}") + print(f"Follower: {follower_str}") + + loops += 1 + elapsed = time.perf_counter() - tic + to_sleep = DT - elapsed + if to_sleep > 0: + time.sleep(to_sleep) + + now = time.time() + if now - last_print >= 1.0: + print(f"{loops / (now - last_print):6.1f} Hz control loop") # e.g. "500.2 Hz" + loops = 0 + last_print = now