First bi teleop so101

This commit is contained in:
Pepijn
2025-06-12 09:53:30 +02:00
parent b1386fd79e
commit e171fa788a
9 changed files with 408 additions and 0 deletions

View File

@@ -41,6 +41,7 @@ from lerobot.common.robots import ( # noqa: F401
make_robot_from_config,
so100_follower,
so101_follower,
so101_follower_torque,
)
from lerobot.common.teleoperators import ( # noqa: F401
Teleoperator,

View File

@@ -151,6 +151,32 @@ SCS_SERIES_CONTROL_TABLE = {
"Acceleration_2": (83, 1), # don't know what that is
}
HLS3625_CONTROL_TABLE = {
# EPROM ---------------------------------------------------------
"ID": (0x05, 1),
"Baud_Rate": (0x06, 1),
"Return_Delay_Time": (0x07, 1),
"Min_Position_Limit": (0x09, 2),
"Max_Position_Limit": (0x0B, 2),
"Operating_Mode": (0x21, 1),
"Homing_Offset": (0x1F, 2),
# SRAM control -----------------------------------------------
"Torque_Enable": (0x28, 1),
"Acceleration": (0x29, 1),
"Phase": (0x12, 1),
"Target_Position": (0x2A, 2),
"Target_Torque": (0x2C, 2),
"Running_Speed": (0x2E, 2),
"Torque_Limit": (0x30, 2),
"Lock": (0x37, 1),
# SRAM feedback ----------------------------------------------
"Present_Position": (0x38, 2),
"Present_Velocity": (0x3A, 2),
"Present_Current": (0x45, 2),
# Factory / limits -------------------------------------------
"Maximum_Acceleration": (0x55, 1),
}
STS_SMS_SERIES_BAUDRATE_TABLE = {
1_000_000: 0,
500_000: 1,
@@ -181,6 +207,7 @@ MODEL_CONTROL_TABLE = {
"sts3250": STS_SMS_SERIES_CONTROL_TABLE,
"scs0009": SCS_SERIES_CONTROL_TABLE,
"sm8512bl": STS_SMS_SERIES_CONTROL_TABLE,
"hls3625": HLS3625_CONTROL_TABLE,
}
MODEL_RESOLUTION = {
@@ -191,6 +218,7 @@ MODEL_RESOLUTION = {
"sts3250": 4096,
"sm8512bl": 65536,
"scs0009": 1024,
"hls3625": 4096,
}
MODEL_BAUDRATE_TABLE = {
@@ -201,6 +229,7 @@ MODEL_BAUDRATE_TABLE = {
"sts3215": STS_SMS_SERIES_BAUDRATE_TABLE,
"sts3250": STS_SMS_SERIES_BAUDRATE_TABLE,
"scs0009": SCS_SERIES_BAUDRATE_TABLE,
"hls3625": STS_SMS_SERIES_BAUDRATE_TABLE,
}
# Sign-Magnitude encoding bits
@@ -218,6 +247,7 @@ MODEL_ENCODING_TABLE = {
"sts3250": STS_SMS_SERIES_ENCODINGS_TABLE,
"sm8512bl": STS_SMS_SERIES_ENCODINGS_TABLE,
"scs0009": {},
"hls3625": {"Target_Torque": 15, "Homing_Offset": 15},
}
SCAN_BAUDRATES = [
@@ -239,6 +269,7 @@ MODEL_NUMBER_TABLE = {
"sts3250": 2825,
"sm8512bl": 11272,
"scs0009": 1284,
"hls3625": 3338,
}
MODEL_PROTOCOL = {
@@ -249,4 +280,5 @@ MODEL_PROTOCOL = {
"sts3250": 0,
"sm8512bl": 0,
"scs0009": 1,
"hls3625": 0,
}

View File

@@ -0,0 +1,2 @@
from .config_so101_follower_t import SO101FollowerTConfig
from .so101_follower_t import SO101FollowerT

View File

@@ -0,0 +1,38 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.common.cameras import CameraConfig
from ..config import RobotConfig
@RobotConfig.register_subclass("so101_follower_t")
@dataclass
class SO101FollowerTConfig(RobotConfig):
# Port to connect to the arm
port: str
disable_torque_on_disconnect: bool = True
# `max_relative_target` limits the magnitude of the relative positional target vector for safety purposes.
# Set this to a positive scalar to have the same value for all motors, or a list that is the same length as
# the number of motors in your follower arms.
max_relative_target: int | None = None
# cameras
cameras: dict[str, CameraConfig] = field(default_factory=dict)

View File

@@ -0,0 +1,213 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import (
FeetechMotorsBus,
OperatingMode,
)
from ..robot import Robot
from .config_so101_follower_t import SO101FollowerTConfig
logger = logging.getLogger(__name__)
class SO101FollowerT(Robot):
"""
SO-101 Follower Arm designed by TheRobotStudio and Hugging Face.
"""
config_class = SO101FollowerTConfig
name = "so101_follower"
def __init__(self, config: SO101FollowerTConfig):
super().__init__(config)
self.config = config
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
"shoulder_pan": Motor(1, "hls3625", MotorNormMode.RANGE_M100_100),
"shoulder_lift": Motor(2, "hls3625", MotorNormMode.RANGE_M100_100),
"elbow_flex": Motor(3, "hls3625", MotorNormMode.RANGE_M100_100),
"wrist_flex": Motor(4, "hls3625", MotorNormMode.RANGE_M100_100),
"wrist_roll": Motor(5, "hls3625", MotorNormMode.RANGE_M100_100),
"gripper": Motor(6, "hls3625", MotorNormMode.RANGE_0_100),
},
calibration=self.calibration,
)
self.cameras = make_cameras_from_configs(config.cameras)
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def _cameras_ft(self) -> dict[str, tuple]:
return {
cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.cameras
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
return {
**{f"{m}.pos": float for m in self.bus.motors},
**{f"{m}.effort": int for m in self.bus.motors},
}
@property
def is_connected(self) -> bool:
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def connect(self, calibrate: bool = True) -> None:
"""
We assume that at connection time, arm is in a rest position,
and torque can be safely disabled to run calibration.
"""
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
if not self.is_calibrated and calibrate:
self.calibrate()
for cam in self.cameras.values():
cam.connect()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None:
logger.info(f"\nRunning calibration of {self}")
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.bus.set_half_turn_homings()
print(
"Move all joints sequentially through their entire ranges "
"of motion.\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.bus.record_ranges_of_motion()
self.calibration = {}
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=homing_offsets[motor],
range_min=range_mins[motor],
range_max=range_maxes[motor],
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print("Calibration saved to", self.calibration_fpath)
def configure(self) -> None:
with self.bus.torque_disabled():
self.bus.configure_motors()
for motor in self.bus.motors:
phase = self.bus.read("Phase", motor, normalize=False)
if phase & 0x10: # bit-4 set = multi-turn
new_phase = phase & ~0x10
print(f"Switching {motor} to single-turn: 0x{phase:02X} → 0x{new_phase:02X}")
self.bus.write("Phase", motor, new_phase, normalize=False)
self.bus.write("Operating_Mode", motor, 2) # Set to current mode
self.bus.write("Target_Torque", motor, 0)
self.bus.write("Torque_Limit", motor, 1000)
def setup_motors(self) -> None:
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_observation(self) -> dict[str, Any]:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Read arm position
start = time.perf_counter()
pos = self.bus.sync_read("Present_Position", normalize=False, num_retry=5)
curr = self.bus.sync_read("Present_Current", normalize=False, num_retry=5)
pos_dict = {f"{motor}.pos": val for motor, val in pos.items()}
curr_dict = {f"{motor}.effort": val for motor, val in curr.items()}
obs_dict = {**pos_dict, **curr_dict}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Capture images from cameras
for cam_key, cam in self.cameras.items():
start = time.perf_counter()
obs_dict[cam_key] = cam.async_read()
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read {cam_key}: {dt_ms:.1f}ms")
return obs_dict
def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter
`max_relative_target`. In this case, the action sent differs from original action.
Thus, this function always returns the action actually sent.
Raises:
RobotDeviceNotConnectedError: if robot is not connected.
Returns:
the action sent to the motors, potentially clipped.
"""
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
# Send goal position to the arm
effort = {k.removesuffix(".effort"): int(v) for k, v in action.items() if k.endswith(".effort")}
if effort:
# current (torque) mode write
self.bus.sync_write("Target_Torque", effort, normalize=False, num_retry=2)
return action
def disconnect(self):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect(self.config.disable_torque_on_disconnect)
for cam in self.cameras.values():
cam.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -33,6 +33,10 @@ def make_robot_from_config(config: RobotConfig) -> Robot:
from .so101_follower import SO101Follower
return SO101Follower(config)
elif config.type == "so101_follower_t":
from .so101_follower_torque import SO101FollowerT
return SO101FollowerT(config)
elif config.type == "lekiwi":
from .lekiwi import LeKiwi

9
lerobot/set_motors.py Normal file
View File

@@ -0,0 +1,9 @@
from lerobot.common.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig
from lerobot.common.robots.so101_follower_torque.so101_follower_t import SO101FollowerT
config = SO101FollowerTConfig(
port="/dev/tty.usbmodem58760428721",
id="my_awesome_follower_arm",
)
follower = SO101FollowerT(config)
follower.setup_motors()

View File

@@ -35,6 +35,7 @@ from .common.robots import ( # noqa: F401
make_robot_from_config,
so100_follower,
so101_follower,
so101_follower_torque,
)
from .common.teleoperators import ( # noqa: F401
TeleoperatorConfig,
@@ -52,6 +53,7 @@ COMPATIBLE_DEVICES = [
"so101_follower",
"so101_leader",
"lekiwi",
"so101_follower_t",
]

107
lerobot/teleoperate_t.py Normal file
View File

@@ -0,0 +1,107 @@
import math
import time
from lerobot.common.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig
from lerobot.common.robots.so101_follower_torque.so101_follower_t import SO101FollowerT
F_C = 0.2 # Nm, coulomb (static)
F_V = 0.05 # Nm·s rad⁻¹, residual viscous
Kt = 1.4043083213990768 # Nm/A
AMP_PER_LSB = 0.0065 # 6.5 mA/LSB
MAX_LSB = 2047
Kp, Kd, K_local = 15, 0.5, 0.01
DT = 0.002 # 500 Hz
MOTORS = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll"] # "gripper"
TICKS_TO_RAD = 2.0 * math.pi / 4096.0 # Feetech HLS-3625: 4096 ticks = 1 rev
STEP = 0
SCALE_TRANSIENT = 0.2
SCALE_NORMAL = 1.0
def friction(omega: float) -> float:
return F_C * math.tanh(omega / 0.05) + F_V * omega
def nm_to_lsb(tau: float) -> int:
return int(max(-MAX_LSB, min(MAX_LSB, round((tau / Kt) / AMP_PER_LSB))))
def ticks_to_rad(ticks: int) -> float:
return ticks * TICKS_TO_RAD
robot_config = SO101FollowerTConfig(
port="/dev/tty.usbmodem58760431551",
id="my_awesome_follower_arm",
)
teleop_config = SO101FollowerTConfig(
port="/dev/tty.usbmodem58760428721",
id="my_awesome_leader_arm",
)
robot = SO101FollowerT(robot_config)
teleop_device = SO101FollowerT(teleop_config)
robot.connect()
teleop_device.connect()
print("Starting bilateral tele-op")
prev_l = dict.fromkeys(MOTORS, 0.0)
prev_f = dict.fromkeys(MOTORS, 0.0)
last_print = time.time()
loops = 0
while True:
STEP += 1
# first 1000 steps at 20%, then full effort
scale = SCALE_TRANSIENT if STEP <= 1000 else SCALE_NORMAL
tic = time.perf_counter()
obs_l = robot.get_observation()
obs_f = teleop_device.get_observation()
tgt_l, tgt_f = {}, {}
for j in MOTORS:
ticks_l = obs_l[f"{j}.pos"]
ticks_f = obs_f[f"{j}.pos"]
th_l = ticks_to_rad(ticks_l) # → radians
th_f = ticks_to_rad(ticks_f)
dth_l = (th_l - prev_l[j]) / DT
dth_f = (th_f - prev_f[j]) / DT
prev_l[j], prev_f[j] = th_l, th_f
tau_l = -(Kp * (th_f - th_l) + Kd * (dth_f - dth_l) - K_local * dth_l) - friction(dth_l)
tau_f = -(Kp * (th_l - th_f) + Kd * (dth_l - dth_f) - K_local * dth_f) - friction(dth_f)
tau_l *= scale
tau_f *= scale
tgt_l[f"{j}.effort"] = nm_to_lsb(tau_l)
tgt_f[f"{j}.effort"] = nm_to_lsb(tau_f)
robot.send_action(tgt_l)
teleop_device.send_action(tgt_f)
leader_str = " | ".join(f"{j}:{tgt_l[f'{j}.effort']:+5d}" for j in MOTORS)
follower_str = " | ".join(f"{j}:{tgt_f[f'{j}.effort']:+5d}" for j in MOTORS)
print(f"Leader: {leader_str}")
print(f"Follower: {follower_str}")
loops += 1
elapsed = time.perf_counter() - tic
to_sleep = DT - elapsed
if to_sleep > 0:
time.sleep(to_sleep)
now = time.time()
if now - last_print >= 1.0:
print(f"{loops / (now - last_print):6.1f} Hz control loop") # e.g. "500.2 Hz"
loops = 0
last_print = now