mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-04 21:01:26 +00:00
Use multi turn, single turn is problem!
This commit is contained in:
@@ -231,82 +231,58 @@ class FeetechMotorsBus(MotorsBus):
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
motors_calibration = self.read_calibration()
|
||||
if set(motors_calibration) != set(self.calibration):
|
||||
return False
|
||||
|
||||
same_ranges = all(
|
||||
self.calibration[motor].range_min == cal.range_min
|
||||
and self.calibration[motor].range_max == cal.range_max
|
||||
for motor, cal in motors_calibration.items()
|
||||
)
|
||||
if self.protocol_version == 1:
|
||||
return same_ranges
|
||||
|
||||
same_offsets = all(
|
||||
self.calibration[motor].homing_offset == cal.homing_offset
|
||||
for motor, cal in motors_calibration.items()
|
||||
)
|
||||
return same_ranges and same_offsets
|
||||
# Check if calibration data has been loaded from file
|
||||
return bool(self.calibration)
|
||||
|
||||
def read_calibration(self) -> dict[str, MotorCalibration]:
|
||||
offsets, mins, maxes = {}, {}, {}
|
||||
for motor in self.motors:
|
||||
mins[motor] = self.read("Min_Position_Limit", motor, normalize=False)
|
||||
maxes[motor] = self.read("Max_Position_Limit", motor, normalize=False)
|
||||
offsets[motor] = (
|
||||
self.read("Homing_Offset", motor, normalize=False) if self.protocol_version == 0 else 0
|
||||
)
|
||||
|
||||
# Return empty calibration - we don't read from motors anymore
|
||||
calibration = {}
|
||||
for motor, m in self.motors.items():
|
||||
calibration[motor] = MotorCalibration(
|
||||
id=m.id,
|
||||
drive_mode=0,
|
||||
homing_offset=offsets[motor],
|
||||
range_min=mins[motor],
|
||||
range_max=maxes[motor],
|
||||
homing_offset=0,
|
||||
range_min=0,
|
||||
range_max=4095, # Default max resolution
|
||||
)
|
||||
|
||||
return calibration
|
||||
|
||||
def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None:
|
||||
for motor, calibration in calibration_dict.items():
|
||||
if self.protocol_version == 0:
|
||||
self.write("Homing_Offset", motor, calibration.homing_offset)
|
||||
self.write("Min_Position_Limit", motor, calibration.range_min)
|
||||
self.write("Max_Position_Limit", motor, calibration.range_max)
|
||||
|
||||
# Only update the in-memory calibration, don't write to motors
|
||||
self.calibration = calibration_dict
|
||||
|
||||
def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]:
|
||||
"""
|
||||
On Feetech Motors:
|
||||
Present_Position = Actual_Position - Homing_Offset
|
||||
Calculate homing offsets such that the current position becomes 0 degrees.
|
||||
|
||||
For Feetech motors:
|
||||
- The homing offset is subtracted from the raw position during normalization
|
||||
- So to make current position = 0 degrees, homing_offset = current_raw_position
|
||||
"""
|
||||
half_turn_homings = {}
|
||||
for motor, pos in positions.items():
|
||||
model = self._get_motor_model(motor)
|
||||
max_res = self.model_resolution_table[model] - 1
|
||||
half_turn_homings[motor] = pos - int(max_res / 2)
|
||||
# The homing offset should be the current position
|
||||
# This way, when we normalize: (pos - homing_offset) = 0
|
||||
half_turn_homings[motor] = pos
|
||||
|
||||
return half_turn_homings
|
||||
|
||||
def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
|
||||
for motor in self._get_motors_list(motors):
|
||||
self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry)
|
||||
self.write("Lock", motor, 0, num_retry=num_retry)
|
||||
# self.write("Lock", motor, 0, num_retry=num_retry)
|
||||
|
||||
def _disable_torque(self, motor_id: int, model: str, num_retry: int = 0) -> None:
|
||||
def _disable_torque(self, motor_id: int, model: str, num_retry: int = 5) -> None:
|
||||
addr, length = get_address(self.model_ctrl_table, model, "Torque_Enable")
|
||||
self._write(addr, length, motor_id, TorqueMode.DISABLED.value, num_retry=num_retry)
|
||||
addr, length = get_address(self.model_ctrl_table, model, "Lock")
|
||||
self._write(addr, length, motor_id, 0, num_retry=num_retry)
|
||||
# addr, length = get_address(self.model_ctrl_table, model, "Lock")
|
||||
# self._write(addr, length, motor_id, 0, num_retry=num_retry)
|
||||
|
||||
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None:
|
||||
def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 5) -> None:
|
||||
for motor in self._get_motors_list(motors):
|
||||
self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry)
|
||||
self.write("Lock", motor, 1, num_retry=num_retry)
|
||||
# self.write("Lock", motor, 1, num_retry=num_retry)
|
||||
|
||||
def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]:
|
||||
for id_ in ids_values:
|
||||
|
||||
@@ -713,9 +713,8 @@ class MotorsBus(abc.ABC):
|
||||
self.reset_calibration(motors)
|
||||
actual_positions = self.sync_read("Present_Position", motors, normalize=False)
|
||||
homing_offsets = self._get_half_turn_homings(actual_positions)
|
||||
for motor, offset in homing_offsets.items():
|
||||
self.write("Homing_Offset", motor, offset)
|
||||
|
||||
# Don't write to motors, just return the calculated offsets
|
||||
return homing_offsets
|
||||
|
||||
@abc.abstractmethod
|
||||
@@ -784,21 +783,32 @@ class MotorsBus(abc.ABC):
|
||||
motor = self._id_to_name(id_)
|
||||
min_ = self.calibration[motor].range_min
|
||||
max_ = self.calibration[motor].range_max
|
||||
homing_offset = self.calibration[motor].homing_offset
|
||||
drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode
|
||||
|
||||
if max_ == min_:
|
||||
raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.")
|
||||
|
||||
bounded_val = min(max_, max(min_, val))
|
||||
if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100:
|
||||
bounded_val = min(max_, max(min_, val))
|
||||
norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100
|
||||
normalized_values[id_] = -norm if drive_mode else norm
|
||||
elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100:
|
||||
bounded_val = min(max_, max(min_, val))
|
||||
norm = ((bounded_val - min_) / (max_ - min_)) * 100
|
||||
normalized_values[id_] = 100 - norm if drive_mode else norm
|
||||
elif self.motors[motor].norm_mode is MotorNormMode.DEGREES:
|
||||
deg = bounded_val * COUNT_TO_DEG
|
||||
deg_zero = deg - 180 # Subtract 180 degrees because 2047 (raw count) should be 0 degrees
|
||||
normalized_values[id_] = -deg_zero if drive_mode else deg_zero
|
||||
# For motors without wrap-around handling
|
||||
# The homing offset becomes 0 degrees
|
||||
|
||||
# Calculate difference from homing position
|
||||
diff = val - homing_offset
|
||||
|
||||
# Convert to degrees
|
||||
deg = diff * COUNT_TO_DEG
|
||||
|
||||
# Apply drive mode if needed
|
||||
normalized_values[id_] = -deg if drive_mode else deg
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -813,7 +823,9 @@ class MotorsBus(abc.ABC):
|
||||
motor = self._id_to_name(id_)
|
||||
min_ = self.calibration[motor].range_min
|
||||
max_ = self.calibration[motor].range_max
|
||||
homing_offset = self.calibration[motor].homing_offset
|
||||
drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode
|
||||
|
||||
if max_ == min_:
|
||||
raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.")
|
||||
|
||||
@@ -826,10 +838,22 @@ class MotorsBus(abc.ABC):
|
||||
bounded_val = min(100.0, max(0.0, val))
|
||||
unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_)
|
||||
elif self.motors[motor].norm_mode is MotorNormMode.DEGREES:
|
||||
# For motors without wrap-around, simple conversion back
|
||||
# Apply drive mode if needed
|
||||
val = -val if drive_mode else val
|
||||
deg = val + 180 # Add 180 degrees because 0 degrees should be 2047 (raw count)
|
||||
raw_counts = int(round(deg / COUNT_TO_DEG))
|
||||
unnormalized_values[id_] = raw_counts
|
||||
|
||||
# Convert degrees to raw counts
|
||||
raw_counts = int(round(val / COUNT_TO_DEG))
|
||||
|
||||
# Add back the homing offset
|
||||
raw_counts_with_offset = raw_counts + homing_offset
|
||||
|
||||
# Ensure value stays within calibrated motor range
|
||||
# Use the calibration min/max if available
|
||||
if min_ is not None and max_ is not None:
|
||||
raw_counts_with_offset = max(min_, min(max_, raw_counts_with_offset))
|
||||
|
||||
unnormalized_values[id_] = raw_counts_with_offset
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@@ -46,13 +46,13 @@ class SO101FollowerT(Robot):
|
||||
|
||||
_CURRENT_STEP_A: float = 6.5e-3 # 6.5 mA per register LSB #http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0
|
||||
_KT_NM_PER_AMP: float = 0.814 # Torque constant Kt [N·m/A] #https://www.feetechrc.com/811177.html
|
||||
_MAX_CURRENT_A: float = 2.0 # Safe driver limit for this model
|
||||
_MAX_CURRENT_A: float = 3.0 # Safe driver limit for this model
|
||||
|
||||
# Control gains for bilateral teleoperation
|
||||
_KP_GAINS = { # Position gains [Nm/rad] - reduced for bilateral stability
|
||||
"shoulder_pan": 5.0, # Reduced from 7.0
|
||||
"shoulder_lift": 10.0, # Reduced from 15.0
|
||||
"elbow_flex": 8.0, # Reduced from 12.0 (main problem joint)
|
||||
"shoulder_lift": 15.0, # Reduced from 15.0
|
||||
"elbow_flex": 12.0, # Reduced from 12.0 (main problem joint)
|
||||
"wrist_flex": 4.0, # Reduced from 6.0
|
||||
"wrist_roll": 3.0, # Reduced from 4.0
|
||||
"gripper": 1.5, # Reduced from 2.0
|
||||
@@ -89,6 +89,11 @@ class SO101FollowerT(Robot):
|
||||
def __init__(self, config: SO101FollowerTConfig):
|
||||
super().__init__(config)
|
||||
self.config = config
|
||||
|
||||
# Ensure calibration is loaded before creating the bus
|
||||
if self.calibration_fpath.is_file() and not self.calibration:
|
||||
self._load_calibration()
|
||||
|
||||
self.bus = FeetechMotorsBus(
|
||||
port=self.config.port,
|
||||
motors={
|
||||
@@ -280,6 +285,12 @@ class SO101FollowerT(Robot):
|
||||
if self.is_connected:
|
||||
raise DeviceAlreadyConnectedError(f"{self} already connected")
|
||||
|
||||
# Ensure calibration is loaded from file if it exists
|
||||
if self.calibration_fpath.is_file() and not self.calibration:
|
||||
self._load_calibration()
|
||||
# Update the bus with the loaded calibration
|
||||
self.bus.calibration = self.calibration
|
||||
|
||||
self.bus.connect()
|
||||
if not self.is_calibrated and calibrate:
|
||||
self.calibrate()
|
||||
@@ -292,7 +303,8 @@ class SO101FollowerT(Robot):
|
||||
|
||||
@property
|
||||
def is_calibrated(self) -> bool:
|
||||
return self.bus.is_calibrated
|
||||
# Check if calibration file exists and is loaded
|
||||
return self.calibration_fpath.is_file() and bool(self.calibration)
|
||||
|
||||
def calibrate(self) -> None:
|
||||
logger.info(f"\nRunning calibration of {self}")
|
||||
@@ -314,12 +326,14 @@ class SO101FollowerT(Robot):
|
||||
self.calibration[motor] = MotorCalibration(
|
||||
id=m.id,
|
||||
drive_mode=0,
|
||||
homing_offset=homing_offsets[motor],
|
||||
range_min=range_mins[motor],
|
||||
range_max=range_maxes[motor],
|
||||
homing_offset=int(homing_offsets[motor]),
|
||||
range_min=int(range_mins[motor]),
|
||||
range_max=int(range_maxes[motor]),
|
||||
)
|
||||
|
||||
self.bus.write_calibration(self.calibration)
|
||||
# Update the bus calibration with the new values
|
||||
self.bus.calibration = self.calibration
|
||||
# Save calibration to file only
|
||||
self._save_calibration()
|
||||
print("Calibration saved to", self.calibration_fpath)
|
||||
|
||||
@@ -330,7 +344,6 @@ class SO101FollowerT(Robot):
|
||||
self.bus.write("Operating_Mode", motor, 2, num_retry=2) # Set to current mode
|
||||
self.bus.write("Torque_Limit", motor, 1000, num_retry=2) # 100%
|
||||
self.bus.write("Max_Torque_Limit", motor, 1000, num_retry=2) # 100%
|
||||
self.bus.write("Protection_Current", motor, 1000, num_retry=2)
|
||||
|
||||
def setup_motors(self) -> None:
|
||||
for motor in reversed(self.bus.motors):
|
||||
@@ -409,6 +422,10 @@ class SO101FollowerT(Robot):
|
||||
max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A))
|
||||
counts = {}
|
||||
for joint, τ in tau_cmd_nm.items():
|
||||
# Set wrist_flex commands to 0
|
||||
if joint == "wrist_flex":
|
||||
counts[joint] = 0
|
||||
continue
|
||||
cnt = τ * self.torque_sign[joint] * inv_coef # flip SIGN
|
||||
cnt = max(-max_cnt, min(max_cnt, cnt))
|
||||
counts[joint] = int(round(cnt))
|
||||
|
||||
Reference in New Issue
Block a user