fix: current impl

This commit is contained in:
Pepijn
2025-07-04 13:20:27 +02:00
parent ab53de989a
commit d985f4b1db
5 changed files with 373 additions and 172 deletions

View File

@@ -1,138 +1,223 @@
import math
import sys
import time
from concurrent.futures import ThreadPoolExecutor
## Pseudo code for Impedance control and 4 channel bilateral tele-op
# 1. Read the leader and follower
# 2. Compute the follower's impedance torque
# 3. Compute the leader's torque
# 4. Set the leader and follower's torque
# # Read leader
# θ_, ω_, F_ = leader.read_state()
# # Read the follower
# θ_f, ω_f, F_f = follower.read_state()
# # Compute the followers impedance torque
# τ_f = (
# Kp*(θ_ - θ_f) # position coupling
# + Kd*(ω_ - ω_f) # velocity coupling
# + K_ff * F_ # force feed-forward from the leader
# )
# follower.set_torque(τ_f)
# # Compute the leaders torque
# τ_ = (
# K_mp*(θ_f - θ_) # motion-feedback
# + K_md*(ω_f - ω_) # velocity-feedback
# - K_mf * F_f # force-feedback (negative)
# )
# leader.set_torque(τ_)
# Other details mentioned by Bi-ACT paper:
# The controller design adopted a control of position and force
# for each axis, as shown in Fig. 3. Angle information was
# obtained from encoders, and angular velocity was calculated
# by differentiating this information. The disturbance torque τdis
# was calculated using a disturbance observer (DOB) [29], and
# the torque response value τres was estimated using a force
# reaction observer (RFOB) [30].
from lerobot.common.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig
from lerobot.common.robots.so101_follower_torque.so101_follower_t import SO101FollowerT
import numpy as np
F_C = 0.2 # Nm, coulomb (static)
F_V = 0.05 # Nm·s rad⁻¹, residual viscous
Kt = 1.4043083213990768 # Nm/A
AMP_PER_LSB = 0.0065 # 6.5 mA/LSB
MAX_LSB = 2047
Kp, Kd, K_local = 15, 0.5, 0.01
DT = 0.002 # 500 Hz
MOTORS = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll"] # "gripper"
TICKS_TO_RAD = 2.0 * math.pi / 4096.0 # Feetech HLS-3625: 4096 ticks = 1 rev
from lerobot.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig
from lerobot.robots.so101_follower_torque.so101_follower_t import SO101FollowerT
from lerobot.utils.robot_utils import busy_wait
STEP = 0
SCALE_TRANSIENT = 0.2
SCALE_NORMAL = 1.0
FRQ = 300
PRINT_HZ = 10
ESC_CLR_EOL = "\x1b[K"
CURSOR_UP = "\x1b[F"
def friction(omega: float) -> float:
return F_C * math.tanh(omega / 0.05) + F_V * omega
def vec(obs, key_suffix, joints):
"""dict → 1-D numpy array (selected joints)"""
return np.array([obs[f"{m}{key_suffix}"] for m in joints])
def nm_to_lsb(tau: float) -> int:
return int(max(-MAX_LSB, min(MAX_LSB, round((tau / Kt) / AMP_PER_LSB))))
def to_cmd(tau_vec, joints):
"""1-D array → {'joint.effort': value}"""
return {f"{m}.effort": v for m, v in zip(joints, tau_vec, strict=False)}
def ticks_to_rad(ticks: int) -> float:
return ticks * TICKS_TO_RAD
robot_config = SO101FollowerTConfig(
follower_cfg = SO101FollowerTConfig(
port="/dev/tty.usbmodem58760431551",
id="my_awesome_follower_arm",
)
teleop_config = SO101FollowerTConfig(
leader_cfg = SO101FollowerTConfig(
port="/dev/tty.usbmodem58760428721",
id="my_awesome_leader_arm",
)
robot = SO101FollowerT(robot_config)
follower = SO101FollowerT(follower_cfg)
leader = SO101FollowerT(leader_cfg)
follower.connect()
leader.connect()
teleop_device = SO101FollowerT(teleop_config)
exec_f = ThreadPoolExecutor(max_workers=1)
exec_l = ThreadPoolExecutor(max_workers=1)
robot.connect()
teleop_device.connect()
print("Starting bilateral tele-op")
prev_l = dict.fromkeys(MOTORS, 0.0)
prev_f = dict.fromkeys(MOTORS, 0.0)
last_print = time.time()
loops = 0
print("Starting 4-channel bilateral teleoperation")
first_print = True
loop_count = 0
tic_prev = time.perf_counter()
while True:
STEP += 1
# first 1000 steps at 20%, then full effort
scale = SCALE_TRANSIENT if STEP <= 1000 else SCALE_NORMAL
tic = time.perf_counter()
obs_l = robot.get_observation()
obs_f = teleop_device.get_observation()
tgt_l, tgt_f = {}, {}
for j in MOTORS:
ticks_l = obs_l[f"{j}.pos"]
ticks_f = obs_f[f"{j}.pos"]
fut_l = exec_l.submit(leader.get_observation)
fut_f = exec_f.submit(follower.get_observation)
obs_l, obs_f = fut_l.result(), fut_f.result()
th_l = ticks_to_rad(ticks_l) # → radians
th_f = ticks_to_rad(ticks_f)
dt = tic - tic_prev
tic_prev = tic
if dt <= 0.0:
dt = 1e-3 # avoid div-by-zero on very first pass
dth_l = (th_l - prev_l[j]) / DT
dth_f = (th_f - prev_f[j]) / DT
# Simplified model-based bilateral control
tau_cmd_f, tau_cmd_l = [], []
debug_info_f, debug_info_l = {}, {}
prev_l[j], prev_f[j] = th_l, th_f
# Collect data for all motors
pos_f = {j: obs_f[f"{j}.pos"] for j in follower.bus.motors}
vel_f = {j: obs_f[f"{j}.vel"] for j in follower.bus.motors}
acc_f = {j: obs_f[f"{j}.acc"] for j in follower.bus.motors}
tau_meas_f = {j: obs_f[f"{j}.tau_meas"] for j in follower.bus.motors}
tau_l = -(Kp * (th_f - th_l) + Kd * (dth_f - dth_l) - K_local * dth_l) - friction(dth_l)
tau_f = -(Kp * (th_l - th_f) + Kd * (dth_l - dth_f) - K_local * dth_f) - friction(dth_f)
pos_l = {j: obs_l[f"{j}.pos"] for j in leader.bus.motors}
vel_l = {j: obs_l[f"{j}.vel"] for j in leader.bus.motors}
acc_l = {j: obs_l[f"{j}.acc"] for j in leader.bus.motors}
tau_meas_l = {j: obs_l[f"{j}.tau_meas"] for j in leader.bus.motors}
tau_l *= scale
tau_f *= scale
# Compute reaction torques using model-based approach
tau_reaction_f = follower._compute_model_based_disturbance(pos_f, vel_f, acc_f, tau_meas_f)
tau_reaction_l = leader._compute_model_based_disturbance(pos_l, vel_l, acc_l, tau_meas_l)
tgt_l[f"{j}.effort"] = nm_to_lsb(tau_l)
tgt_f[f"{j}.effort"] = nm_to_lsb(tau_f)
# Get gravity for feed-forward compensation
grav_f = follower._gravity_from_q(pos_f)
grav_l = leader._gravity_from_q(pos_l)
robot.send_action(tgt_l)
teleop_device.send_action(tgt_f)
# Get inertia torques
inertia_f = follower._inertia_from_q_dq(pos_f, vel_f, acc_f)
inertia_l = leader._inertia_from_q_dq(pos_l, vel_l, acc_l)
leader_str = " | ".join(f"{j}:{tgt_l[f'{j}.effort']:+5d}" for j in MOTORS)
follower_str = " | ".join(f"{j}:{tgt_f[f'{j}.effort']:+5d}" for j in MOTORS)
print(f"Leader: {leader_str}")
print(f"Follower: {follower_str}")
Kp = 8.0 # Position gain
Kd = 0.1 # Velocity gain
Kf = 0.1 # Force reflection gain
loops += 1
elapsed = time.perf_counter() - tic
to_sleep = DT - elapsed
if to_sleep > 0:
time.sleep(to_sleep)
for j in follower.bus.motors:
# Bilateral control references (eqs. 5-7)
theta_cmd_f, omega_cmd_f = pos_l[j], vel_l[j] # Follower follows leader
theta_cmd_l, omega_cmd_l = pos_f[j], vel_f[j] # Leader follows follower
now = time.time()
if now - last_print >= 1.0:
print(f"{loops / (now - last_print):6.1f} Hz control loop") # e.g. "500.2 Hz"
loops = 0
last_print = now
tau_cmd_f_ref = -tau_reaction_l[j] # Reflect leader reaction torque
tau_cmd_l_ref = -tau_reaction_f[j] # Reflect follower reaction torque
# Follower control
tau_pos_f = Kp * (theta_cmd_f - pos_f[j])
tau_vel_f = Kd * (omega_cmd_f - vel_f[j])
tau_force_f = Kf * (tau_cmd_f_ref - tau_reaction_f[j])
tau_ref_f = tau_pos_f + tau_vel_f + tau_force_f + grav_f[j]
# Leader control
tau_pos_l = Kp * (theta_cmd_l - pos_l[j])
tau_vel_l = Kd * (omega_cmd_l - vel_l[j])
tau_force_l = Kf * (tau_cmd_l_ref - tau_reaction_l[j])
tau_ref_l = tau_pos_l + tau_vel_l + tau_force_l + grav_l[j]
tau_cmd_f.append(tau_ref_f)
tau_cmd_l.append(tau_ref_l)
# Store debug info
debug_info_f[j] = {
"τ_measured": tau_meas_f[j],
"τ_pos": tau_pos_f,
"τ_vel": tau_vel_f,
"τ_force": tau_force_f,
"τ_gravity": grav_f[j],
"τ_inertia": inertia_f[j],
"τ_reaction": tau_reaction_f[j],
"τ_ref": tau_ref_f,
"θ_err": theta_cmd_f - pos_f[j],
"ω_err": omega_cmd_f - vel_f[j],
"τ_err": tau_cmd_f_ref - tau_reaction_f[j],
}
debug_info_l[j] = {
"τ_measured": tau_meas_l[j],
"τ_pos": tau_pos_l,
"τ_vel": tau_vel_l,
"τ_force": tau_force_l,
"τ_gravity": grav_l[j],
"τ_inertia": inertia_l[j],
"τ_reaction": tau_reaction_l[j],
"τ_ref": tau_ref_l,
"θ_err": theta_cmd_l - pos_l[j],
"ω_err": omega_cmd_l - vel_l[j],
"τ_err": tau_cmd_l_ref - tau_reaction_l[j],
}
# Send torques to both arms
follower.send_action(to_cmd(np.array(tau_cmd_f), list(follower.bus.motors.keys())))
leader.send_action(to_cmd(np.array(tau_cmd_l), list(leader.bus.motors.keys())))
# Console diagnostics (10 Hz)
loop_count += 1
if loop_count % (FRQ // PRINT_HZ) == 0:
hz = 1.0 / dt
# Detailed torque analysis mode
lines = [f"Loop {hz:6.1f} Hz Δt {dt * 1e3:5.2f} ms"]
lines.append("=" * 100)
lines.append(
f"{'Joint':<13}{'L':>8}{'F':>8}{'Grav':>6}{'Inert':>6}{'Pos':>6}{'Vel':>6}{'Force':>6}{'React':>6}{'Meas':>6}{'Cmd':>6}"
)
lines.append(
f"{'':13}{'(deg)':>8}{'(deg)':>8}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}{'(Nm)':>6}"
)
lines.append("-" * 100)
for i, j in enumerate(follower.bus.motors):
debug_f = debug_info_f[j]
debug_l = debug_info_l[j]
lines.append(
f"{j:<13s}"
f"{math.degrees(pos_l[j]):+8.1f}"
f"{math.degrees(pos_f[j]):+8.1f}"
f"{debug_f['τ_gravity']:+6.2f}"
f"{debug_f['τ_inertia']:+6.2f}"
f"{debug_f['τ_pos']:+6.2f}"
f"{debug_f['τ_vel']:+6.2f}"
f"{debug_f['τ_force']:+6.2f}"
f"{debug_f['τ_reaction']:+6.2f}"
f"{debug_f['τ_measured']:+6.2f}"
f"{tau_cmd_f[i]:+6.2f}"
)
lines.append("-" * 100)
lines.append("Model-Based Analysis:")
lines.append(
f"{'Joint':<13} {'Gravity':<8} {'Inertia':<8} {'Reaction':<8} {'Errors':<12} {'Balance':<8}"
)
for j in follower.bus.motors:
debug_f = debug_info_f[j]
g_dir = "" if debug_f["τ_gravity"] > 0 else "" if debug_f["τ_gravity"] < 0 else "≈0"
i_dir = "+" if debug_f["τ_inertia"] > 0.05 else "-" if debug_f["τ_inertia"] < -0.05 else "≈0"
r_dir = "+" if debug_f["τ_reaction"] > 0.05 else "-" if debug_f["τ_reaction"] < -0.05 else "≈0"
pos_err_sign = "+" if debug_f["θ_err"] > 0 else "-" if debug_f["θ_err"] < 0 else "0"
vel_err_sign = "+" if debug_f["ω_err"] > 0 else "-" if debug_f["ω_err"] < 0 else "0"
# Check balance: τ_ref should equal sum of components
expected_sum = debug_f["τ_pos"] + debug_f["τ_vel"] + debug_f["τ_force"] + debug_f["τ_gravity"]
balance_check = "" if abs(expected_sum - debug_f["τ_ref"]) < 0.01 else ""
lines.append(
f"{j:<13s}"
f"{g_dir:<8s}"
f"{i_dir:<8s}"
f"{r_dir:<8s}"
f"θ:{pos_err_sign}ω:{vel_err_sign} "
f"{balance_check}"
)
lines.append("-" * 100)
lines.append(
f"Model-Based Control: React = τ_meas - τ_grav - τ_inert | Gains: Kp={Kp} Kd={Kd} Kf={Kf}"
)
block = "\n".join(lines)
if first_print:
sys.stdout.write(block + "\n")
first_print = False
else:
sys.stdout.write(CURSOR_UP * len(lines) + ESC_CLR_EOL + block + "\n")
sys.stdout.flush()
busy_wait(max(0.0, 1.0 / FRQ - (time.perf_counter() - tic)))

View File

@@ -4,8 +4,8 @@ import time
import numpy as np
import pinocchio as pin
from lerobot.common.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig
from lerobot.common.robots.so101_follower_torque.so101_follower_t import SO101FollowerT
from lerobot.robots.so101_follower_torque.config_so101_follower_t import SO101FollowerTConfig
from lerobot.robots.so101_follower_torque.so101_follower_t import SO101FollowerT
MOTORS = [
"shoulder_pan",
@@ -47,19 +47,15 @@ def tau_to_action(tau):
return {f"{m}.effort": float(tau[IDX[m]]) for m in MOTORS}
pin_robot = pin.RobotWrapper.BuildFromURDF(
"lerobot/SO101/so101_new_calib.urdf",
"lerobot/SO101",
)
pin_robot.data = pin_robot.model.createData()
pin_robot.initViewer(open=True)
pin_robot.loadViewerModel()
pin_robot.display(pin_robot.q0)
cfg = SO101FollowerTConfig(port="/dev/tty.usbmodem58760431551", id="follower_torque6")
real = SO101FollowerT(cfg)
real.connect()
real.pin_robot.data = real.pin_robot.model.createData()
real.pin_robot.initViewer(open=True)
real.pin_robot.loadViewerModel()
real.pin_robot.display(real.pin_robot.q0)
ESC_CLR_EOL = "\x1b[K"
print("Running gravity compensation")
@@ -68,7 +64,7 @@ try:
obs = real.get_observation() # positions in rad
q = obs_to_q(obs) # dict to vector
tau = pin.computeGeneralizedGravity(pin_robot.model, pin_robot.data, q) # τ in [Nm]
tau = pin.computeGeneralizedGravity(real.pin_robot.model, real.pin_robot.data, q) # τ in [Nm]
# build compact single-line status
status = " | ".join(
@@ -82,7 +78,7 @@ try:
real.send_action(tau_to_action(tau)) # apply τ
pin_robot.display(q)
real.pin_robot.display(q)
time.sleep(0.002) # Run at 500 Hz
except KeyboardInterrupt:
print("Stopping")

View File

@@ -278,9 +278,10 @@ MODEL_ENCODING_TABLE = {
"Homing_Offset": 15,
"Goal_Velocity": 15,
"Present_Velocity": 15,
"Target_Torque": 10,
"Target_Torque": 15,
"Present_Position": 15,
"Goal_Position": 15,
"Present_Current": 15,
},
}

View File

@@ -16,7 +16,7 @@
from dataclasses import dataclass, field
from lerobot.common.cameras import CameraConfig
from lerobot.cameras import CameraConfig
from ..config import RobotConfig

View File

@@ -20,10 +20,13 @@ import time
from functools import cached_property
from typing import Any
from lerobot.common.cameras.utils import make_cameras_from_configs
from lerobot.common.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.common.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.common.motors.feetech import (
import numpy as np
import pinocchio as pin
from lerobot.cameras.utils import make_cameras_from_configs
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.feetech import (
FeetechMotorsBus,
OperatingMode,
)
@@ -42,34 +45,11 @@ class SO101FollowerT(Robot):
config_class = SO101FollowerTConfig
name = "so101_follower_t"
_CURRENT_STEP_A: float = 6.5e-3 # 6.5 mA per register LSB #http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0
_CURRENT_STEP_A: float = 6.5e-3 # 6.5 mA per register LSB #http://doc.feetech.cn/#/prodinfodownload?srcType=FT-SMS-STS-emanual-229f4476422d4059abfb1cb0
_KT_NM_PER_AMP: float = 0.814 # Torque constant Kt [N·m/A] #https://www.feetechrc.com/811177.html
_MAX_CURRENT_A: float = 3.0 # Safe driver limit for this model
_COUNT_TO_RAD: float = math.radians(0.087) # 1 pos count to rad
def _current_to_torque_nm(self, raw: dict[str, int]) -> dict[str, float]:
"""Convert "Present_Current" register counts (±2047) → torque [Nm].
Values are clamped to ±3A before conversion for protection.
"""
max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A)) # ≈462
coef = self._CURRENT_STEP_A * self._KT_NM_PER_AMP
return {k: max(min(v, max_cnt), -max_cnt) * coef for k, v in raw.items()}
def _torque_nm_to_current(self, torque: dict[str, float]) -> dict[str, int]:
"""Convert torque [Nm] to register counts, clamped to ±3A (2.44Nm)."""
inv_coef = 1.0 / (self._CURRENT_STEP_A * self._KT_NM_PER_AMP)
max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A))
max_torque = self._MAX_CURRENT_A * self._KT_NM_PER_AMP
return {
k: max(-max_cnt, min(max_cnt, int(round(max(-max_torque, min(max_torque, float(t))) * inv_coef))))
for k, t in torque.items()
}
def _deg_to_rad(self, deg: dict[str, float | int]) -> dict[str, float]:
"""GDegrees to radians."""
return {m: math.radians(float(v)) for m, v in deg.items()}
def __init__(self, config: SO101FollowerTConfig):
super().__init__(config)
self.config = config
@@ -87,9 +67,33 @@ class SO101FollowerT(Robot):
)
self.cameras = make_cameras_from_configs(config.cameras)
self.pin_robot = pin.RobotWrapper.BuildFromURDF(
"src/lerobot/SO101/so101_new_calib.urdf", "src/lerobot/SO101"
)
flip = {
"shoulder_pan": True,
"shoulder_lift": True,
"elbow_flex": True,
"wrist_flex": True,
"wrist_roll": True,
"gripper": True,
}
self.torque_sign = {n: (-1.0 if flip[n] else 1.0) for n in self.bus.motors}
self._prev_pos_rad: dict[str, float] | None = None
self._prev_vel_rad: dict[str, float] | None = None
self._prev_t: float | None = None
@property
def _motors_ft(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
d: dict[str, type] = {}
for m in self.bus.motors:
d[f"{m}.pos"] = float
d[f"{m}.vel"] = float
d[f"{m}.acc"] = float # Add acceleration
d[f"{m}.tau_meas"] = float # Changed from tau_res to tau_meas
return d
@property
def _cameras_ft(self) -> dict[str, tuple]:
@@ -103,15 +107,99 @@ class SO101FollowerT(Robot):
@cached_property
def action_features(self) -> dict[str, type]:
return {
**{f"{m}.pos": float for m in self.bus.motors},
**{f"{m}.effort": int for m in self.bus.motors},
}
return {f"{m}.effort": int for m in self.bus.motors}
@property
def is_connected(self) -> bool:
return self.bus.is_connected and all(cam.is_connected for cam in self.cameras.values())
def _current_to_torque_nm(self, raw: dict[str, Any]) -> dict[str, float]:
"""Convert "Present_Current" register counts (±2047) → torque [Nm].
Values are clamped to ±3A before conversion for protection.
"""
max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A)) # ≈ 462
coef = self._CURRENT_STEP_A * self._KT_NM_PER_AMP
return {k: self.torque_sign[k] * max(-max_cnt, min(max_cnt, v)) * coef for k, v in raw.items()}
def _torque_nm_to_current(self, torque: dict[str, float]) -> dict[str, int]:
"""Convert torque [Nm] to register counts, clamped to ±3A (2.44 Nm)."""
inv_coef = 1.0 / (self._CURRENT_STEP_A * self._KT_NM_PER_AMP)
max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A))
counts = {}
for k, τ in torque.items():
cnt = τ * self.torque_sign[k] * inv_coef
cnt = max(-max_cnt, min(max_cnt, cnt))
counts[k] = int(round(cnt))
return counts
def _deg_to_rad(self, deg: dict[str, float | int]) -> dict[str, float]:
"""GDegrees to radians."""
return {m: math.radians(float(v)) for m, v in deg.items()}
def _gravity_from_q(self, q_rad: dict[str, float]) -> dict[str, float]:
"""
Compute g(q) [N m] for all joints in the robot.
The order of joints in the URDF matches self.bus.motors.
"""
q = np.zeros(self.pin_robot.model.nq)
for i, motor_name in enumerate(self.bus.motors):
q[i] = q_rad[motor_name]
g = pin.computeGeneralizedGravity(self.pin_robot.model, self.pin_robot.data, q)
return {motor_name: float(g[i]) for i, motor_name in enumerate(self.bus.motors)}
def _inertia_from_q_dq(
self, q_rad: dict[str, float], dq_rad: dict[str, float], ddq_rad: dict[str, float]
) -> dict[str, float]:
"""
Compute inertia torques τ_inertia = M(q) * ddq directly from URDF model.
"""
# Convert joint dictionaries to numpy arrays in correct order
q = np.zeros(self.pin_robot.model.nq)
dq = np.zeros(self.pin_robot.model.nv)
ddq = np.zeros(self.pin_robot.model.nv)
for i, motor_name in enumerate(self.bus.motors):
q[i] = q_rad[motor_name]
dq[i] = dq_rad[motor_name]
ddq[i] = ddq_rad[motor_name]
# Compute mass matrix M(q)
mass_matrix = pin.crba(self.pin_robot.model, self.pin_robot.data, q)
# Compute inertia torques: τ_inertia = M(q) * ddq
tau_inertia = mass_matrix @ ddq
return {motor_name: float(tau_inertia[i]) for i, motor_name in enumerate(self.bus.motors)}
def _compute_model_based_disturbance(
self,
q_rad: dict[str, float],
dq_rad: dict[str, float],
ddq_rad: dict[str, float],
tau_measured: dict[str, float],
) -> dict[str, float]:
"""
Compute disturbance torques using direct model-based approach:
τ_disturbance = τ_measured - τ_gravity - τ_inertia
This replaces the DOB and gives cleaner, delay-free disturbance estimation.
"""
# Get gravity compensation
tau_gravity = self._gravity_from_q(q_rad)
# Get inertia compensation
tau_inertia = self._inertia_from_q_dq(q_rad, dq_rad, ddq_rad)
# Compute disturbance: what's left after removing known dynamics
tau_disturbance = {}
for motor_name in self.bus.motors:
tau_dist = tau_measured[motor_name] - tau_gravity[motor_name] - tau_inertia[motor_name]
tau_disturbance[motor_name] = tau_dist
return tau_disturbance
def connect(self, calibrate: bool = True) -> None:
"""
We assume that at connection time, arm is in a rest position,
@@ -167,15 +255,16 @@ class SO101FollowerT(Robot):
with self.bus.torque_disabled():
self.bus.configure_motors()
for motor in self.bus.motors:
phase = self.bus.read("Phase", motor, normalize=False)
phase = int(self.bus.read("Phase", motor, normalize=False))
if phase & 0x10: # bit-4 set = multi-turn
new_phase = phase & ~0x10
print(f"Switching {motor} to single-turn: 0x{phase:02X} → 0x{new_phase:02X}")
self.bus.write("Phase", motor, new_phase, normalize=False)
self.bus.write("Operating_Mode", motor, 2) # Set to current mode
self.bus.write("Target_Torque", motor, 0)
self.bus.write("Torque_Limit", motor, 1000) # 100%
self.bus.write("Operating_Mode", motor, 2, num_retry=2) # Set to current mode
self.bus.write("Target_Torque", motor, 0, num_retry=2)
self.bus.write("Torque_Limit", motor, 1000, num_retry=2) # 100%
self.bus.write("Return_Delay_Time", motor, 0, num_retry=2)
def setup_motors(self) -> None:
for motor in reversed(self.bus.motors):
@@ -187,20 +276,44 @@ class SO101FollowerT(Robot):
if not self.is_connected:
raise DeviceNotConnectedError(f"{self} is not connected.")
start = time.perf_counter()
t_now = time.perf_counter()
# Positions
pos_deg = self.bus.sync_read("Present_Position", num_retry=10)
# position
pos_deg = self.bus.sync_read("Present_Position", num_retry=5)
pos_rad = self._deg_to_rad(pos_deg)
obs_dict = {f"{m}.pos": r for m, r in pos_rad.items()}
# Currents to torque (Nm)
curr_raw = self.bus.sync_read("Present_Current", normalize=False, num_retry=10)
torque_nm = self._current_to_torque_nm({f"{m}.effort": v for m, v in curr_raw.items()})
obs_dict.update(torque_nm)
# velocity and acceleration
if self._prev_pos_rad is None: # first call
vel_rad = dict.fromkeys(pos_rad, 0.0)
acc_rad = dict.fromkeys(pos_rad, 0.0)
dt = 1e-3
else:
dt = t_now - (self._prev_t or 0.0)
dt = max(dt, 1e-4) # Avoid division by zero
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read state: {dt_ms:.1f}ms")
# Compute velocity
vel_rad = {m: (pos_rad[m] - self._prev_pos_rad[m]) / dt for m in pos_rad}
# Compute acceleration
if self._prev_vel_rad is None:
acc_rad = dict.fromkeys(pos_rad, 0.0)
else:
acc_rad = {m: (vel_rad[m] - self._prev_vel_rad[m]) / dt for m in vel_rad}
# Update previous values
self._prev_pos_rad = pos_rad.copy()
self._prev_vel_rad = vel_rad.copy()
self._prev_t = t_now
# measured torque (Nm)
cur_raw = self.bus.sync_read("Present_Current", normalize=False, num_retry=5)
tau_meas = self._current_to_torque_nm(cur_raw)
obs_dict = {}
obs_dict |= {f"{m}.pos": pos_rad[m] for m in self.bus.motors}
obs_dict |= {f"{m}.vel": vel_rad[m] for m in self.bus.motors}
obs_dict |= {f"{m}.acc": acc_rad[m] for m in self.bus.motors}
obs_dict |= {f"{m}.tau_meas": tau_meas[m] for m in self.bus.motors}
# Capture images from cameras
for cam_key, cam in self.cameras.items():
@@ -224,14 +337,20 @@ class SO101FollowerT(Robot):
raise DeviceNotConnectedError(f"{self} is not connected.")
# Extract torque commands
torque_cmd = {k: v for k, v in action.items() if k.endswith(".effort")}
if torque_cmd:
counts = self._torque_nm_to_current(torque_cmd)
# remove the .effort suffix
counts = {k.removesuffix(".effort"): v for k, v in counts.items()}
self.bus.sync_write("Target_Torque", counts, normalize=False, num_retry=2)
tau_cmd_nm = {k.removesuffix(".effort"): float(v) for k, v in action.items() if k.endswith(".effort")}
if not tau_cmd_nm:
return action
# pass back the other keys (.pos) untouched
inv_coef = 1.0 / (self._CURRENT_STEP_A * self._KT_NM_PER_AMP)
max_cnt = int(round(self._MAX_CURRENT_A / self._CURRENT_STEP_A))
counts = {}
for joint, τ in tau_cmd_nm.items():
cnt = τ * self.torque_sign[joint] * inv_coef # flip SIGN
cnt = max(-max_cnt, min(max_cnt, cnt))
counts[joint] = int(round(cnt))
self.bus.sync_write("Target_Torque", counts, normalize=False, num_retry=2)
self._last_cmd_nm = tau_cmd_nm
return action
def disconnect(self):