diff --git a/src/lerobot/motors/feetech/feetech.py b/src/lerobot/motors/feetech/feetech.py index fde0b0aec..bfd4edd55 100644 --- a/src/lerobot/motors/feetech/feetech.py +++ b/src/lerobot/motors/feetech/feetech.py @@ -231,82 +231,58 @@ class FeetechMotorsBus(MotorsBus): @property def is_calibrated(self) -> bool: - motors_calibration = self.read_calibration() - if set(motors_calibration) != set(self.calibration): - return False - - same_ranges = all( - self.calibration[motor].range_min == cal.range_min - and self.calibration[motor].range_max == cal.range_max - for motor, cal in motors_calibration.items() - ) - if self.protocol_version == 1: - return same_ranges - - same_offsets = all( - self.calibration[motor].homing_offset == cal.homing_offset - for motor, cal in motors_calibration.items() - ) - return same_ranges and same_offsets + # Check if calibration data has been loaded from file + return bool(self.calibration) def read_calibration(self) -> dict[str, MotorCalibration]: - offsets, mins, maxes = {}, {}, {} - for motor in self.motors: - mins[motor] = self.read("Min_Position_Limit", motor, normalize=False) - maxes[motor] = self.read("Max_Position_Limit", motor, normalize=False) - offsets[motor] = ( - self.read("Homing_Offset", motor, normalize=False) if self.protocol_version == 0 else 0 - ) - + # Return empty calibration - we don't read from motors anymore calibration = {} for motor, m in self.motors.items(): calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, - homing_offset=offsets[motor], - range_min=mins[motor], - range_max=maxes[motor], + homing_offset=0, + range_min=0, + range_max=4095, # Default max resolution ) return calibration def write_calibration(self, calibration_dict: dict[str, MotorCalibration]) -> None: - for motor, calibration in calibration_dict.items(): - if self.protocol_version == 0: - self.write("Homing_Offset", motor, calibration.homing_offset) - self.write("Min_Position_Limit", motor, calibration.range_min) - self.write("Max_Position_Limit", motor, calibration.range_max) - + # Only update the in-memory calibration, don't write to motors self.calibration = calibration_dict def _get_half_turn_homings(self, positions: dict[NameOrID, Value]) -> dict[NameOrID, Value]: """ - On Feetech Motors: - Present_Position = Actual_Position - Homing_Offset + Calculate homing offsets such that the current position becomes 0 degrees. + + For Feetech motors: + - The homing offset is subtracted from the raw position during normalization + - So to make current position = 0 degrees, homing_offset = current_raw_position """ half_turn_homings = {} for motor, pos in positions.items(): - model = self._get_motor_model(motor) - max_res = self.model_resolution_table[model] - 1 - half_turn_homings[motor] = pos - int(max_res / 2) + # The homing offset should be the current position + # This way, when we normalize: (pos - homing_offset) = 0 + half_turn_homings[motor] = pos return half_turn_homings def disable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: for motor in self._get_motors_list(motors): self.write("Torque_Enable", motor, TorqueMode.DISABLED.value, num_retry=num_retry) - self.write("Lock", motor, 0, num_retry=num_retry) + # self.write("Lock", motor, 0, num_retry=num_retry) - def _disable_torque(self, motor_id: int, model: str, num_retry: int = 0) -> None: + def _disable_torque(self, motor_id: int, model: str, num_retry: int = 5) -> None: addr, length = get_address(self.model_ctrl_table, model, "Torque_Enable") self._write(addr, length, motor_id, TorqueMode.DISABLED.value, num_retry=num_retry) - addr, length = get_address(self.model_ctrl_table, model, "Lock") - self._write(addr, length, motor_id, 0, num_retry=num_retry) + # addr, length = get_address(self.model_ctrl_table, model, "Lock") + # self._write(addr, length, motor_id, 0, num_retry=num_retry) - def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 0) -> None: + def enable_torque(self, motors: str | list[str] | None = None, num_retry: int = 5) -> None: for motor in self._get_motors_list(motors): self.write("Torque_Enable", motor, TorqueMode.ENABLED.value, num_retry=num_retry) - self.write("Lock", motor, 1, num_retry=num_retry) + # self.write("Lock", motor, 1, num_retry=num_retry) def _encode_sign(self, data_name: str, ids_values: dict[int, int]) -> dict[int, int]: for id_ in ids_values: diff --git a/src/lerobot/motors/motors_bus.py b/src/lerobot/motors/motors_bus.py index fdaa17432..ceffd6005 100644 --- a/src/lerobot/motors/motors_bus.py +++ b/src/lerobot/motors/motors_bus.py @@ -713,9 +713,8 @@ class MotorsBus(abc.ABC): self.reset_calibration(motors) actual_positions = self.sync_read("Present_Position", motors, normalize=False) homing_offsets = self._get_half_turn_homings(actual_positions) - for motor, offset in homing_offsets.items(): - self.write("Homing_Offset", motor, offset) + # Don't write to motors, just return the calculated offsets return homing_offsets @abc.abstractmethod @@ -784,21 +783,32 @@ class MotorsBus(abc.ABC): motor = self._id_to_name(id_) min_ = self.calibration[motor].range_min max_ = self.calibration[motor].range_max + homing_offset = self.calibration[motor].homing_offset drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode + if max_ == min_: raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.") - bounded_val = min(max_, max(min_, val)) if self.motors[motor].norm_mode is MotorNormMode.RANGE_M100_100: + bounded_val = min(max_, max(min_, val)) norm = (((bounded_val - min_) / (max_ - min_)) * 200) - 100 normalized_values[id_] = -norm if drive_mode else norm elif self.motors[motor].norm_mode is MotorNormMode.RANGE_0_100: + bounded_val = min(max_, max(min_, val)) norm = ((bounded_val - min_) / (max_ - min_)) * 100 normalized_values[id_] = 100 - norm if drive_mode else norm elif self.motors[motor].norm_mode is MotorNormMode.DEGREES: - deg = bounded_val * COUNT_TO_DEG - deg_zero = deg - 180 # Subtract 180 degrees because 2047 (raw count) should be 0 degrees - normalized_values[id_] = -deg_zero if drive_mode else deg_zero + # For motors without wrap-around handling + # The homing offset becomes 0 degrees + + # Calculate difference from homing position + diff = val - homing_offset + + # Convert to degrees + deg = diff * COUNT_TO_DEG + + # Apply drive mode if needed + normalized_values[id_] = -deg if drive_mode else deg else: raise NotImplementedError @@ -813,7 +823,9 @@ class MotorsBus(abc.ABC): motor = self._id_to_name(id_) min_ = self.calibration[motor].range_min max_ = self.calibration[motor].range_max + homing_offset = self.calibration[motor].homing_offset drive_mode = self.apply_drive_mode and self.calibration[motor].drive_mode + if max_ == min_: raise ValueError(f"Invalid calibration for motor '{motor}': min and max are equal.") @@ -826,10 +838,22 @@ class MotorsBus(abc.ABC): bounded_val = min(100.0, max(0.0, val)) unnormalized_values[id_] = int((bounded_val / 100) * (max_ - min_) + min_) elif self.motors[motor].norm_mode is MotorNormMode.DEGREES: + # For motors without wrap-around, simple conversion back + # Apply drive mode if needed val = -val if drive_mode else val - deg = val + 180 # Add 180 degrees because 0 degrees should be 2047 (raw count) - raw_counts = int(round(deg / COUNT_TO_DEG)) - unnormalized_values[id_] = raw_counts + + # Convert degrees to raw counts + raw_counts = int(round(val / COUNT_TO_DEG)) + + # Add back the homing offset + raw_counts_with_offset = raw_counts + homing_offset + + # Ensure value stays within calibrated motor range + # Use the calibration min/max if available + if min_ is not None and max_ is not None: + raw_counts_with_offset = max(min_, min(max_, raw_counts_with_offset)) + + unnormalized_values[id_] = raw_counts_with_offset else: raise NotImplementedError diff --git a/src/lerobot/robots/so101_follower_torque/so101_follower_t.py b/src/lerobot/robots/so101_follower_torque/so101_follower_t.py index bb87e39be..d44aea173 100644 --- a/src/lerobot/robots/so101_follower_torque/so101_follower_t.py +++ b/src/lerobot/robots/so101_follower_torque/so101_follower_t.py @@ -46,13 +46,13 @@ class SO101FollowerT(Robot): _CURRENT_STEP_A: float = 6.5e-3 # 6.5 mA per register LSB #http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0 _KT_NM_PER_AMP: float = 0.814 # Torque constant Kt [N·m/A] #https://www.feetechrc.com/811177.html - _MAX_CURRENT_A: float = 2.0 # Safe driver limit for this model + _MAX_CURRENT_A: float = 3.0 # Safe driver limit for this model # Control gains for bilateral teleoperation _KP_GAINS = { # Position gains [Nm/rad] - reduced for bilateral stability "shoulder_pan": 5.0, # Reduced from 7.0 - "shoulder_lift": 10.0, # Reduced from 15.0 - "elbow_flex": 8.0, # Reduced from 12.0 (main problem joint) + "shoulder_lift": 15.0, # Reduced from 15.0 + "elbow_flex": 12.0, # Reduced from 12.0 (main problem joint) "wrist_flex": 4.0, # Reduced from 6.0 "wrist_roll": 3.0, # Reduced from 4.0 "gripper": 1.5, # Reduced from 2.0 @@ -89,6 +89,11 @@ class SO101FollowerT(Robot): def __init__(self, config: SO101FollowerTConfig): super().__init__(config) self.config = config + + # Ensure calibration is loaded before creating the bus + if self.calibration_fpath.is_file() and not self.calibration: + self._load_calibration() + self.bus = FeetechMotorsBus( port=self.config.port, motors={ @@ -280,6 +285,12 @@ class SO101FollowerT(Robot): if self.is_connected: raise DeviceAlreadyConnectedError(f"{self} already connected") + # Ensure calibration is loaded from file if it exists + if self.calibration_fpath.is_file() and not self.calibration: + self._load_calibration() + # Update the bus with the loaded calibration + self.bus.calibration = self.calibration + self.bus.connect() if not self.is_calibrated and calibrate: self.calibrate() @@ -292,7 +303,8 @@ class SO101FollowerT(Robot): @property def is_calibrated(self) -> bool: - return self.bus.is_calibrated + # Check if calibration file exists and is loaded + return self.calibration_fpath.is_file() and bool(self.calibration) def calibrate(self) -> None: logger.info(f"\nRunning calibration of {self}") @@ -314,12 +326,14 @@ class SO101FollowerT(Robot): self.calibration[motor] = MotorCalibration( id=m.id, drive_mode=0, - homing_offset=homing_offsets[motor], - range_min=range_mins[motor], - range_max=range_maxes[motor], + homing_offset=int(homing_offsets[motor]), + range_min=int(range_mins[motor]), + range_max=int(range_maxes[motor]), ) - self.bus.write_calibration(self.calibration) + # Update the bus calibration with the new values + self.bus.calibration = self.calibration + # Save calibration to file only self._save_calibration() print("Calibration saved to", self.calibration_fpath) @@ -330,7 +344,6 @@ class SO101FollowerT(Robot): self.bus.write("Operating_Mode", motor, 2, num_retry=2) # Set to current mode self.bus.write("Torque_Limit", motor, 1000, num_retry=2) # 100% self.bus.write("Max_Torque_Limit", motor, 1000, num_retry=2) # 100% - self.bus.write("Protection_Current", motor, 1000, num_retry=2) def setup_motors(self) -> None: for motor in reversed(self.bus.motors): @@ -409,6 +422,10 @@ class SO101FollowerT(Robot): max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A)) counts = {} for joint, τ in tau_cmd_nm.items(): + # Set wrist_flex commands to 0 + if joint == "wrist_flex": + counts[joint] = 0 + continue cnt = τ * self.torque_sign[joint] * inv_coef # flip SIGN cnt = max(-max_cnt, min(max_cnt, cnt)) counts[joint] = int(round(cnt))