From d985f4b1db9830ffe904790b7b875b87a5a64ef5 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Fri, 4 Jul 2025 13:20:27 +0200 Subject: [PATCH] fix: current impl --- src/lerobot/bi_teleoperate.py | 291 +++++++++++------- src/lerobot/grav_comp.py | 22 +- src/lerobot/motors/feetech/tables.py | 3 +- .../config_so101_follower_t.py | 2 +- .../so101_follower_torque/so101_follower_t.py | 227 ++++++++++---- 5 files changed, 373 insertions(+), 172 deletions(-) diff --git a/src/lerobot/bi_teleoperate.py b/src/lerobot/bi_teleoperate.py index 620791309..a1cbf99ca 100644 --- a/src/lerobot/bi_teleoperate.py +++ b/src/lerobot/bi_teleoperate.py @@ -1,138 +1,223 @@ import math +import sys import time +from concurrent.futures import ThreadPoolExecutor -## Pseudo code for Impedance control and 4 channel bilateral tele-op -# 1. Read the leader and follower -# 2. Compute the follower's impedance torque -# 3. Compute the leader's torque -# 4. Set the leader and follower's torque -# # Read leader -# θ_ℓ, ω_ℓ, F_ℓ = leader.read_state() -# # Read the follower -# θ_f, ω_f, F_f = follower.read_state() -# # Compute the follower’s impedance torque -# τ_f = ( -# Kp*(θ_ℓ - θ_f) # position coupling -# + Kd*(ω_ℓ - ω_f) # velocity coupling -# + K_ff * F_ℓ # force feed-forward from the leader -# ) -# follower.set_torque(τ_f) -# # Compute the leader’s torque -# τ_ℓ = ( -# K_mp*(θ_f - θ_ℓ) # motion-feedback -# + K_md*(ω_f - ω_ℓ) # velocity-feedback -# - K_mf * F_f # force-feedback (negative) -# ) -# leader.set_torque(τ_ℓ) -# Other details mentioned by Bi-ACT paper: -# The controller design adopted a control of position and force -# for each axis, as shown in Fig. 3. Angle information was -# obtained from encoders, and angular velocity was calculated -# by differentiating this information. The disturbance torque τdis -# was calculated using a disturbance observer (DOB) [29], and -# the torque response value τres was estimated using a force -# reaction observer (RFOB) [30]. -from lerobot.common.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig -from lerobot.common.robots.so101_follower_torque.so101_follower_t import SO101FollowerT +import numpy as np -F_C = 0.2 # Nm, coulomb (static) -F_V = 0.05 # Nm·s rad⁻¹, residual viscous -Kt = 1.4043083213990768 # Nm/A -AMP_PER_LSB = 0.0065 # 6.5 mA/LSB -MAX_LSB = 2047 -Kp, Kd, K_local = 15, 0.5, 0.01 -DT = 0.002 # 500 Hz -MOTORS = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll"] # "gripper" -TICKS_TO_RAD = 2.0 * math.pi / 4096.0 # Feetech HLS-3625: 4096 ticks = 1 rev +from lerobot.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig +from lerobot.robots.so101_follower_torque.so101_follower_t import SO101FollowerT +from lerobot.utils.robot_utils import busy_wait -STEP = 0 -SCALE_TRANSIENT = 0.2 -SCALE_NORMAL = 1.0 +FRQ = 300 +PRINT_HZ = 10 +ESC_CLR_EOL = "\x1b[K" +CURSOR_UP = "\x1b[F" -def friction(omega: float) -> float: - return F_C * math.tanh(omega / 0.05) + F_V * omega +def vec(obs, key_suffix, joints): + """dict → 1-D numpy array (selected joints)""" + return np.array([obs[f"{m}{key_suffix}"] for m in joints]) -def nm_to_lsb(tau: float) -> int: - return int(max(-MAX_LSB, min(MAX_LSB, round((tau / Kt) / AMP_PER_LSB)))) +def to_cmd(tau_vec, joints): + """1-D array → {'joint.effort': value}""" + return {f"{m}.effort": v for m, v in zip(joints, tau_vec, strict=False)} -def ticks_to_rad(ticks: int) -> float: - return ticks * TICKS_TO_RAD - - -robot_config = SO101FollowerTConfig( +follower_cfg = SO101FollowerTConfig( port="/dev/tty.usbmodem58760431551", id="my_awesome_follower_arm", ) -teleop_config = SO101FollowerTConfig( +leader_cfg = SO101FollowerTConfig( port="/dev/tty.usbmodem58760428721", id="my_awesome_leader_arm", ) -robot = SO101FollowerT(robot_config) +follower = SO101FollowerT(follower_cfg) +leader = SO101FollowerT(leader_cfg) +follower.connect() +leader.connect() -teleop_device = SO101FollowerT(teleop_config) +exec_f = ThreadPoolExecutor(max_workers=1) +exec_l = ThreadPoolExecutor(max_workers=1) -robot.connect() -teleop_device.connect() - -print("Starting bilateral tele-op") -prev_l = dict.fromkeys(MOTORS, 0.0) -prev_f = dict.fromkeys(MOTORS, 0.0) - -last_print = time.time() -loops = 0 +print("Starting 4-channel bilateral teleoperation") +first_print = True +loop_count = 0 +tic_prev = time.perf_counter() while True: - STEP += 1 - # first 1000 steps at 20%, then full effort - scale = SCALE_TRANSIENT if STEP <= 1000 else SCALE_NORMAL - tic = time.perf_counter() - obs_l = robot.get_observation() - obs_f = teleop_device.get_observation() - tgt_l, tgt_f = {}, {} - for j in MOTORS: - ticks_l = obs_l[f"{j}.pos"] - ticks_f = obs_f[f"{j}.pos"] + fut_l = exec_l.submit(leader.get_observation) + fut_f = exec_f.submit(follower.get_observation) + obs_l, obs_f = fut_l.result(), fut_f.result() - th_l = ticks_to_rad(ticks_l) # → radians - th_f = ticks_to_rad(ticks_f) + dt = tic - tic_prev + tic_prev = tic + if dt <= 0.0: + dt = 1e-3 # avoid div-by-zero on very first pass - dth_l = (th_l - prev_l[j]) / DT - dth_f = (th_f - prev_f[j]) / DT + # Simplified model-based bilateral control + tau_cmd_f, tau_cmd_l = [], [] + debug_info_f, debug_info_l = {}, {} - prev_l[j], prev_f[j] = th_l, th_f + # Collect data for all motors + pos_f = {j: obs_f[f"{j}.pos"] for j in follower.bus.motors} + vel_f = {j: obs_f[f"{j}.vel"] for j in follower.bus.motors} + acc_f = {j: obs_f[f"{j}.acc"] for j in follower.bus.motors} + tau_meas_f = {j: obs_f[f"{j}.tau_meas"] for j in follower.bus.motors} - tau_l = -(Kp * (th_f - th_l) + Kd * (dth_f - dth_l) - K_local * dth_l) - friction(dth_l) - tau_f = -(Kp * (th_l - th_f) + Kd * (dth_l - dth_f) - K_local * dth_f) - friction(dth_f) + pos_l = {j: obs_l[f"{j}.pos"] for j in leader.bus.motors} + vel_l = {j: obs_l[f"{j}.vel"] for j in leader.bus.motors} + acc_l = {j: obs_l[f"{j}.acc"] for j in leader.bus.motors} + tau_meas_l = {j: obs_l[f"{j}.tau_meas"] for j in leader.bus.motors} - tau_l *= scale - tau_f *= scale + # Compute reaction torques using model-based approach + tau_reaction_f = follower._compute_model_based_disturbance(pos_f, vel_f, acc_f, tau_meas_f) + tau_reaction_l = leader._compute_model_based_disturbance(pos_l, vel_l, acc_l, tau_meas_l) - tgt_l[f"{j}.effort"] = nm_to_lsb(tau_l) - tgt_f[f"{j}.effort"] = nm_to_lsb(tau_f) + # Get gravity for feed-forward compensation + grav_f = follower._gravity_from_q(pos_f) + grav_l = leader._gravity_from_q(pos_l) - robot.send_action(tgt_l) - teleop_device.send_action(tgt_f) + # Get inertia torques + inertia_f = follower._inertia_from_q_dq(pos_f, vel_f, acc_f) + inertia_l = leader._inertia_from_q_dq(pos_l, vel_l, acc_l) - leader_str = " | ".join(f"{j}:{tgt_l[f'{j}.effort']:+5d}" for j in MOTORS) - follower_str = " | ".join(f"{j}:{tgt_f[f'{j}.effort']:+5d}" for j in MOTORS) - print(f"Leader: {leader_str}") - print(f"Follower: {follower_str}") + Kp = 8.0 # Position gain + Kd = 0.1 # Velocity gain + Kf = 0.1 # Force reflection gain - loops += 1 - elapsed = time.perf_counter() - tic - to_sleep = DT - elapsed - if to_sleep > 0: - time.sleep(to_sleep) + for j in follower.bus.motors: + # Bilateral control references (eqs. 5-7) + theta_cmd_f, omega_cmd_f = pos_l[j], vel_l[j] # Follower follows leader + theta_cmd_l, omega_cmd_l = pos_f[j], vel_f[j] # Leader follows follower - now = time.time() - if now - last_print >= 1.0: - print(f"{loops / (now - last_print):6.1f} Hz control loop") # e.g. "500.2 Hz" - loops = 0 - last_print = now + tau_cmd_f_ref = -tau_reaction_l[j] # Reflect leader reaction torque + tau_cmd_l_ref = -tau_reaction_f[j] # Reflect follower reaction torque + + # Follower control + tau_pos_f = Kp * (theta_cmd_f - pos_f[j]) + tau_vel_f = Kd * (omega_cmd_f - vel_f[j]) + tau_force_f = Kf * (tau_cmd_f_ref - tau_reaction_f[j]) + tau_ref_f = tau_pos_f + tau_vel_f + tau_force_f + grav_f[j] + + # Leader control + tau_pos_l = Kp * (theta_cmd_l - pos_l[j]) + tau_vel_l = Kd * (omega_cmd_l - vel_l[j]) + tau_force_l = Kf * (tau_cmd_l_ref - tau_reaction_l[j]) + tau_ref_l = tau_pos_l + tau_vel_l + tau_force_l + grav_l[j] + + tau_cmd_f.append(tau_ref_f) + tau_cmd_l.append(tau_ref_l) + + # Store debug info + debug_info_f[j] = { + "τ_measured": tau_meas_f[j], + "τ_pos": tau_pos_f, + "τ_vel": tau_vel_f, + "τ_force": tau_force_f, + "τ_gravity": grav_f[j], + "τ_inertia": inertia_f[j], + "τ_reaction": tau_reaction_f[j], + "τ_ref": tau_ref_f, + "θ_err": theta_cmd_f - pos_f[j], + "ω_err": omega_cmd_f - vel_f[j], + "τ_err": tau_cmd_f_ref - tau_reaction_f[j], + } + debug_info_l[j] = { + "τ_measured": tau_meas_l[j], + "τ_pos": tau_pos_l, + "τ_vel": tau_vel_l, + "τ_force": tau_force_l, + "τ_gravity": grav_l[j], + "τ_inertia": inertia_l[j], + "τ_reaction": tau_reaction_l[j], + "τ_ref": tau_ref_l, + "θ_err": theta_cmd_l - pos_l[j], + "ω_err": omega_cmd_l - vel_l[j], + "τ_err": tau_cmd_l_ref - tau_reaction_l[j], + } + + # Send torques to both arms + follower.send_action(to_cmd(np.array(tau_cmd_f), list(follower.bus.motors.keys()))) + leader.send_action(to_cmd(np.array(tau_cmd_l), list(leader.bus.motors.keys()))) + + # Console diagnostics (10 Hz) + loop_count += 1 + if loop_count % (FRQ // PRINT_HZ) == 0: + hz = 1.0 / dt + + # Detailed torque analysis mode + lines = [f"Loop {hz:6.1f} Hz Δt {dt * 1e3:5.2f} ms"] + lines.append("=" * 100) + lines.append( + f"{'Joint':<13}{'L':>8}{'F':>8}{'Grav':>6}{'Inert':>6}{'Pos':>6}{'Vel':>6}{'Force':>6}{'React':>6}{'Meas':>6}{'Cmd':>6}" + ) + lines.append( + f"{'':13}{'(deg)':>8}{'(deg)':>8}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}" + ) + lines.append("-" * 100) + + for i, j in enumerate(follower.bus.motors): + debug_f = debug_info_f[j] + debug_l = debug_info_l[j] + + lines.append( + f"{j:<13s}" + f"{math.degrees(pos_l[j]):+8.1f}" + f"{math.degrees(pos_f[j]):+8.1f}" + f"{debug_f['τ_gravity']:+6.2f}" + f"{debug_f['τ_inertia']:+6.2f}" + f"{debug_f['τ_pos']:+6.2f}" + f"{debug_f['τ_vel']:+6.2f}" + f"{debug_f['τ_force']:+6.2f}" + f"{debug_f['τ_reaction']:+6.2f}" + f"{debug_f['τ_measured']:+6.2f}" + f"{tau_cmd_f[i]:+6.2f}" + ) + + lines.append("-" * 100) + lines.append("Model-Based Analysis:") + lines.append( + f"{'Joint':<13} {'Gravity':<8} {'Inertia':<8} {'Reaction':<8} {'Errors':<12} {'Balance':<8}" + ) + for j in follower.bus.motors: + debug_f = debug_info_f[j] + g_dir = "↑" if debug_f["τ_gravity"] > 0 else "↓" if debug_f["τ_gravity"] < 0 else "≈0" + i_dir = "+" if debug_f["τ_inertia"] > 0.05 else "-" if debug_f["τ_inertia"] < -0.05 else "≈0" + r_dir = "+" if debug_f["τ_reaction"] > 0.05 else "-" if debug_f["τ_reaction"] < -0.05 else "≈0" + + pos_err_sign = "+" if debug_f["θ_err"] > 0 else "-" if debug_f["θ_err"] < 0 else "0" + vel_err_sign = "+" if debug_f["ω_err"] > 0 else "-" if debug_f["ω_err"] < 0 else "0" + + # Check balance: τ_ref should equal sum of components + expected_sum = debug_f["τ_pos"] + debug_f["τ_vel"] + debug_f["τ_force"] + debug_f["τ_gravity"] + balance_check = "✓" if abs(expected_sum - debug_f["τ_ref"]) < 0.01 else "✗" + + lines.append( + f"{j:<13s}" + f"{g_dir:<8s}" + f"{i_dir:<8s}" + f"{r_dir:<8s}" + f"θ:{pos_err_sign}ω:{vel_err_sign} " + f"{balance_check}" + ) + + lines.append("-" * 100) + lines.append( + f"Model-Based Control: React = τ_meas - τ_grav - τ_inert | Gains: Kp={Kp} Kd={Kd} Kf={Kf}" + ) + + block = "\n".join(lines) + if first_print: + sys.stdout.write(block + "\n") + first_print = False + else: + sys.stdout.write(CURSOR_UP * len(lines) + ESC_CLR_EOL + block + "\n") + sys.stdout.flush() + + busy_wait(max(0.0, 1.0 / FRQ - (time.perf_counter() - tic))) diff --git a/src/lerobot/grav_comp.py b/src/lerobot/grav_comp.py index bc70eb8b9..fa416598a 100644 --- a/src/lerobot/grav_comp.py +++ b/src/lerobot/grav_comp.py @@ -4,8 +4,8 @@ import time import numpy as np import pinocchio as pin -from lerobot.common.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig -from lerobot.common.robots.so101_follower_torque.so101_follower_t import SO101FollowerT +from lerobot.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig +from lerobot.robots.so101_follower_torque.so101_follower_t import SO101FollowerT MOTORS = [ "shoulder_pan", @@ -47,19 +47,15 @@ def tau_to_action(tau): return {f"{m}.effort": float(tau[IDX[m]]) for m in MOTORS} -pin_robot = pin.RobotWrapper.BuildFromURDF( - "lerobot/SO101/so101_new_calib.urdf", - "lerobot/SO101", -) -pin_robot.data = pin_robot.model.createData() -pin_robot.initViewer(open=True) -pin_robot.loadViewerModel() -pin_robot.display(pin_robot.q0) - cfg = SO101FollowerTConfig(port="/dev/tty.usbmodem58760431551", id="follower_torque6") real = SO101FollowerT(cfg) real.connect() +real.pin_robot.data = real.pin_robot.model.createData() +real.pin_robot.initViewer(open=True) +real.pin_robot.loadViewerModel() +real.pin_robot.display(real.pin_robot.q0) + ESC_CLR_EOL = "\x1b[K" print("Running gravity compensation") @@ -68,7 +64,7 @@ try: obs = real.get_observation() # positions in rad q = obs_to_q(obs) # dict to vector - tau = pin.computeGeneralizedGravity(pin_robot.model, pin_robot.data, q) # τ in [Nm] + tau = pin.computeGeneralizedGravity(real.pin_robot.model, real.pin_robot.data, q) # τ in [Nm] # build compact single-line status status = " | ".join( @@ -82,7 +78,7 @@ try: real.send_action(tau_to_action(tau)) # apply τ - pin_robot.display(q) + real.pin_robot.display(q) time.sleep(0.002) # Run at 500 Hz except KeyboardInterrupt: print("Stopping") diff --git a/src/lerobot/motors/feetech/tables.py b/src/lerobot/motors/feetech/tables.py index d711c354a..eec765c57 100644 --- a/src/lerobot/motors/feetech/tables.py +++ b/src/lerobot/motors/feetech/tables.py @@ -278,9 +278,10 @@ MODEL_ENCODING_TABLE = { "Homing_Offset": 15, "Goal_Velocity": 15, "Present_Velocity": 15, - "Target_Torque": 10, + "Target_Torque": 15, "Present_Position": 15, "Goal_Position": 15, + "Present_Current": 15, }, } diff --git a/src/lerobot/robots/so101_follower_torque/config_so101_follower_t.py b/src/lerobot/robots/so101_follower_torque/config_so101_follower_t.py index 7cffa6968..fee5b1749 100644 --- a/src/lerobot/robots/so101_follower_torque/config_so101_follower_t.py +++ b/src/lerobot/robots/so101_follower_torque/config_so101_follower_t.py @@ -16,7 +16,7 @@ from dataclasses import dataclass, field -from lerobot.common.cameras import CameraConfig +from lerobot.cameras import CameraConfig from ..config import RobotConfig diff --git a/src/lerobot/robots/so101_follower_torque/so101_follower_t.py b/src/lerobot/robots/so101_follower_torque/so101_follower_t.py index e9c5ff144..269db0020 100644 --- a/src/lerobot/robots/so101_follower_torque/so101_follower_t.py +++ b/src/lerobot/robots/so101_follower_torque/so101_follower_t.py @@ -20,10 +20,13 @@ import time from functools import cached_property from typing import Any -from lerobot.common.cameras.utils import make_cameras_from_configs -from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError -from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode -from lerobot.common.motors.feetech import ( +import numpy as np +import pinocchio as pin + +from lerobot.cameras.utils import make_cameras_from_configs +from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError +from lerobot.motors import Motor, MotorCalibration, MotorNormMode +from lerobot.motors.feetech import ( FeetechMotorsBus, OperatingMode, ) @@ -42,34 +45,11 @@ class SO101FollowerT(Robot): config_class = SO101FollowerTConfig name = "so101_follower_t" - _CURRENT_STEP_A: float = 6.5e-3 # 6.5 mA per register LSB #http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0 + _CURRENT_STEP_A: float = 6.5e-3 # 6.5 mA per register LSB #http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0 _KT_NM_PER_AMP: float = 0.814 # Torque constant Kt [N·m/A] #https://www.feetechrc.com/811177.html _MAX_CURRENT_A: float = 3.0 # Safe driver limit for this model - _COUNT_TO_RAD: float = math.radians(0.087) # 1 pos count to rad - def _current_to_torque_nm(self, raw: dict[str, int]) -> dict[str, float]: - """Convert "Present_Current" register counts (±2047) → torque [Nm]. - Values are clamped to ±3A before conversion for protection. - """ - max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A)) # ≈ 462 - coef = self._CURRENT_STEP_A * self._KT_NM_PER_AMP - return {k: max(min(v, max_cnt), -max_cnt) * coef for k, v in raw.items()} - - def _torque_nm_to_current(self, torque: dict[str, float]) -> dict[str, int]: - """Convert torque [Nm] to register counts, clamped to ±3A (2.44 Nm).""" - inv_coef = 1.0 / (self._CURRENT_STEP_A * self._KT_NM_PER_AMP) - max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A)) - max_torque = self._MAX_CURRENT_A * self._KT_NM_PER_AMP - return { - k: max(-max_cnt, min(max_cnt, int(round(max(-max_torque, min(max_torque, float(t))) * inv_coef)))) - for k, t in torque.items() - } - - def _deg_to_rad(self, deg: dict[str, float | int]) -> dict[str, float]: - """GDegrees to radians.""" - return {m: math.radians(float(v)) for m, v in deg.items()} - def __init__(self, config: SO101FollowerTConfig): super().__init__(config) self.config = config @@ -87,9 +67,33 @@ class SO101FollowerT(Robot): ) self.cameras = make_cameras_from_configs(config.cameras) + self.pin_robot = pin.RobotWrapper.BuildFromURDF( + "src/lerobot/SO101/so101_new_calib.urdf", "src/lerobot/SO101" + ) + + flip = { + "shoulder_pan": True, + "shoulder_lift": True, + "elbow_flex": True, + "wrist_flex": True, + "wrist_roll": True, + "gripper": True, + } + self.torque_sign = {n: (-1.0 if flip[n] else 1.0) for n in self.bus.motors} + + self._prev_pos_rad: dict[str, float] | None = None + self._prev_vel_rad: dict[str, float] | None = None + self._prev_t: float | None = None + @property def _motors_ft(self) -> dict[str, type]: - return {f"{motor}.pos": float for motor in self.bus.motors} + d: dict[str, type] = {} + for m in self.bus.motors: + d[f"{m}.pos"] = float + d[f"{m}.vel"] = float + d[f"{m}.acc"] = float # Add acceleration + d[f"{m}.tau_meas"] = float # Changed from tau_res to tau_meas + return d @property def _cameras_ft(self) -> dict[str, tuple]: @@ -103,15 +107,99 @@ class SO101FollowerT(Robot): @cached_property def action_features(self) -> dict[str, type]: - return { - **{f"{m}.pos": float for m in self.bus.motors}, - **{f"{m}.effort": int for m in self.bus.motors}, - } + return {f"{m}.effort": int for m in self.bus.motors} @property def is_connected(self) -> bool: return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values()) + def _current_to_torque_nm(self, raw: dict[str, Any]) -> dict[str, float]: + """Convert "Present_Current" register counts (±2047) → torque [Nm]. + Values are clamped to ±3A before conversion for protection. + """ + max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A)) # ≈ 462 + coef = self._CURRENT_STEP_A * self._KT_NM_PER_AMP + return {k: self.torque_sign[k] * max(-max_cnt, min(max_cnt, v)) * coef for k, v in raw.items()} + + def _torque_nm_to_current(self, torque: dict[str, float]) -> dict[str, int]: + """Convert torque [Nm] to register counts, clamped to ±3A (2.44 Nm).""" + inv_coef = 1.0 / (self._CURRENT_STEP_A * self._KT_NM_PER_AMP) + max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A)) + counts = {} + for k, τ in torque.items(): + cnt = τ * self.torque_sign[k] * inv_coef + cnt = max(-max_cnt, min(max_cnt, cnt)) + counts[k] = int(round(cnt)) + return counts + + def _deg_to_rad(self, deg: dict[str, float | int]) -> dict[str, float]: + """GDegrees to radians.""" + return {m: math.radians(float(v)) for m, v in deg.items()} + + def _gravity_from_q(self, q_rad: dict[str, float]) -> dict[str, float]: + """ + Compute g(q) [N m] for all joints in the robot. + The order of joints in the URDF matches self.bus.motors. + """ + q = np.zeros(self.pin_robot.model.nq) + for i, motor_name in enumerate(self.bus.motors): + q[i] = q_rad[motor_name] + + g = pin.computeGeneralizedGravity(self.pin_robot.model, self.pin_robot.data, q) + + return {motor_name: float(g[i]) for i, motor_name in enumerate(self.bus.motors)} + + def _inertia_from_q_dq( + self, q_rad: dict[str, float], dq_rad: dict[str, float], ddq_rad: dict[str, float] + ) -> dict[str, float]: + """ + Compute inertia torques τ_inertia = M(q) * ddq directly from URDF model. + """ + # Convert joint dictionaries to numpy arrays in correct order + q = np.zeros(self.pin_robot.model.nq) + dq = np.zeros(self.pin_robot.model.nv) + ddq = np.zeros(self.pin_robot.model.nv) + + for i, motor_name in enumerate(self.bus.motors): + q[i] = q_rad[motor_name] + dq[i] = dq_rad[motor_name] + ddq[i] = ddq_rad[motor_name] + + # Compute mass matrix M(q) + mass_matrix = pin.crba(self.pin_robot.model, self.pin_robot.data, q) + + # Compute inertia torques: τ_inertia = M(q) * ddq + tau_inertia = mass_matrix @ ddq + + return {motor_name: float(tau_inertia[i]) for i, motor_name in enumerate(self.bus.motors)} + + def _compute_model_based_disturbance( + self, + q_rad: dict[str, float], + dq_rad: dict[str, float], + ddq_rad: dict[str, float], + tau_measured: dict[str, float], + ) -> dict[str, float]: + """ + Compute disturbance torques using direct model-based approach: + τ_disturbance = τ_measured - τ_gravity - τ_inertia + + This replaces the DOB and gives cleaner, delay-free disturbance estimation. + """ + # Get gravity compensation + tau_gravity = self._gravity_from_q(q_rad) + + # Get inertia compensation + tau_inertia = self._inertia_from_q_dq(q_rad, dq_rad, ddq_rad) + + # Compute disturbance: what's left after removing known dynamics + tau_disturbance = {} + for motor_name in self.bus.motors: + tau_dist = tau_measured[motor_name] - tau_gravity[motor_name] - tau_inertia[motor_name] + tau_disturbance[motor_name] = tau_dist + + return tau_disturbance + def connect(self, calibrate: bool = True) -> None: """ We assume that at connection time, arm is in a rest position, @@ -167,15 +255,16 @@ class SO101FollowerT(Robot): with self.bus.torque_disabled(): self.bus.configure_motors() for motor in self.bus.motors: - phase = self.bus.read("Phase", motor, normalize=False) + phase = int(self.bus.read("Phase", motor, normalize=False)) if phase & 0x10: # bit-4 set = multi-turn new_phase = phase & ~0x10 print(f"Switching {motor} to single-turn: 0x{phase:02X} → 0x{new_phase:02X}") self.bus.write("Phase", motor, new_phase, normalize=False) - self.bus.write("Operating_Mode", motor, 2) # Set to current mode - self.bus.write("Target_Torque", motor, 0) - self.bus.write("Torque_Limit", motor, 1000) # 100% + self.bus.write("Operating_Mode", motor, 2, num_retry=2) # Set to current mode + self.bus.write("Target_Torque", motor, 0, num_retry=2) + self.bus.write("Torque_Limit", motor, 1000, num_retry=2) # 100% + self.bus.write("Return_Delay_Time", motor, 0, num_retry=2) def setup_motors(self) -> None: for motor in reversed(self.bus.motors): @@ -187,20 +276,44 @@ class SO101FollowerT(Robot): if not self.is_connected: raise DeviceNotConnectedError(f"{self} is not connected.") - start = time.perf_counter() + t_now = time.perf_counter() - # Positions - pos_deg = self.bus.sync_read("Present_Position", num_retry=10) + # position + pos_deg = self.bus.sync_read("Present_Position", num_retry=5) pos_rad = self._deg_to_rad(pos_deg) - obs_dict = {f"{m}.pos": r for m, r in pos_rad.items()} - # Currents to torque (Nm) - curr_raw = self.bus.sync_read("Present_Current", normalize=False, num_retry=10) - torque_nm = self._current_to_torque_nm({f"{m}.effort": v for m, v in curr_raw.items()}) - obs_dict.update(torque_nm) + # velocity and acceleration + if self._prev_pos_rad is None: # first call + vel_rad = dict.fromkeys(pos_rad, 0.0) + acc_rad = dict.fromkeys(pos_rad, 0.0) + dt = 1e-3 + else: + dt = t_now - (self._prev_t or 0.0) + dt = max(dt, 1e-4) # Avoid division by zero - dt_ms = (time.perf_counter() - start) * 1e3 - logger.debug(f"{self} read state: {dt_ms:.1f}ms") + # Compute velocity + vel_rad = {m: (pos_rad[m] - self._prev_pos_rad[m]) / dt for m in pos_rad} + + # Compute acceleration + if self._prev_vel_rad is None: + acc_rad = dict.fromkeys(pos_rad, 0.0) + else: + acc_rad = {m: (vel_rad[m] - self._prev_vel_rad[m]) / dt for m in vel_rad} + + # Update previous values + self._prev_pos_rad = pos_rad.copy() + self._prev_vel_rad = vel_rad.copy() + self._prev_t = t_now + + # measured torque (Nm) + cur_raw = self.bus.sync_read("Present_Current", normalize=False, num_retry=5) + tau_meas = self._current_to_torque_nm(cur_raw) + + obs_dict = {} + obs_dict |= {f"{m}.pos": pos_rad[m] for m in self.bus.motors} + obs_dict |= {f"{m}.vel": vel_rad[m] for m in self.bus.motors} + obs_dict |= {f"{m}.acc": acc_rad[m] for m in self.bus.motors} + obs_dict |= {f"{m}.tau_meas": tau_meas[m] for m in self.bus.motors} # Capture images from cameras for cam_key, cam in self.cameras.items(): @@ -224,14 +337,20 @@ class SO101FollowerT(Robot): raise DeviceNotConnectedError(f"{self} is not connected.") # Extract torque commands - torque_cmd = {k: v for k, v in action.items() if k.endswith(".effort")} - if torque_cmd: - counts = self._torque_nm_to_current(torque_cmd) - # remove the .effort suffix - counts = {k.removesuffix(".effort"): v for k, v in counts.items()} - self.bus.sync_write("Target_Torque", counts, normalize=False, num_retry=2) + tau_cmd_nm = {k.removesuffix(".effort"): float(v) for k, v in action.items() if k.endswith(".effort")} + if not tau_cmd_nm: + return action - # pass back the other keys (.pos) untouched + inv_coef = 1.0 / (self._CURRENT_STEP_A * self._KT_NM_PER_AMP) + max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A)) + counts = {} + for joint, τ in tau_cmd_nm.items(): + cnt = τ * self.torque_sign[joint] * inv_coef # flip SIGN + cnt = max(-max_cnt, min(max_cnt, cnt)) + counts[joint] = int(round(cnt)) + + self.bus.sync_write("Target_Torque", counts, normalize=False, num_retry=2) + self._last_cmd_nm = tau_cmd_nm return action def disconnect(self):