Compare commits

..

10 Commits

Author SHA1 Message Date
Pepijn
7fcd24427d print update hz 2026-01-09 11:35:51 +01:00
Pepijn
e92d6fa067 fix build frame 2026-01-09 11:31:29 +01:00
Pepijn
bbaadc49c2 do evaluate with rtc 2026-01-09 11:20:20 +01:00
Pepijn
c4953a57c0 debug 2026-01-09 11:08:02 +01:00
Pepijn
d63b83ff40 add debug 2026-01-09 10:21:02 +01:00
Pepijn
c3f4aa2d98 fix build obs 2026-01-09 10:13:09 +01:00
Pepijn
33f84fe0ec with rtc 2026-01-09 09:56:14 +01:00
Pepijn
8e430f323f revert 2026-01-09 09:41:04 +01:00
Pepijn
6d12740c24 try async 2026-01-09 09:36:02 +01:00
Pepijn
9b4d63358a Add code for interpolation, pid tuning, raw action scaling and vel FF 2026-01-09 00:53:37 +01:00
3 changed files with 1498 additions and 5 deletions

View File

@@ -0,0 +1,587 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenArms Policy Evaluation with Interpolation
Evaluates a trained policy with smooth action interpolation:
- Decoupled camera capture (CAMERA_FPS) from robot control (ROBOT_FPS)
- Speed multiplier to execute actions faster than training
- Velocity feedforward for smoother tracking
- Adjustable PID gains
Example usage:
python examples/openarms/evaluate_interpolation.py
"""
import time
from collections import deque
from pathlib import Path
import numpy as np
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.configs.policies import PreTrainedConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.processor import make_default_processors
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
from lerobot.teleoperators.openarms.config_openarms_leader import OpenArmsLeaderConfig
from lerobot.teleoperators.openarms.openarms_leader import OpenArmsLeader
from lerobot.utils.control_utils import init_keyboard_listener, predict_action
from lerobot.utils.utils import log_say, get_safe_torch_device
from lerobot.utils.visualization_utils import init_rerun
# ======================== MODEL & TASK CONFIG ========================
HF_MODEL_ID = "lerobot-data-collection/three-folds-pi0" # TODO: Replace with your trained model
HF_EVAL_DATASET_ID = "lerobot-data-collection/three-folds-pi0_eval_interp" # TODO: Replace
TASK_DESCRIPTION = "three-folds-dataset" # TODO: Replace with your task
# ======================== TIMING CONFIG ========================
CAMERA_FPS = 30 # Camera hardware limit (fixed)
POLICY_FPS = 30 # What the policy was trained with
SPEED_MULTIPLIER = 1.2 # Execute actions faster (1.0 = normal, 1.2 = 20% faster)
ROBOT_FPS = 50 # Robot command rate (higher = smoother interpolation)
# Derived values
EFFECTIVE_POLICY_FPS = int(POLICY_FPS * SPEED_MULTIPLIER) # How fast we consume actions (36Hz at 1.2x)
NUM_EPISODES = 1
EPISODE_TIME_SEC = 300
RESET_TIME_SEC = 60
# ======================== PID TUNING ========================
# Set to None to use robot config defaults
CUSTOM_KP_SCALE = 0.7 # Scale factor for position gain (0.5-1.0, lower = smoother)
CUSTOM_KD_SCALE = 1.3 # Scale factor for damping gain (1.0-2.0, higher = less overshoot)
USE_VELOCITY_FEEDFORWARD = True # Enable velocity feedforward for smoother tracking
# ======================== ROBOT CONFIG ========================
FOLLOWER_LEFT_PORT = "can0"
FOLLOWER_RIGHT_PORT = "can1"
USE_LEADER_FOR_RESETS = True
LEADER_LEFT_PORT = "can2"
LEADER_RIGHT_PORT = "can3"
# Camera config uses CAMERA_FPS (hardware limit)
CAMERA_CONFIG = {
"left_wrist": OpenCVCameraConfig(index_or_path="/dev/video5", width=640, height=480, fps=CAMERA_FPS),
"right_wrist": OpenCVCameraConfig(index_or_path="/dev/video1", width=640, height=480, fps=CAMERA_FPS),
"base": OpenCVCameraConfig(index_or_path="/dev/video3", width=640, height=480, fps=CAMERA_FPS),
}
class ActionInterpolator:
"""Interpolate between policy actions for smoother robot control with velocity estimation."""
def __init__(self, effective_policy_fps: int, robot_fps: int):
self.effective_policy_fps = effective_policy_fps
self.robot_fps = robot_fps
self.substeps_per_policy_step = robot_fps / effective_policy_fps
self.prev_action: dict | None = None
self.curr_action: dict | None = None
self.substep = 0
self.last_interpolated: dict | None = None
def update(self, new_action: dict) -> None:
self.prev_action = self.curr_action
self.curr_action = new_action
self.substep = 0
def get_interpolated_action(self) -> tuple[dict | None, dict | None]:
"""Returns (interpolated_position, estimated_velocity_deg_per_sec)"""
if self.curr_action is None:
return None, None
if self.prev_action is None:
self.last_interpolated = self.curr_action.copy()
return self.curr_action, {k: 0.0 for k in self.curr_action}
t = min(self.substep / self.substeps_per_policy_step, 1.0)
self.substep += 1
interpolated = {}
velocity = {}
dt = 1.0 / self.robot_fps
for key in self.curr_action:
prev = self.prev_action.get(key, self.curr_action[key])
curr = self.curr_action[key]
interpolated[key] = prev * (1 - t) + curr * t
if self.last_interpolated is not None and key in self.last_interpolated:
velocity[key] = (interpolated[key] - self.last_interpolated[key]) / dt
else:
velocity[key] = (curr - prev) * self.effective_policy_fps
self.last_interpolated = interpolated.copy()
return interpolated, velocity
def reset(self):
self.prev_action = None
self.curr_action = None
self.substep = 0
self.last_interpolated = None
class HzTracker:
"""Track and display actual loop frequency."""
def __init__(self, name: str = "Robot", window_size: int = 100, print_interval: float = 2.0):
self.name = name
self.timestamps = deque(maxlen=window_size)
self.last_print_time = 0
self.print_interval = print_interval
def tick(self) -> float | None:
now = time.perf_counter()
self.timestamps.append(now)
if len(self.timestamps) < 2:
return None
hz = (len(self.timestamps) - 1) / (self.timestamps[-1] - self.timestamps[0])
if now - self.last_print_time >= self.print_interval:
print(f"{self.name} Hz: {hz:.1f}")
self.last_print_time = now
return hz
def get_avg_hz(self) -> float | None:
if len(self.timestamps) < 2:
return None
return (len(self.timestamps) - 1) / (self.timestamps[-1] - self.timestamps[0])
def reset(self):
self.timestamps.clear()
self.last_print_time = 0
def interpolated_eval_loop(
robot,
policy,
preprocessor,
postprocessor,
robot_observation_processor,
robot_action_processor,
dataset,
events,
interpolator: ActionInterpolator,
robot_hz_tracker: HzTracker,
camera_fps: int,
effective_policy_fps: int,
robot_fps: int,
control_time_s: float,
task: str,
kp_scale: float | None = None,
kd_scale: float | None = None,
use_velocity_ff: bool = False,
):
"""
Run evaluation with decoupled camera and robot control:
- Camera captures at camera_fps (hardware limit)
- Policy inference runs when new camera frame is available
- Actions are consumed at effective_policy_fps (sped up by SPEED_MULTIPLIER)
- Robot receives interpolated commands at robot_fps (smoothest)
"""
from lerobot.scripts.lerobot_record import build_dataset_frame, make_robot_action
from lerobot.utils.visualization_utils import log_rerun_data
camera_dt = 1.0 / camera_fps
policy_dt = 1.0 / effective_policy_fps
robot_dt = 1.0 / robot_fps
interpolator.reset()
robot_hz_tracker.reset()
policy.reset()
# Build custom gains if scaling is enabled
custom_kp = None
custom_kd = None
if kp_scale is not None or kd_scale is not None:
custom_kp = {}
custom_kd = {}
for arm in ["right", "left"]:
bus = robot.bus_right if arm == "right" else robot.bus_left
for i, motor_name in enumerate(bus.motors):
full_name = f"{arm}_{motor_name}"
default_kp = robot.config.position_kp[i] if isinstance(robot.config.position_kp, list) else robot.config.position_kp
default_kd = robot.config.position_kd[i] if isinstance(robot.config.position_kd, list) else robot.config.position_kd
custom_kp[full_name] = default_kp * (kp_scale or 1.0)
custom_kd[full_name] = default_kd * (kd_scale or 1.0)
print(f"Custom gains: kp_scale={kp_scale}, kd_scale={kd_scale}")
if use_velocity_ff:
print("Velocity feedforward: enabled")
last_camera_time = -camera_dt
last_policy_action_time = -policy_dt
cached_observation = None
cached_robot_action = None
start_time = time.perf_counter()
print(f"\nStarting interpolated eval loop:")
print(f" Camera: {camera_fps}Hz | Policy actions consumed: {effective_policy_fps}Hz | Robot: {robot_fps}Hz")
while time.perf_counter() - start_time < control_time_s:
if events["exit_early"] or events["stop_recording"]:
break
loop_start = time.perf_counter()
elapsed = loop_start - start_time
# === CAMERA CAPTURE (at camera_fps, decoupled from robot) ===
if elapsed - last_camera_time >= camera_dt:
obs = robot.get_observation()
obs_processed = robot_observation_processor(obs)
observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix="observation")
# Run policy inference with fresh observation
action_values = predict_action(
observation=observation_frame,
policy=policy,
device=get_safe_torch_device(policy.config.device),
preprocessor=preprocessor,
postprocessor=postprocessor,
use_amp=policy.config.use_amp,
task=task,
robot_type=robot.robot_type,
)
act_processed = make_robot_action(action_values, dataset.features)
cached_robot_action = robot_action_processor((act_processed, obs))
cached_observation = (obs_processed, observation_frame, act_processed)
last_camera_time = elapsed
# === ACTION UPDATE (at effective_policy_fps, faster than camera if speed > 1) ===
if elapsed - last_policy_action_time >= policy_dt and cached_robot_action is not None:
interpolator.update(cached_robot_action)
last_policy_action_time = elapsed
# Save to dataset at effective policy rate
if dataset is not None and cached_observation is not None:
obs_processed, observation_frame, act_processed = cached_observation
action_frame = build_dataset_frame(dataset.features, act_processed, prefix="action")
frame = {**observation_frame, **action_frame, "task": task}
dataset.add_frame(frame)
log_rerun_data(observation=obs_processed, action=act_processed)
# === ROBOT COMMAND (at robot_fps, highest rate for smoothness) ===
smooth_action, velocity = interpolator.get_interpolated_action()
if smooth_action is not None:
vel_ff = velocity if use_velocity_ff else None
robot.send_action(smooth_action, custom_kp=custom_kp, custom_kd=custom_kd, velocity_feedforward=vel_ff)
robot_hz_tracker.tick()
# Maintain robot control rate
sleep_time = robot_dt - (time.perf_counter() - loop_start)
if sleep_time > 0:
time.sleep(sleep_time)
# Print final stats
robot_hz = robot_hz_tracker.get_avg_hz()
if robot_hz:
print(f"\nFinal average robot Hz: {robot_hz:.1f}")
def main():
"""Main evaluation function."""
print("=" * 60)
print("OpenArms Policy Evaluation with Interpolation")
print("=" * 60)
print(f"\nModel: {HF_MODEL_ID}")
print(f"Dataset: {HF_EVAL_DATASET_ID}")
print(f"Task: {TASK_DESCRIPTION}")
print(f"\n--- Timing ---")
print(f"Camera FPS: {CAMERA_FPS} (hardware limit)")
print(f"Policy trained at: {POLICY_FPS}Hz")
print(f"Speed multiplier: {SPEED_MULTIPLIER}x")
print(f"Effective policy FPS: {EFFECTIVE_POLICY_FPS}Hz (actions consumed)")
print(f"Robot FPS: {ROBOT_FPS}Hz (interpolated commands)")
print(f"\n--- PID Tuning ---")
print(f"KP scale: {CUSTOM_KP_SCALE}")
print(f"KD scale: {CUSTOM_KD_SCALE}")
print(f"Velocity feedforward: {USE_VELOCITY_FEEDFORWARD}")
print(f"\n--- Episodes ---")
print(f"Episodes: {NUM_EPISODES}")
print(f"Duration: {EPISODE_TIME_SEC}s per episode")
print(f"Reset time: {RESET_TIME_SEC}s")
print(f"Leader for resets: {USE_LEADER_FOR_RESETS}")
print("=" * 60)
follower_config = OpenArmsFollowerConfig(
port_left=FOLLOWER_LEFT_PORT,
port_right=FOLLOWER_RIGHT_PORT,
can_interface="socketcan",
id="openarms_follower",
disable_torque_on_disconnect=True,
max_relative_target=10.0,
cameras=CAMERA_CONFIG,
)
follower = OpenArmsFollower(follower_config)
follower.connect(calibrate=False)
if not follower.is_connected:
raise RuntimeError("Follower robot failed to connect!")
leader = None
if USE_LEADER_FOR_RESETS:
leader_config = OpenArmsLeaderConfig(
port_left=LEADER_LEFT_PORT,
port_right=LEADER_RIGHT_PORT,
can_interface="socketcan",
id="openarms_leader",
manual_control=False,
)
leader = OpenArmsLeader(leader_config)
leader.connect(calibrate=False)
if not leader.is_connected:
raise RuntimeError("Leader robot failed to connect!")
if leader.pin_robot is not None:
leader.bus_right.enable_torque()
leader.bus_left.enable_torque()
time.sleep(0.1)
print(f"Leader connected with gravity compensation")
else:
print(f"Leader connected (no gravity compensation)")
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
action_features_hw = {}
for key, value in follower.action_features.items():
if key.endswith(".pos"):
action_features_hw[key] = value
dataset_features = combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=teleop_action_processor,
initial_features=create_initial_features(action=action_features_hw),
use_videos=True,
),
aggregate_pipeline_dataset_features(
pipeline=robot_observation_processor,
initial_features=create_initial_features(observation=follower.observation_features),
use_videos=True,
),
)
dataset_path = Path.home() / ".cache" / "huggingface" / "lerobot" / HF_EVAL_DATASET_ID
if dataset_path.exists():
print(f"\nDataset exists at: {dataset_path}")
choice = input("Continue and append? (y/n): ").strip().lower()
if choice != 'y':
print("Aborting.")
follower.disconnect()
if leader:
leader.disconnect()
return
# Dataset uses effective policy FPS (sped up rate)
dataset = LeRobotDataset.create(
repo_id=HF_EVAL_DATASET_ID,
fps=EFFECTIVE_POLICY_FPS,
features=dataset_features,
robot_type=follower.name,
use_videos=True,
image_writer_processes=0,
image_writer_threads=12,
)
policy_config = PreTrainedConfig.from_pretrained(HF_MODEL_ID)
policy_config.pretrained_path = HF_MODEL_ID
policy = make_policy(policy_config, ds_meta=dataset.meta)
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy.config,
pretrained_path=HF_MODEL_ID,
dataset_stats=dataset.meta.stats,
preprocessor_overrides={
"device_processor": {"device": str(policy.config.device)}
},
)
print(f"\nRunning evaluation...")
listener, events = init_keyboard_listener()
init_rerun(session_name="openarms_evaluation_interp")
interpolator = ActionInterpolator(effective_policy_fps=EFFECTIVE_POLICY_FPS, robot_fps=ROBOT_FPS)
robot_hz_tracker = HzTracker(name="Robot", window_size=100, print_interval=2.0)
episode_idx = 0
try:
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
log_say(f"Evaluating episode {episode_idx + 1} of {NUM_EPISODES}")
print(f"\n--- Episode {episode_idx + 1}/{NUM_EPISODES} ---")
interpolated_eval_loop(
robot=follower,
policy=policy,
preprocessor=preprocessor,
postprocessor=postprocessor,
robot_observation_processor=robot_observation_processor,
robot_action_processor=robot_action_processor,
dataset=dataset,
events=events,
interpolator=interpolator,
robot_hz_tracker=robot_hz_tracker,
camera_fps=CAMERA_FPS,
effective_policy_fps=EFFECTIVE_POLICY_FPS,
robot_fps=ROBOT_FPS,
control_time_s=EPISODE_TIME_SEC,
task=TASK_DESCRIPTION,
kp_scale=CUSTOM_KP_SCALE,
kd_scale=CUSTOM_KD_SCALE,
use_velocity_ff=USE_VELOCITY_FEEDFORWARD,
)
if events["rerecord_episode"]:
log_say("Re-recording episode")
events["rerecord_episode"] = False
events["exit_early"] = False
dataset.clear_episode_buffer()
continue
if dataset.episode_buffer is not None and dataset.episode_buffer.get("size", 0) > 0:
print(f"Saving episode ({dataset.episode_buffer['size']} frames)...")
dataset.save_episode()
episode_idx += 1
if not events["stop_recording"] and episode_idx < NUM_EPISODES:
if USE_LEADER_FOR_RESETS and leader:
log_say("Reset the environment using leader arms")
print(f"\nManual reset ({RESET_TIME_SEC}s)...")
dt = 1 / CAMERA_FPS
reset_start_time = time.perf_counter()
while time.perf_counter() - reset_start_time < RESET_TIME_SEC:
if events["exit_early"] or events["stop_recording"]:
break
loop_start = time.perf_counter()
leader_action = leader.get_action()
leader_positions_deg = {}
leader_velocities_deg_per_sec = {}
for motor in leader.bus_right.motors:
pos_key = f"right_{motor}.pos"
vel_key = f"right_{motor}.vel"
if pos_key in leader_action:
leader_positions_deg[f"right_{motor}"] = leader_action[pos_key]
if vel_key in leader_action:
leader_velocities_deg_per_sec[f"right_{motor}"] = leader_action[vel_key]
for motor in leader.bus_left.motors:
pos_key = f"left_{motor}.pos"
vel_key = f"left_{motor}.vel"
if pos_key in leader_action:
leader_positions_deg[f"left_{motor}"] = leader_action[pos_key]
if vel_key in leader_action:
leader_velocities_deg_per_sec[f"left_{motor}"] = leader_action[vel_key]
leader_positions_rad = {k: np.deg2rad(v) for k, v in leader_positions_deg.items()}
leader_gravity_torques_nm = leader._gravity_from_q(leader_positions_rad)
leader_velocities_rad_per_sec = {k: np.deg2rad(v) for k, v in leader_velocities_deg_per_sec.items()}
leader_friction_torques_nm = leader._friction_from_velocity(
leader_velocities_rad_per_sec, friction_scale=1.0
)
leader_total_torques_nm = {}
for motor_name in leader_gravity_torques_nm:
gravity = leader_gravity_torques_nm.get(motor_name, 0.0)
friction = leader_friction_torques_nm.get(motor_name, 0.0)
leader_total_torques_nm[motor_name] = gravity + friction
for motor in leader.bus_right.motors:
full_name = f"right_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque = leader_total_torques_nm.get(full_name, 0.0)
kd = leader.get_damping_kd(motor)
leader.bus_right._mit_control(
motor=motor, kp=0.0, kd=kd,
position_degrees=position, velocity_deg_per_sec=0.0, torque=torque,
)
for motor in leader.bus_left.motors:
full_name = f"left_{motor}"
position = leader_positions_deg.get(full_name, 0.0)
torque = leader_total_torques_nm.get(full_name, 0.0)
kd = leader.get_damping_kd(motor)
leader.bus_left._mit_control(
motor=motor, kp=0.0, kd=kd,
position_degrees=position, velocity_deg_per_sec=0.0, torque=torque,
)
follower_action = {}
for joint in leader_positions_deg.keys():
pos_key = f"{joint}.pos"
if pos_key in leader_action:
follower_action[pos_key] = leader_action[pos_key]
if follower_action:
follower.send_action(follower_action)
loop_duration = time.perf_counter() - loop_start
sleep_time = dt - loop_duration
if sleep_time > 0:
time.sleep(sleep_time)
print("Reset complete")
else:
log_say("Waiting for manual reset")
input("Press ENTER when ready...")
print(f"\nEvaluation complete! {episode_idx} episodes recorded")
log_say("Evaluation complete", blocking=True)
except KeyboardInterrupt:
print("\n\nInterrupted by user")
finally:
if leader:
leader.bus_right.disable_torque()
leader.bus_left.disable_torque()
time.sleep(0.1)
leader.disconnect()
follower.disconnect()
if listener is not None:
listener.stop()
dataset.finalize()
print("\nUploading to Hugging Face Hub...")
dataset.push_to_hub(private=True)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,894 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenArms Policy Evaluation with RTC + Interpolation
Combines Real-Time Chunking (RTC) with smooth action interpolation:
- RTC for reactive motion despite high inference latency
- Action interpolation for smooth robot movements
- Speed multiplier to execute faster than training
- Velocity feedforward and PID tuning
- Decoupled inference (async) from robot control
Example usage:
python examples/openarms/evaluate_with_rtc_interpolation.py
# With custom RTC parameters
python examples/openarms/evaluate_with_rtc_interpolation.py \
--rtc.execution_horizon=12 \
--rtc.max_guidance_weight=10.0
"""
import logging
import math
import sys
import time
import traceback
from collections import deque
from dataclasses import dataclass, field
from pathlib import Path
from threading import Event, Lock, Thread
import torch
from torch import Tensor
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.configs import parser
from lerobot.configs.policies import PreTrainedConfig
from lerobot.configs.types import RTCAttentionSchedule
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import build_dataset_frame, combine_feature_dicts, hw_to_dataset_features
from lerobot.policies.factory import get_policy_class, make_pre_post_processors
from lerobot.policies.rtc.action_queue import ActionQueue
from lerobot.policies.rtc.configuration_rtc import RTCConfig
from lerobot.policies.rtc.latency_tracker import LatencyTracker
from lerobot.processor import make_default_processors
from lerobot.rl.process import ProcessSignalHandler
from lerobot.robots.openarms.config_openarms_follower import OpenArmsFollowerConfig
from lerobot.robots.openarms.openarms_follower import OpenArmsFollower
from lerobot.utils.hub import HubMixin
from lerobot.utils.utils import init_logging, log_say
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============================================================================
# Default Configuration Constants
# ============================================================================
DEFAULT_HF_MODEL_ID = "lerobot-data-collection/three-folds-pi0"
DEFAULT_HF_EVAL_DATASET_ID = "lerobot-data-collection/three-folds-pi0_eval_rtc_interp"
DEFAULT_TASK_DESCRIPTION = "three-folds-dataset"
DEFAULT_NUM_EPISODES = 1
DEFAULT_CAMERA_FPS = 30 # Camera hardware limit
DEFAULT_POLICY_FPS = 30 # What the policy was trained with
DEFAULT_SPEED_MULTIPLIER = 1.0 # Execute actions faster (1.0 = normal, 1.2 = 20% faster)
DEFAULT_ROBOT_FPS = 50 # Robot command rate (higher = smoother)
DEFAULT_EPISODE_TIME_SEC = 300
DEFAULT_RESET_TIME_SEC = 60
DEFAULT_FOLLOWER_LEFT_PORT = "can0"
DEFAULT_FOLLOWER_RIGHT_PORT = "can1"
# PID tuning defaults
DEFAULT_KP_SCALE = 0.7 # Lower = smoother but slower
DEFAULT_KD_SCALE = 1.3 # Higher = less overshoot
DEFAULT_USE_VELOCITY_FF = True # Velocity feedforward
DEFAULT_CAMERA_CONFIG = {
"left_wrist": OpenCVCameraConfig(index_or_path="/dev/video5", width=640, height=480, fps=DEFAULT_CAMERA_FPS),
"right_wrist": OpenCVCameraConfig(index_or_path="/dev/video1", width=640, height=480, fps=DEFAULT_CAMERA_FPS),
"base": OpenCVCameraConfig(index_or_path="/dev/video3", width=640, height=480, fps=DEFAULT_CAMERA_FPS),
}
# ============================================================================
# Action Interpolator
# ============================================================================
class ActionInterpolator:
"""Interpolate between RTC actions for smoother robot control with velocity estimation."""
def __init__(self, robot_fps: int):
self.robot_fps = robot_fps
self.prev_action: Tensor | None = None
self.curr_action: Tensor | None = None
self.prev_time: float = 0
self.curr_time: float = 0
self.last_interpolated: Tensor | None = None
def update(self, new_action: Tensor) -> None:
self.prev_action = self.curr_action
self.prev_time = self.curr_time
self.curr_action = new_action
self.curr_time = time.perf_counter()
def get_interpolated_action(self) -> tuple[Tensor | None, Tensor | None]:
"""Returns (interpolated_position, estimated_velocity)"""
if self.curr_action is None:
return None, None
if self.prev_action is None:
self.last_interpolated = self.curr_action.clone()
return self.curr_action, torch.zeros_like(self.curr_action)
# Time-based interpolation
current_time = time.perf_counter()
dt_actions = self.curr_time - self.prev_time
if dt_actions <= 0:
dt_actions = 1.0 / 30 # Fallback
t = (current_time - self.prev_time) / dt_actions
t = max(0.0, min(t, 1.5)) # Allow slight extrapolation
interpolated = self.prev_action + t * (self.curr_action - self.prev_action)
# Estimate velocity
dt_robot = 1.0 / self.robot_fps
if self.last_interpolated is not None:
velocity = (interpolated - self.last_interpolated) / dt_robot
else:
velocity = (self.curr_action - self.prev_action) / dt_actions
self.last_interpolated = interpolated.clone()
return interpolated, velocity
def reset(self):
self.prev_action = None
self.curr_action = None
self.prev_time = 0
self.curr_time = 0
self.last_interpolated = None
class HzTracker:
"""Track and display actual loop frequency."""
def __init__(self, name: str = "Loop", window_size: int = 100, print_interval: float = 2.0):
self.name = name
self.timestamps = deque(maxlen=window_size)
self.last_print_time = 0
self.print_interval = print_interval
self.extra_info_fn = None # Optional callback for extra info
def tick(self) -> float | None:
now = time.perf_counter()
self.timestamps.append(now)
if len(self.timestamps) < 2:
return None
hz = (len(self.timestamps) - 1) / (self.timestamps[-1] - self.timestamps[0])
if now - self.last_print_time >= self.print_interval:
extra = ""
if self.extra_info_fn:
extra = self.extra_info_fn()
print(f"[CONTROL] {self.name}: {hz:.1f} Hz{extra}", flush=True)
self.last_print_time = now
return hz
def get_avg_hz(self) -> float | None:
if len(self.timestamps) < 2:
return None
return (len(self.timestamps) - 1) / (self.timestamps[-1] - self.timestamps[0])
def reset(self):
self.timestamps.clear()
self.last_print_time = 0
# ============================================================================
# Thread-Safe Robot Wrapper
# ============================================================================
class RobotWrapper:
"""Thread-safe wrapper for robot operations with custom PID gains."""
def __init__(
self,
robot: OpenArmsFollower,
custom_kp: dict | None = None,
custom_kd: dict | None = None,
use_velocity_ff: bool = False,
):
self.robot = robot
self.lock = Lock()
self.custom_kp = custom_kp
self.custom_kd = custom_kd
self.use_velocity_ff = use_velocity_ff
def get_observation(self) -> dict[str, Tensor]:
with self.lock:
return self.robot.get_observation()
def send_action(self, action: dict, velocity_ff: dict | None = None) -> None:
with self.lock:
vel_ff = velocity_ff if self.use_velocity_ff else None
self.robot.send_action(
action,
custom_kp=self.custom_kp,
custom_kd=self.custom_kd,
velocity_feedforward=vel_ff,
)
@property
def observation_features(self) -> dict:
with self.lock:
return self.robot.observation_features
@property
def action_features(self) -> dict:
with self.lock:
return self.robot.action_features
@property
def name(self) -> str:
return self.robot.name
# ============================================================================
# Configuration
# ============================================================================
@dataclass
class OpenArmsRTCInterpEvalConfig(HubMixin):
"""Configuration for OpenArms evaluation with RTC + Interpolation."""
policy: PreTrainedConfig | None = None
rtc: RTCConfig = field(
default_factory=lambda: RTCConfig(
enabled=True,
execution_horizon=10,
max_guidance_weight=10.0,
prefix_attention_schedule=RTCAttentionSchedule.EXP,
)
)
model_id: str = DEFAULT_HF_MODEL_ID
eval_dataset_id: str = DEFAULT_HF_EVAL_DATASET_ID
task: str = DEFAULT_TASK_DESCRIPTION
num_episodes: int = DEFAULT_NUM_EPISODES
camera_fps: float = DEFAULT_CAMERA_FPS
policy_fps: float = DEFAULT_POLICY_FPS
speed_multiplier: float = DEFAULT_SPEED_MULTIPLIER
robot_fps: float = DEFAULT_ROBOT_FPS
episode_time_sec: float = DEFAULT_EPISODE_TIME_SEC
reset_time_sec: float = DEFAULT_RESET_TIME_SEC
# PID tuning
kp_scale: float | None = DEFAULT_KP_SCALE
kd_scale: float | None = DEFAULT_KD_SCALE
use_velocity_ff: bool = DEFAULT_USE_VELOCITY_FF
follower_left_port: str = DEFAULT_FOLLOWER_LEFT_PORT
follower_right_port: str = DEFAULT_FOLLOWER_RIGHT_PORT
device: str = "cuda"
# Should be higher than inference_delay + execution_horizon
action_queue_size_to_get_new_actions: int = 30
record_dataset: bool = True
push_to_hub: bool = True
use_torch_compile: bool = False
torch_compile_backend: str = "inductor"
torch_compile_mode: str = "default"
torch_compile_disable_cudagraphs: bool = True
@property
def effective_policy_fps(self) -> int:
return int(self.policy_fps * self.speed_multiplier)
def __post_init__(self):
policy_path = parser.get_path_arg("policy")
if policy_path:
cli_overrides = parser.get_cli_overrides("policy")
self.policy = PreTrainedConfig.from_pretrained(policy_path, cli_overrides=cli_overrides)
self.policy.pretrained_path = policy_path
self.model_id = policy_path
elif self.model_id:
self.policy = PreTrainedConfig.from_pretrained(self.model_id)
self.policy.pretrained_path = self.model_id
@classmethod
def __get_path_fields__(cls) -> list[str]:
return ["policy"]
# ============================================================================
# Action Generation Thread (RTC)
# ============================================================================
def get_actions_thread(
policy,
robot: RobotWrapper,
robot_observation_processor,
action_queue: ActionQueue,
shutdown_event: Event,
cfg: OpenArmsRTCInterpEvalConfig,
episode_active: Event,
):
"""Thread function to asynchronously generate action chunks from the policy using RTC."""
try:
logger.info("[GET_ACTIONS] Starting RTC action generation thread")
latency_tracker = LatencyTracker()
inference_hz_tracker = HzTracker(name="Inference", window_size=20, print_interval=5.0)
time_per_chunk = 1.0 / cfg.effective_policy_fps # Use effective FPS with speed multiplier
hw_features = hw_to_dataset_features(robot.observation_features, "observation")
policy_device = policy.config.device
logger.info(f"[GET_ACTIONS] Loading preprocessor/postprocessor from {cfg.policy.pretrained_path}")
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg.policy,
pretrained_path=cfg.policy.pretrained_path,
dataset_stats=None,
preprocessor_overrides={
"device_processor": {"device": cfg.device},
},
)
logger.info("[GET_ACTIONS] Preprocessor/postprocessor loaded successfully")
get_actions_threshold = cfg.action_queue_size_to_get_new_actions
if not cfg.rtc.enabled:
get_actions_threshold = 0
while not shutdown_event.is_set():
if not episode_active.is_set():
time.sleep(0.01)
continue
if action_queue.qsize() <= get_actions_threshold:
current_time = time.perf_counter()
action_index_before_inference = action_queue.get_action_index()
prev_actions = action_queue.get_left_over()
inference_latency = latency_tracker.max()
inference_delay = math.ceil(inference_latency / time_per_chunk) if inference_latency else 0
obs = robot.get_observation()
obs_processed = robot_observation_processor(obs)
obs_with_policy_features = build_dataset_frame(
hw_features, obs_processed, prefix="observation"
)
for name in obs_with_policy_features:
obs_with_policy_features[name] = torch.from_numpy(obs_with_policy_features[name])
if "image" in name:
obs_with_policy_features[name] = (
obs_with_policy_features[name].type(torch.float32) / 255
)
obs_with_policy_features[name] = (
obs_with_policy_features[name].permute(2, 0, 1).contiguous()
)
obs_with_policy_features[name] = obs_with_policy_features[name].unsqueeze(0)
obs_with_policy_features[name] = obs_with_policy_features[name].to(policy_device)
obs_with_policy_features["task"] = [cfg.task]
obs_with_policy_features["robot_type"] = robot.name
preprocessed_obs = preprocessor(obs_with_policy_features)
actions = policy.predict_action_chunk(
preprocessed_obs,
inference_delay=inference_delay,
prev_chunk_left_over=prev_actions,
)
original_actions = actions.squeeze(0).clone()
postprocessed_actions = postprocessor(actions).squeeze(0)
new_latency = time.perf_counter() - current_time
new_delay = math.ceil(new_latency / time_per_chunk)
latency_tracker.add(new_latency)
# Set extra info to show latency
inference_hz_tracker.extra_info_fn = lambda lat=new_latency, delay=new_delay: f" | Latency: {lat*1000:.0f}ms | Delay: {delay}"
inference_hz_tracker.tick()
if cfg.action_queue_size_to_get_new_actions < cfg.rtc.execution_horizon + new_delay:
logger.warning(
"[GET_ACTIONS] action_queue_size_to_get_new_actions too small. "
"Should be higher than inference delay + execution horizon."
)
action_queue.merge(
original_actions, postprocessed_actions, new_delay, action_index_before_inference
)
logger.debug(
f"[GET_ACTIONS] Generated chunk, latency={new_latency:.3f}s, "
f"delay={new_delay}, queue_size={action_queue.qsize()}"
)
else:
time.sleep(0.01)
logger.info("[GET_ACTIONS] Action generation thread shutting down")
except Exception as e:
logger.error(f"[GET_ACTIONS] Fatal exception: {e}")
logger.error(traceback.format_exc())
shutdown_event.set()
sys.exit(1)
# ============================================================================
# Actor Thread with Interpolation
# ============================================================================
def actor_thread(
robot: RobotWrapper,
robot_action_processor,
action_queue: ActionQueue,
shutdown_event: Event,
cfg: OpenArmsRTCInterpEvalConfig,
episode_active: Event,
dataset: LeRobotDataset | None,
dataset_lock: Lock,
teleop_action_processor,
robot_observation_processor,
interpolator: ActionInterpolator,
hz_tracker: HzTracker,
dataset_features: dict,
):
"""Thread function to execute interpolated actions on the robot at high frequency."""
try:
logger.info(f"[ACTOR] Starting actor thread with interpolation at {cfg.robot_fps}Hz")
action_count = 0
robot_interval = 1.0 / cfg.robot_fps # High frequency robot control
effective_policy_interval = 1.0 / cfg.effective_policy_fps # Action consume rate
action_keys = [k for k in robot.action_features.keys() if k.endswith(".pos")]
last_action_consume_time = 0
interpolator.reset()
hz_tracker.reset()
# Set up extra info callback to show queue size
hz_tracker.extra_info_fn = lambda: f" | Queue: {action_queue.qsize()}"
while not shutdown_event.is_set():
if not episode_active.is_set():
time.sleep(0.01)
interpolator.reset()
hz_tracker.reset()
last_action_consume_time = 0
continue
start_time = time.perf_counter()
# Consume new action from RTC queue at effective_policy_fps rate
current_time = time.perf_counter()
if current_time - last_action_consume_time >= effective_policy_interval:
action = action_queue.get()
if action is not None:
action = action.cpu()
interpolator.update(action)
last_action_consume_time = current_time
# Record to dataset at action consume rate
if cfg.record_dataset and dataset is not None:
with dataset_lock:
obs = robot.get_observation()
obs_processed = robot_observation_processor(obs)
action_dict = {}
for i, key in enumerate(action_keys):
if i < len(action):
action_dict[key] = action[i].item()
action_for_dataset = teleop_action_processor((action_dict, None))
# Use build_dataset_frame to properly format keys
observation_frame = build_dataset_frame(
dataset_features, obs_processed, prefix="observation"
)
action_frame = build_dataset_frame(
dataset_features, action_for_dataset, prefix="action"
)
frame = {**observation_frame, **action_frame, "task": cfg.task}
dataset.add_frame(frame)
# Get interpolated action and send to robot at robot_fps (highest rate)
interp_action, velocity = interpolator.get_interpolated_action()
if interp_action is not None:
# Convert tensor to dict
action_dict = {}
velocity_dict = {}
for i, key in enumerate(action_keys):
if i < len(interp_action):
action_dict[key] = interp_action[i].item()
if velocity is not None:
motor_name = key.removesuffix(".pos")
velocity_dict[motor_name] = velocity[i].item()
action_processed = robot_action_processor((action_dict, None))
robot.send_action(action_processed, velocity_ff=velocity_dict)
action_count += 1
hz_tracker.tick()
# Maintain robot control rate
dt_s = time.perf_counter() - start_time
sleep_time = max(0, robot_interval - dt_s - 0.001)
if sleep_time > 0:
time.sleep(sleep_time)
final_hz = hz_tracker.get_avg_hz()
if final_hz:
logger.info(f"[ACTOR] Final robot Hz: {final_hz:.1f}")
logger.info(f"[ACTOR] Actor thread shutting down. Total actions executed: {action_count}")
except Exception as e:
logger.error(f"[ACTOR] Fatal exception: {e}")
logger.error(traceback.format_exc())
shutdown_event.set()
sys.exit(1)
# ============================================================================
# Helper Functions
# ============================================================================
def build_custom_gains(robot: OpenArmsFollower, kp_scale: float | None, kd_scale: float | None) -> tuple[dict | None, dict | None]:
"""Build custom PID gains dict from robot config."""
if kp_scale is None and kd_scale is None:
return None, None
custom_kp = {}
custom_kd = {}
for arm in ["right", "left"]:
bus = robot.bus_right if arm == "right" else robot.bus_left
for i, motor_name in enumerate(bus.motors):
full_name = f"{arm}_{motor_name}"
default_kp = robot.config.position_kp[i] if isinstance(robot.config.position_kp, list) else robot.config.position_kp
default_kd = robot.config.position_kd[i] if isinstance(robot.config.position_kd, list) else robot.config.position_kd
custom_kp[full_name] = default_kp * (kp_scale or 1.0)
custom_kd[full_name] = default_kd * (kd_scale or 1.0)
return custom_kp, custom_kd
def _apply_torch_compile(policy, cfg: OpenArmsRTCInterpEvalConfig):
"""Apply torch.compile to the policy's predict_action_chunk method."""
if policy.name in ["pi05", "pi0"]:
return policy
try:
if not hasattr(torch, "compile"):
logger.warning(
f"torch.compile not available. Requires PyTorch 2.0+. "
f"Current version: {torch.__version__}. Skipping compilation."
)
return policy
logger.info("Applying torch.compile to predict_action_chunk...")
compile_kwargs = {
"backend": cfg.torch_compile_backend,
"mode": cfg.torch_compile_mode,
}
if cfg.torch_compile_disable_cudagraphs:
compile_kwargs["options"] = {"triton.cudagraphs": False}
original_method = policy.predict_action_chunk
compiled_method = torch.compile(original_method, **compile_kwargs)
policy.predict_action_chunk = compiled_method
logger.info("Successfully compiled predict_action_chunk")
except Exception as e:
logger.error(f"Failed to apply torch.compile: {e}")
logger.warning("Continuing without torch.compile")
return policy
# ============================================================================
# Main Evaluation Function
# ============================================================================
@parser.wrap()
def main(cfg: OpenArmsRTCInterpEvalConfig):
"""Main evaluation function with RTC + Interpolation."""
init_logging()
print("=" * 70)
print("OpenArms Policy Evaluation with RTC + Interpolation")
print("=" * 70)
print(f"\nModel: {cfg.model_id}")
print(f"Evaluation Dataset: {cfg.eval_dataset_id}")
print(f"Task: {cfg.task}")
print(f"\n--- Timing ---")
print(f"Camera FPS: {cfg.camera_fps} (hardware limit)")
print(f"Policy trained at: {cfg.policy_fps}Hz")
print(f"Speed multiplier: {cfg.speed_multiplier}x")
print(f"Effective policy FPS: {cfg.effective_policy_fps}Hz (action consume rate)")
print(f"Robot FPS: {cfg.robot_fps}Hz (interpolated commands)")
print(f"\n--- RTC ---")
print(f"RTC Enabled: {cfg.rtc.enabled}")
print(f"Execution Horizon: {cfg.rtc.execution_horizon}")
print(f"Max Guidance Weight: {cfg.rtc.max_guidance_weight}")
print(f"\n--- PID Tuning ---")
print(f"KP scale: {cfg.kp_scale}")
print(f"KD scale: {cfg.kd_scale}")
print(f"Velocity feedforward: {cfg.use_velocity_ff}")
print(f"\n--- Episodes ---")
print(f"Episodes: {cfg.num_episodes}")
print(f"Duration: {cfg.episode_time_sec}s per episode")
print(f"Device: {cfg.device}")
print("=" * 70)
signal_handler = ProcessSignalHandler(use_threads=True, display_pid=False)
shutdown_event = signal_handler.shutdown_event
episode_active = Event()
# Initialize Robot
camera_config = {
"left_wrist": OpenCVCameraConfig(index_or_path="/dev/video5", width=640, height=480, fps=int(cfg.camera_fps)),
"right_wrist": OpenCVCameraConfig(index_or_path="/dev/video1", width=640, height=480, fps=int(cfg.camera_fps)),
"base": OpenCVCameraConfig(index_or_path="/dev/video3", width=640, height=480, fps=int(cfg.camera_fps)),
}
follower_config = OpenArmsFollowerConfig(
port_left=cfg.follower_left_port,
port_right=cfg.follower_right_port,
can_interface="socketcan",
id="openarms_follower",
disable_torque_on_disconnect=True,
max_relative_target=15.0,
cameras=camera_config,
)
follower = OpenArmsFollower(follower_config)
follower.connect(calibrate=False)
if not follower.is_connected:
raise RuntimeError("Follower robot failed to connect!")
# Build custom PID gains
custom_kp, custom_kd = build_custom_gains(follower, cfg.kp_scale, cfg.kd_scale)
if custom_kp:
logger.info(f"Custom gains: kp_scale={cfg.kp_scale}, kd_scale={cfg.kd_scale}")
if cfg.use_velocity_ff:
logger.info("Velocity feedforward enabled")
robot = RobotWrapper(
follower,
custom_kp=custom_kp,
custom_kd=custom_kd,
use_velocity_ff=cfg.use_velocity_ff,
)
logger.info("Follower robot connected")
# Build Processors and Dataset Features
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
action_features_hw = {}
for key, value in follower.action_features.items():
if key.endswith(".pos"):
action_features_hw[key] = value
dataset_features = combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=teleop_action_processor,
initial_features=create_initial_features(action=action_features_hw),
use_videos=True,
),
aggregate_pipeline_dataset_features(
pipeline=robot_observation_processor,
initial_features=create_initial_features(observation=follower.observation_features),
use_videos=True,
),
)
# Create or Load Dataset
dataset = None
dataset_lock = Lock()
if cfg.record_dataset:
dataset_path = Path.home() / ".cache" / "huggingface" / "lerobot" / cfg.eval_dataset_id
if dataset_path.exists():
logger.info(f"Evaluation dataset exists at: {dataset_path}")
logger.info("New episodes will be appended.")
choice = input("Continue? (y/n): ").strip().lower()
if choice != "y":
logger.info("Aborting evaluation.")
follower.disconnect()
return
# Dataset uses effective policy FPS
dataset = LeRobotDataset.create(
repo_id=cfg.eval_dataset_id,
fps=cfg.effective_policy_fps,
features=dataset_features,
robot_type=follower.name,
use_videos=True,
image_writer_processes=0,
image_writer_threads=12,
)
logger.info(f"Dataset created: {cfg.eval_dataset_id} at {cfg.effective_policy_fps}Hz")
# Load Policy
logger.info(f"Loading policy from: {cfg.model_id}")
policy_class = get_policy_class(cfg.policy.type)
config = PreTrainedConfig.from_pretrained(cfg.policy.pretrained_path)
if cfg.policy.type in ["pi05", "pi0"]:
config.compile_model = cfg.use_torch_compile
policy = policy_class.from_pretrained(cfg.policy.pretrained_path, config=config)
policy.config.rtc_config = cfg.rtc
policy.init_rtc_processor()
assert policy.name in ["smolvla", "pi05", "pi0"], "Only smolvla, pi05, and pi0 are supported for RTC"
policy = policy.to(cfg.device)
policy.eval()
if cfg.use_torch_compile:
policy = _apply_torch_compile(policy, cfg)
logger.info(f"Policy loaded: {policy.name}")
# Create Action Queue, Interpolator, and Hz Tracker
action_queue = ActionQueue(cfg.rtc)
interpolator = ActionInterpolator(robot_fps=int(cfg.robot_fps))
hz_tracker = HzTracker(name="Robot", window_size=100, print_interval=2.0)
# Start Threads
get_actions_t = Thread(
target=get_actions_thread,
args=(
policy,
robot,
robot_observation_processor,
action_queue,
shutdown_event,
cfg,
episode_active,
),
daemon=True,
name="GetActions",
)
get_actions_t.start()
logger.info("Started RTC action generation thread")
actor_t = Thread(
target=actor_thread,
args=(
robot,
robot_action_processor,
action_queue,
shutdown_event,
cfg,
episode_active,
dataset,
dataset_lock,
teleop_action_processor,
robot_observation_processor,
interpolator,
hz_tracker,
dataset_features,
),
daemon=True,
name="Actor",
)
actor_t.start()
logger.info(f"Started actor thread with interpolation at {cfg.robot_fps}Hz")
# Run Evaluation Episodes
episode_idx = 0
try:
while episode_idx < cfg.num_episodes and not shutdown_event.is_set():
log_say(f"Evaluating episode {episode_idx + 1} of {cfg.num_episodes}")
logger.info(f"\n{'='*50}")
logger.info(f"Episode {episode_idx + 1} / {cfg.num_episodes}")
logger.info(f"{'='*50}")
action_queue = ActionQueue(cfg.rtc)
interpolator.reset()
hz_tracker.reset()
episode_active.set()
episode_start_time = time.time()
while (time.time() - episode_start_time) < cfg.episode_time_sec:
if shutdown_event.is_set():
break
elapsed = time.time() - episode_start_time
if int(elapsed) % 30 == 0 and int(elapsed) > 0:
logger.info(
f"[MAIN] Episode progress: {elapsed:.0f}/{cfg.episode_time_sec}s, "
f"queue_size={action_queue.qsize()}"
)
time.sleep(0.5)
episode_active.clear()
logger.info(f"Episode {episode_idx + 1} completed")
if cfg.record_dataset and dataset is not None:
with dataset_lock:
if dataset.episode_buffer is not None and dataset.episode_buffer.get("size", 0) > 0:
logger.info(
f"Saving episode {episode_idx + 1} "
f"({dataset.episode_buffer['size']} frames)"
)
dataset.save_episode()
episode_idx += 1
# Manual reset between episodes
if not shutdown_event.is_set() and episode_idx < cfg.num_episodes:
log_say("Waiting for manual reset")
logger.info("Manually reset the environment and press ENTER to continue")
input("Press ENTER when ready...")
logger.info(f"Evaluation complete! {episode_idx} episodes recorded")
log_say("Evaluation complete", blocking=True)
except KeyboardInterrupt:
logger.info("\n\nEvaluation interrupted by user")
finally:
shutdown_event.set()
episode_active.clear()
if get_actions_t.is_alive():
logger.info("Waiting for action generation thread to finish...")
get_actions_t.join(timeout=5.0)
if actor_t.is_alive():
logger.info("Waiting for actor thread to finish...")
actor_t.join(timeout=5.0)
follower.disconnect()
logger.info("Follower disconnected")
if cfg.record_dataset and dataset is not None:
dataset.finalize()
if cfg.push_to_hub:
logger.info("Uploading to Hugging Face Hub...")
dataset.push_to_hub(private=True)
logger.info("Cleanup completed")
if __name__ == "__main__":
main()

View File

@@ -367,7 +367,8 @@ class OpenArmsFollower(Robot):
self,
action: Dict[str, Any],
custom_kp: Optional[Dict[str, float]] = None,
custom_kd: Optional[Dict[str, float]] = None
custom_kd: Optional[Dict[str, float]] = None,
velocity_feedforward: Optional[Dict[str, float]] = None,
) -> Dict[str, Any]:
"""
Send action command to robot.
@@ -378,6 +379,7 @@ class OpenArmsFollower(Robot):
action: Dictionary with motor positions (e.g., "right_joint_1.pos", "left_joint_2.pos")
custom_kp: Optional custom kp gains per motor (e.g., {"right_joint_1": 120.0, "left_joint_2": 150.0})
custom_kd: Optional custom kd gains per motor (e.g., {"right_joint_1": 1.5, "left_joint_2": 2.0})
velocity_feedforward: Optional velocity feedforward in deg/s per motor for smoother tracking
Returns:
The action actually sent (potentially clipped)
@@ -462,9 +464,9 @@ class OpenArmsFollower(Robot):
commands_right = {}
for motor_name, position_degrees in goal_pos_right.items():
idx = motor_index.get(motor_name, 0)
full_motor_name = f"right_{motor_name}"
# Use custom gains if provided, otherwise use config defaults
full_motor_name = f"right_{motor_name}"
if custom_kp is not None and full_motor_name in custom_kp:
kp = custom_kp[full_motor_name]
else:
@@ -475,7 +477,12 @@ class OpenArmsFollower(Robot):
else:
kd = self.config.position_kd[idx] if isinstance(self.config.position_kd, list) else self.config.position_kd
commands_right[motor_name] = (kp, kd, position_degrees, 0.0, 0.0)
# Get velocity feedforward if provided
vel_ff = 0.0
if velocity_feedforward is not None and full_motor_name in velocity_feedforward:
vel_ff = velocity_feedforward[full_motor_name]
commands_right[motor_name] = (kp, kd, position_degrees, vel_ff, 0.0)
self.bus_right._mit_control_batch(commands_right)
# Use batch MIT control for left arm (sends all commands, then collects responses)
@@ -483,9 +490,9 @@ class OpenArmsFollower(Robot):
commands_left = {}
for motor_name, position_degrees in goal_pos_left.items():
idx = motor_index.get(motor_name, 0)
full_motor_name = f"left_{motor_name}"
# Use custom gains if provided, otherwise use config defaults
full_motor_name = f"left_{motor_name}"
if custom_kp is not None and full_motor_name in custom_kp:
kp = custom_kp[full_motor_name]
else:
@@ -496,7 +503,12 @@ class OpenArmsFollower(Robot):
else:
kd = self.config.position_kd[idx] if isinstance(self.config.position_kd, list) else self.config.position_kd
commands_left[motor_name] = (kp, kd, position_degrees, 0.0, 0.0)
# Get velocity feedforward if provided
vel_ff = 0.0
if velocity_feedforward is not None and full_motor_name in velocity_feedforward:
vel_ff = velocity_feedforward[full_motor_name]
commands_left[motor_name] = (kp, kd, position_degrees, vel_ff, 0.0)
self.bus_left._mit_control_batch(commands_left)
# Return the actions that were actually sent