new file: src/lerobot/teleoperators/so101_leader_double/__init__.py

new file:   src/lerobot/teleoperators/so101_leader_double/config_so101_leader_double.py
	new file:   src/lerobot/teleoperators/so101_leader_double/so101_leader_double.py
This commit is contained in:
2025-11-30 13:47:46 +08:00
parent 5b5767dae0
commit 64cef98fba
3 changed files with 192 additions and 0 deletions

View File

@@ -0,0 +1,2 @@
from .config_so101_leader_double import SO101LeaderDoubleConfig
from .so101_leader_double import SO101LeaderDouble

View File

@@ -0,0 +1,27 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("so101_leader_double")
@dataclass
class SO101LeaderDoubleConfig(TeleoperatorConfig):
# Port to connect to the arm
port: str
use_degrees: bool = False

View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Any
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.feetech import FeetechMotorsBus, OperatingMode
from lerobot.utils.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from ..teleoperator import Teleoperator
from .config_so101_leader_double import SO101LeaderDoubleConfig
logger = logging.getLogger(__name__)
class SO101LeaderDouble(Teleoperator):
"""
SO-101 Leader Arm (double) designed by TheRobotStudio and Hugging Face.
Provides two arms (left and right) with the same motor layout as the
original leader, but each side is addressed with a `left_` / `right_`
prefix.
"""
config_class = SO101LeaderDoubleConfig
name = "so101_leader_double"
def __init__(self, config: SO101LeaderDoubleConfig):
super().__init__(config)
self.config = config
norm_mode_body = MotorNormMode.DEGREES if config.use_degrees else MotorNormMode.RANGE_M100_100
self.bus = FeetechMotorsBus(
port=self.config.port,
motors={
# left arm
"left_shoulder_pan": Motor(1, "sts3215", norm_mode_body),
"left_shoulder_lift": Motor(2, "sts3215", norm_mode_body),
"left_elbow_flex": Motor(3, "sts3215", norm_mode_body),
"left_wrist_flex": Motor(4, "sts3215", norm_mode_body),
"left_wrist_roll": Motor(5, "sts3215", norm_mode_body),
"left_gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100),
# right arm
"right_shoulder_pan": Motor(7, "sts3215", norm_mode_body),
"right_shoulder_lift": Motor(8, "sts3215", norm_mode_body),
"right_elbow_flex": Motor(9, "sts3215", norm_mode_body),
"right_wrist_flex": Motor(10, "sts3215", norm_mode_body),
"right_wrist_roll": Motor(11, "sts3215", norm_mode_body),
"right_gripper": Motor(12, "sts3215", MotorNormMode.RANGE_0_100),
},
calibration=self.calibration,
)
@property
def action_features(self) -> dict[str, type]:
# expose all motor positions (left and right)
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
# No feedback channels for the leader teleoperator
return {}
@property
def is_connected(self) -> bool:
return self.bus.is_connected
def connect(self, calibrate: bool = True) -> None:
if self.is_connected:
raise DeviceAlreadyConnectedError(f"{self} already connected")
self.bus.connect()
if not self.is_calibrated and calibrate:
logger.info(
"Mismatch between calibration values in the motor and the calibration file or no calibration file found"
)
self.calibrate()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus.is_calibrated
def calibrate(self) -> None:
if self.calibration:
user_input = input(
f"Press ENTER to use provided calibration file associated with the id {self.id}, or type 'c' and press ENTER to run calibration: "
)
if user_input.strip().lower() != "c":
logger.info(f"Writing calibration file associated with the id {self.id} to the motors")
self.bus.write_calibration(self.calibration)
return
logger.info(f"\\nRunning calibration of {self}")
self.bus.disable_torque()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(f"Move {self} to the middle of its range of motion and press ENTER....")
homing_offsets = self.bus.set_half_turn_homings()
print(
"Move all joints sequentially through their entire ranges "
"of motion.\\nRecording positions. Press ENTER to stop..."
)
range_mins, range_maxes = self.bus.record_ranges_of_motion()
self.calibration = {}
for motor, m in self.bus.motors.items():
self.calibration[motor] = MotorCalibration(
id=m.id,
drive_mode=0,
homing_offset=homing_offsets[motor],
range_min=range_mins[motor],
range_max=range_maxes[motor],
)
self.bus.write_calibration(self.calibration)
self._save_calibration()
print(f"Calibration saved to {self.calibration_fpath}")
def configure(self) -> None:
self.bus.disable_torque()
self.bus.configure_motors()
for motor in self.bus.motors:
self.bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
def setup_motors(self) -> None:
for motor in reversed(self.bus.motors):
input(f"Connect the controller board to the '{motor}' motor only and press enter.")
self.bus.setup_motor(motor)
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
def get_action(self) -> dict[str, float]:
start = time.perf_counter()
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def send_feedback(self, feedback: dict[str, float]) -> None:
# Not implemented for the leader teleoperator
raise NotImplementedError
def disconnect(self) -> None:
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
self.bus.disconnect()
logger.info(f"{self} disconnected.")