mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-04 12:51:27 +00:00
Merge branch 'main' into user/michel-aractingi/2025_06_30_dataset_v3
Signed-off-by: Michel Aractingi <michel.aractingi@huggingface.co>
This commit is contained in:
@@ -24,6 +24,7 @@ This guide provides step-by-step instructions for training a robot policy using
|
||||
- A gamepad (recommended) or keyboard to control the robot
|
||||
- A Nvidia GPU
|
||||
- A real robot with a follower and leader arm (optional if you use the keyboard or the gamepad)
|
||||
- A URDF file for the robot for the kinematics package (check `lerobot/common/model/kinematics.py`)
|
||||
|
||||
## What kind of tasks can I train?
|
||||
|
||||
|
||||
@@ -1,32 +1,90 @@
|
||||
from lerobot.datasets.utils import build_dataset_frame, hw_to_dataset_features
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.datasets.utils import hw_to_dataset_features
|
||||
from lerobot.policies.act.modeling_act import ACTPolicy
|
||||
from lerobot.record import record_loop
|
||||
from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig
|
||||
from lerobot.utils.control_utils import predict_action
|
||||
from lerobot.utils.utils import get_safe_torch_device
|
||||
from lerobot.utils.control_utils import init_keyboard_listener
|
||||
from lerobot.utils.utils import log_say
|
||||
from lerobot.utils.visualization_utils import _init_rerun
|
||||
|
||||
NB_CYCLES_CLIENT_CONNECTION = 1000
|
||||
NUM_EPISODES = 2
|
||||
FPS = 30
|
||||
EPISODE_TIME_SEC = 60
|
||||
TASK_DESCRIPTION = "My task description"
|
||||
|
||||
# Create the robot and teleoperator configurations
|
||||
robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi")
|
||||
robot = LeKiwiClient(robot_config)
|
||||
|
||||
policy = ACTPolicy.from_pretrained("<hf_username>/<policy_repo_id>")
|
||||
|
||||
# Configure the dataset features
|
||||
action_features = hw_to_dataset_features(robot.action_features, "action")
|
||||
obs_features = hw_to_dataset_features(robot.observation_features, "observation")
|
||||
dataset_features = {**action_features, **obs_features}
|
||||
|
||||
# Create the dataset
|
||||
dataset = LeRobotDataset.create(
|
||||
repo_id="<hf_username>/<eval_dataset_repo_id>",
|
||||
fps=FPS,
|
||||
features=dataset_features,
|
||||
robot_type=robot.name,
|
||||
use_videos=True,
|
||||
image_writer_threads=4,
|
||||
)
|
||||
|
||||
# To connect you already should have this script running on LeKiwi: `python -m lerobot.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
|
||||
robot.connect()
|
||||
|
||||
policy = ACTPolicy.from_pretrained("pepijn223/act_lekiwi_circle")
|
||||
policy.reset()
|
||||
_init_rerun(session_name="recording")
|
||||
|
||||
obs_features = hw_to_dataset_features(robot.observation_features, "observation")
|
||||
listener, events = init_keyboard_listener()
|
||||
|
||||
print("Running inference")
|
||||
i = 0
|
||||
while i < NB_CYCLES_CLIENT_CONNECTION:
|
||||
obs = robot.get_observation()
|
||||
if not robot.is_connected:
|
||||
raise ValueError("Robot is not connected!")
|
||||
|
||||
observation_frame = build_dataset_frame(obs_features, obs, prefix="observation")
|
||||
action_values = predict_action(
|
||||
observation_frame, policy, get_safe_torch_device(policy.config.device), policy.config.use_amp
|
||||
recorded_episodes = 0
|
||||
while recorded_episodes < NUM_EPISODES and not events["stop_recording"]:
|
||||
log_say(f"Running inference, recording eval episode {recorded_episodes} of {NUM_EPISODES}")
|
||||
|
||||
# Run the policy inference loop
|
||||
record_loop(
|
||||
robot=robot,
|
||||
events=events,
|
||||
fps=FPS,
|
||||
policy=policy,
|
||||
dataset=dataset,
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
)
|
||||
action = {key: action_values[i].item() for i, key in enumerate(robot.action_features)}
|
||||
robot.send_action(action)
|
||||
i += 1
|
||||
|
||||
# Logic for reset env
|
||||
if not events["stop_recording"] and (
|
||||
(recorded_episodes < NUM_EPISODES - 1) or events["rerecord_episode"]
|
||||
):
|
||||
log_say("Reset the environment")
|
||||
record_loop(
|
||||
robot=robot,
|
||||
events=events,
|
||||
fps=FPS,
|
||||
control_time_s=EPISODE_TIME_SEC,
|
||||
single_task=TASK_DESCRIPTION,
|
||||
display_data=True,
|
||||
)
|
||||
|
||||
if events["rerecord_episode"]:
|
||||
log_say("Re-record episode")
|
||||
events["rerecord_episode"] = False
|
||||
events["exit_early"] = False
|
||||
dataset.clear_episode_buffer()
|
||||
continue
|
||||
|
||||
dataset.save_episode()
|
||||
recorded_episodes += 1
|
||||
|
||||
# Upload to hub and clean up
|
||||
dataset.push_to_hub()
|
||||
|
||||
robot.disconnect()
|
||||
listener.stop()
|
||||
|
||||
@@ -4,22 +4,30 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig
|
||||
from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient
|
||||
from lerobot.utils.robot_utils import busy_wait
|
||||
from lerobot.utils.utils import log_say
|
||||
|
||||
EPISODE_IDX = 0
|
||||
|
||||
robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="lekiwi")
|
||||
robot = LeKiwiClient(robot_config)
|
||||
|
||||
dataset = LeRobotDataset("pepijn223/lekiwi1749025613", episodes=[0])
|
||||
dataset = LeRobotDataset("<hf_username>/<dataset_repo_id>", episodes=[EPISODE_IDX])
|
||||
actions = dataset.hf_dataset.select_columns("action")
|
||||
|
||||
robot.connect()
|
||||
|
||||
print("Replaying episode…")
|
||||
for _, action_array in enumerate(dataset.hf_dataset["action"]):
|
||||
if not robot.is_connected:
|
||||
raise ValueError("Robot is not connected!")
|
||||
|
||||
log_say(f"Replaying episode {EPISODE_IDX}")
|
||||
for idx in range(dataset.num_frames):
|
||||
t0 = time.perf_counter()
|
||||
|
||||
action = {name: float(action_array[i]) for i, name in enumerate(dataset.features["action"]["names"])}
|
||||
action = {
|
||||
name: float(actions[idx]["action"][i]) for i, name in enumerate(dataset.features["action"]["names"])
|
||||
}
|
||||
robot.send_action(action)
|
||||
|
||||
busy_wait(max(1.0 / dataset.fps - (time.perf_counter() - t0), 0.0))
|
||||
|
||||
print("Disconnecting LeKiwi Client")
|
||||
robot.disconnect()
|
||||
|
||||
@@ -1,32 +1,47 @@
|
||||
import time
|
||||
|
||||
from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig
|
||||
from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop, KeyboardTeleopConfig
|
||||
from lerobot.teleoperators.so100_leader import SO100Leader, SO100LeaderConfig
|
||||
from lerobot.utils.robot_utils import busy_wait
|
||||
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data
|
||||
|
||||
FPS = 30
|
||||
|
||||
# Create the robot and teleoperator configurations
|
||||
robot_config = LeKiwiClientConfig(remote_ip="172.18.134.136", id="my_lekiwi")
|
||||
|
||||
teleop__arm_config = SO100LeaderConfig(
|
||||
port="/dev/tty.usbmodem58760431551",
|
||||
id="my_awesome_leader_arm",
|
||||
)
|
||||
|
||||
teleop_keyboard_config = KeyboardTeleopConfig(
|
||||
id="my_laptop_keyboard",
|
||||
)
|
||||
teleop_arm_config = SO100LeaderConfig(port="/dev/tty.usbmodem585A0077581", id="my_awesome_leader_arm")
|
||||
keyboard_config = KeyboardTeleopConfig(id="my_laptop_keyboard")
|
||||
|
||||
robot = LeKiwiClient(robot_config)
|
||||
teleop_arm = SO100Leader(teleop__arm_config)
|
||||
telep_keyboard = KeyboardTeleop(teleop_keyboard_config)
|
||||
leader_arm = SO100Leader(teleop_arm_config)
|
||||
keyboard = KeyboardTeleop(keyboard_config)
|
||||
|
||||
# To connect you already should have this script running on LeKiwi: `python -m lerobot.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
|
||||
robot.connect()
|
||||
teleop_arm.connect()
|
||||
telep_keyboard.connect()
|
||||
leader_arm.connect()
|
||||
keyboard.connect()
|
||||
|
||||
_init_rerun(session_name="lekiwi_teleop")
|
||||
|
||||
if not robot.is_connected or not leader_arm.is_connected or not keyboard.is_connected:
|
||||
raise ValueError("Robot, leader arm of keyboard is not connected!")
|
||||
|
||||
while True:
|
||||
t0 = time.perf_counter()
|
||||
|
||||
observation = robot.get_observation()
|
||||
|
||||
arm_action = teleop_arm.get_action()
|
||||
arm_action = leader_arm.get_action()
|
||||
arm_action = {f"arm_{k}": v for k, v in arm_action.items()}
|
||||
|
||||
keyboard_keys = telep_keyboard.get_action()
|
||||
keyboard_keys = keyboard.get_action()
|
||||
base_action = robot._from_keyboard_to_base_action(keyboard_keys)
|
||||
|
||||
robot.send_action(arm_action | base_action)
|
||||
log_rerun_data(observation, {**arm_action, **base_action})
|
||||
|
||||
action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
|
||||
|
||||
robot.send_action(action)
|
||||
|
||||
busy_wait(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
|
||||
|
||||
@@ -68,7 +68,6 @@ dependencies = [
|
||||
"pyserial>=3.5",
|
||||
"pyzmq>=26.2.1",
|
||||
"rerun-sdk>=0.21.0",
|
||||
"scipy>=1.14.0",
|
||||
"termcolor>=2.4.0",
|
||||
"torch>=2.2.1",
|
||||
"torchcodec>=0.2.1; sys_platform != 'win32' and (sys_platform != 'linux' or (platform_machine != 'aarch64' and platform_machine != 'arm64' and platform_machine != 'armv7l')) and (sys_platform != 'darwin' or platform_machine != 'x86_64')",
|
||||
@@ -87,6 +86,7 @@ dora = [
|
||||
dynamixel = ["dynamixel-sdk>=3.7.31"]
|
||||
feetech = ["feetech-servo-sdk>=1.0.0"]
|
||||
gamepad = ["pygame>=2.5.1", "hidapi>=0.14.0"]
|
||||
kinematics = ["placo>=0.9.6"]
|
||||
intelrealsense = [
|
||||
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'",
|
||||
"pyrealsense2-macosx>=2.54 ; sys_platform == 'darwin'",
|
||||
@@ -100,13 +100,16 @@ stretch = [
|
||||
"pyrealsense2>=2.55.1.6486 ; sys_platform != 'darwin'"
|
||||
]
|
||||
test = ["pytest>=8.1.0", "pytest-timeout>=2.4.0", "pytest-cov>=5.0.0", "pyserial>=3.5", "mock-serial>=0.0.1 ; sys_platform != 'win32'"]
|
||||
hilserl = ["transformers>=4.50.3", "gym-hil>=0.1.8", "protobuf>=5.29.3", "grpcio==1.71.0"]
|
||||
hilserl = ["transformers>=4.50.3", "gym-hil>=0.1.9", "protobuf>=5.29.3", "grpcio==1.71.0", "placo>=0.9.6"]
|
||||
umi = ["imagecodecs>=2024.1.1"]
|
||||
video_benchmark = ["scikit-image>=0.23.2", "pandas>=2.2.2"]
|
||||
xarm = ["gym-xarm>=0.1.1 ; python_version < '4.0'"]
|
||||
|
||||
[tool.poetry]
|
||||
requires-poetry = ">=2.1"
|
||||
packages = [
|
||||
{ include = "lerobot", from = "src" }
|
||||
]
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 110
|
||||
|
||||
@@ -12,472 +12,117 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import numpy as np
|
||||
from numpy.typing import NDArray
|
||||
from scipy.spatial.transform import Rotation
|
||||
|
||||
|
||||
def skew_symmetric(w: NDArray[np.float32]) -> NDArray[np.float32]:
|
||||
"""Creates the skew-symmetric matrix from a 3D vector."""
|
||||
return np.array([[0, -w[2], w[1]], [w[2], 0, -w[0]], [-w[1], w[0], 0]])
|
||||
|
||||
|
||||
def rodrigues_rotation(w: NDArray[np.float32], theta: float) -> NDArray[np.float32]:
|
||||
"""Computes the rotation matrix using Rodrigues' formula."""
|
||||
w_hat = skew_symmetric(w)
|
||||
return np.eye(3) + np.sin(theta) * w_hat + (1 - np.cos(theta)) * w_hat @ w_hat
|
||||
|
||||
|
||||
def screw_axis_to_transform(s: NDArray[np.float32], theta: float) -> NDArray[np.float32]:
|
||||
"""Converts a screw axis to a 4x4 transformation matrix."""
|
||||
screw_axis_rot = s[:3]
|
||||
screw_axis_trans = s[3:]
|
||||
|
||||
# Pure translation
|
||||
if np.allclose(screw_axis_rot, 0) and np.linalg.norm(screw_axis_trans) == 1:
|
||||
transform = np.eye(4)
|
||||
transform[:3, 3] = screw_axis_trans * theta
|
||||
|
||||
# Rotation (and potentially translation)
|
||||
elif np.linalg.norm(screw_axis_rot) == 1:
|
||||
w_hat = skew_symmetric(screw_axis_rot)
|
||||
rot_mat = np.eye(3) + np.sin(theta) * w_hat + (1 - np.cos(theta)) * w_hat @ w_hat
|
||||
t = (
|
||||
np.eye(3) * theta + (1 - np.cos(theta)) * w_hat + (theta - np.sin(theta)) * w_hat @ w_hat
|
||||
) @ screw_axis_trans
|
||||
transform = np.eye(4)
|
||||
transform[:3, :3] = rot_mat
|
||||
transform[:3, 3] = t
|
||||
else:
|
||||
raise ValueError("Invalid screw axis parameters")
|
||||
return transform
|
||||
|
||||
|
||||
def pose_difference_se3(pose1: NDArray[np.float32], pose2: NDArray[np.float32]) -> NDArray[np.float32]:
|
||||
"""
|
||||
Calculates the SE(3) difference between two 4x4 homogeneous transformation matrices.
|
||||
SE(3) (Special Euclidean Group) represents rigid body transformations in 3D space,
|
||||
combining rotation (SO(3)) and translation.
|
||||
|
||||
Each 4x4 matrix has the following structure:
|
||||
[R11 R12 R13 tx]
|
||||
[R21 R22 R23 ty]
|
||||
[R31 R32 R33 tz]
|
||||
[ 0 0 0 1]
|
||||
|
||||
where R is the 3x3 rotation matrix and [tx,ty,tz] is the translation vector.
|
||||
|
||||
Args:
|
||||
pose1: A 4x4 numpy array representing the first pose.
|
||||
pose2: A 4x4 numpy array representing the second pose.
|
||||
|
||||
Returns:
|
||||
A 6D numpy array concatenating translation and rotation differences.
|
||||
First 3 elements are the translational difference (position).
|
||||
Last 3 elements are the rotational difference in axis-angle representation.
|
||||
"""
|
||||
rot1 = pose1[:3, :3]
|
||||
rot2 = pose2[:3, :3]
|
||||
|
||||
translation_diff = pose1[:3, 3] - pose2[:3, 3]
|
||||
|
||||
# Calculate rotational difference using scipy's Rotation library
|
||||
rot_diff = Rotation.from_matrix(rot1 @ rot2.T)
|
||||
rotation_diff = rot_diff.as_rotvec() # Axis-angle representation
|
||||
|
||||
return np.concatenate([translation_diff, rotation_diff])
|
||||
|
||||
|
||||
def se3_error(target_pose: NDArray[np.float32], current_pose: NDArray[np.float32]) -> NDArray[np.float32]:
|
||||
pos_error = target_pose[:3, 3] - current_pose[:3, 3]
|
||||
|
||||
rot_target = target_pose[:3, :3]
|
||||
rot_current = current_pose[:3, :3]
|
||||
rot_error_mat = rot_target @ rot_current.T
|
||||
rot_error = Rotation.from_matrix(rot_error_mat).as_rotvec()
|
||||
|
||||
return np.concatenate([pos_error, rot_error])
|
||||
|
||||
|
||||
class RobotKinematics:
|
||||
"""Robot kinematics class supporting multiple robot models."""
|
||||
"""Robot kinematics using placo library for forward and inverse kinematics."""
|
||||
|
||||
# Robot measurements dictionary
|
||||
ROBOT_MEASUREMENTS = {
|
||||
"koch": {
|
||||
"gripper": [0.239, -0.001, 0.024],
|
||||
"wrist": [0.209, 0, 0.024],
|
||||
"forearm": [0.108, 0, 0.02],
|
||||
"humerus": [0, 0, 0.036],
|
||||
"shoulder": [0, 0, 0],
|
||||
"base": [0, 0, 0.02],
|
||||
},
|
||||
"moss": {
|
||||
"gripper": [0.246, 0.013, 0.111],
|
||||
"wrist": [0.245, 0.002, 0.064],
|
||||
"forearm": [0.122, 0, 0.064],
|
||||
"humerus": [0.001, 0.001, 0.063],
|
||||
"shoulder": [0, 0, 0],
|
||||
"base": [0, 0, 0.02],
|
||||
},
|
||||
"so_old_calibration": {
|
||||
"gripper": [0.320, 0, 0.050],
|
||||
"wrist": [0.278, 0, 0.050],
|
||||
"forearm": [0.143, 0, 0.044],
|
||||
"humerus": [0.031, 0, 0.072],
|
||||
"shoulder": [0, 0, 0],
|
||||
"base": [0, 0, 0.02],
|
||||
},
|
||||
"so_new_calibration": {
|
||||
"gripper": [0.33, 0.0, 0.285],
|
||||
"wrist": [0.30, 0.0, 0.267],
|
||||
"forearm": [0.25, 0.0, 0.266],
|
||||
"humerus": [0.06, 0.0, 0.264],
|
||||
"shoulder": [0.0, 0.0, 0.238],
|
||||
"base": [0.0, 0.0, 0.12],
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, robot_type: str = "so100"):
|
||||
"""Initialize kinematics for the specified robot type.
|
||||
|
||||
Args:
|
||||
robot_type: String specifying the robot model ("koch", "so100", or "moss")
|
||||
"""
|
||||
if robot_type not in self.ROBOT_MEASUREMENTS:
|
||||
raise ValueError(
|
||||
f"Unknown robot type: {robot_type}. Available types: {list(self.ROBOT_MEASUREMENTS.keys())}"
|
||||
)
|
||||
|
||||
self.robot_type = robot_type
|
||||
self.measurements = self.ROBOT_MEASUREMENTS[robot_type]
|
||||
|
||||
# Initialize all transformation matrices and screw axes
|
||||
self._setup_transforms()
|
||||
|
||||
def _create_translation_matrix(
|
||||
self, x: float = 0.0, y: float = 0.0, z: float = 0.0
|
||||
) -> NDArray[np.float32]:
|
||||
"""Create a 4x4 translation matrix."""
|
||||
return np.array([[1, 0, 0, x], [0, 1, 0, y], [0, 0, 1, z], [0, 0, 0, 1]])
|
||||
|
||||
def _setup_transforms(self):
|
||||
"""Setup all transformation matrices and screw axes for the robot."""
|
||||
# Set up rotation matrices (constant across robot types)
|
||||
|
||||
# Gripper orientation
|
||||
self.gripper_X0 = np.array(
|
||||
[
|
||||
[1, 0, 0, 0],
|
||||
[0, 0, 1, 0],
|
||||
[0, -1, 0, 0],
|
||||
[0, 0, 0, 1],
|
||||
],
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
# Wrist orientation
|
||||
self.wrist_X0 = np.array(
|
||||
[
|
||||
[0, -1, 0, 0],
|
||||
[1, 0, 0, 0],
|
||||
[0, 0, 1, 0],
|
||||
[0, 0, 0, 1],
|
||||
],
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
# Base orientation
|
||||
self.base_X0 = np.array(
|
||||
[
|
||||
[0, 0, 1, 0],
|
||||
[1, 0, 0, 0],
|
||||
[0, 1, 0, 0],
|
||||
[0, 0, 0, 1],
|
||||
],
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
# Gripper
|
||||
# Screw axis of gripper frame wrt base frame
|
||||
self.S_BG = np.array(
|
||||
[
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
self.measurements["gripper"][2],
|
||||
-self.measurements["gripper"][1],
|
||||
],
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
# Gripper origin to centroid transform
|
||||
self.X_GoGc = self._create_translation_matrix(x=0.07)
|
||||
|
||||
# Gripper origin to tip transform
|
||||
self.X_GoGt = self._create_translation_matrix(x=0.12)
|
||||
|
||||
# 0-position gripper frame pose wrt base
|
||||
self.X_BoGo = self._create_translation_matrix(
|
||||
x=self.measurements["gripper"][0],
|
||||
y=self.measurements["gripper"][1],
|
||||
z=self.measurements["gripper"][2],
|
||||
)
|
||||
|
||||
# Wrist
|
||||
# Screw axis of wrist frame wrt base frame
|
||||
self.S_BR = np.array(
|
||||
[0, 1, 0, -self.measurements["wrist"][2], 0, self.measurements["wrist"][0]], dtype=np.float32
|
||||
)
|
||||
|
||||
# 0-position origin to centroid transform
|
||||
self.X_RoRc = self._create_translation_matrix(x=0.0035, y=-0.002)
|
||||
|
||||
# 0-position wrist frame pose wrt base
|
||||
self.X_BR = self._create_translation_matrix(
|
||||
x=self.measurements["wrist"][0],
|
||||
y=self.measurements["wrist"][1],
|
||||
z=self.measurements["wrist"][2],
|
||||
)
|
||||
|
||||
# Forearm
|
||||
# Screw axis of forearm frame wrt base frame
|
||||
self.S_BF = np.array(
|
||||
[
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
-self.measurements["forearm"][2],
|
||||
0,
|
||||
self.measurements["forearm"][0],
|
||||
],
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
# Forearm origin + centroid transform
|
||||
self.X_ForearmFc = self._create_translation_matrix(x=0.036)
|
||||
|
||||
# 0-position forearm frame pose wrt base
|
||||
self.X_BF = self._create_translation_matrix(
|
||||
x=self.measurements["forearm"][0],
|
||||
y=self.measurements["forearm"][1],
|
||||
z=self.measurements["forearm"][2],
|
||||
)
|
||||
|
||||
# Humerus
|
||||
# Screw axis of humerus frame wrt base frame
|
||||
self.S_BH = np.array(
|
||||
[
|
||||
0,
|
||||
-1,
|
||||
0,
|
||||
self.measurements["humerus"][2],
|
||||
0,
|
||||
-self.measurements["humerus"][0],
|
||||
],
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
# Humerus origin to centroid transform
|
||||
self.X_HoHc = self._create_translation_matrix(x=0.0475)
|
||||
|
||||
# 0-position humerus frame pose wrt base
|
||||
self.X_BH = self._create_translation_matrix(
|
||||
x=self.measurements["humerus"][0],
|
||||
y=self.measurements["humerus"][1],
|
||||
z=self.measurements["humerus"][2],
|
||||
)
|
||||
|
||||
# Shoulder
|
||||
# Screw axis of shoulder frame wrt Base frame
|
||||
self.S_BS = np.array([0, 0, -1, 0, 0, 0], dtype=np.float32)
|
||||
|
||||
# Shoulder origin to centroid transform
|
||||
self.X_SoSc = self._create_translation_matrix(x=-0.017, z=0.0235)
|
||||
|
||||
# 0-position shoulder frame pose wrt base
|
||||
self.X_BS = self._create_translation_matrix(
|
||||
x=self.measurements["shoulder"][0],
|
||||
y=self.measurements["shoulder"][1],
|
||||
z=self.measurements["shoulder"][2],
|
||||
)
|
||||
|
||||
# Base
|
||||
# Base origin to centroid transform
|
||||
self.X_BoBc = self._create_translation_matrix(y=0.015)
|
||||
|
||||
# World to base transform
|
||||
self.X_WoBo = self._create_translation_matrix(
|
||||
x=self.measurements["base"][0],
|
||||
y=self.measurements["base"][1],
|
||||
z=self.measurements["base"][2],
|
||||
)
|
||||
|
||||
# Pre-compute gripper post-multiplication matrix
|
||||
self._fk_gripper_post = self.X_GoGc @ self.X_BoGo @ self.gripper_X0
|
||||
|
||||
def forward_kinematics(
|
||||
def __init__(
|
||||
self,
|
||||
robot_pos_deg: NDArray[np.float32],
|
||||
frame: str = "gripper_tip",
|
||||
) -> NDArray[np.float32]:
|
||||
"""Generic forward kinematics.
|
||||
|
||||
Args:
|
||||
robot_pos_deg: Joint positions in degrees. Can be ``None`` when
|
||||
computing the *base* frame as it does not depend on joint
|
||||
angles.
|
||||
frame: Target frame. One of
|
||||
``{"base", "shoulder", "humerus", "forearm", "wrist", "gripper", "gripper_tip"}``.
|
||||
|
||||
Returns
|
||||
-------
|
||||
NDArray[np.float32]
|
||||
4×4 homogeneous transformation matrix of the requested frame
|
||||
expressed in the world coordinate system.
|
||||
urdf_path: str,
|
||||
target_frame_name: str = "gripper_frame_link",
|
||||
joint_names: list[str] = None,
|
||||
):
|
||||
"""
|
||||
frame = frame.lower()
|
||||
if frame not in {
|
||||
"base",
|
||||
"shoulder",
|
||||
"humerus",
|
||||
"forearm",
|
||||
"wrist",
|
||||
"gripper",
|
||||
"gripper_tip",
|
||||
}:
|
||||
raise ValueError(
|
||||
f"Unknown frame '{frame}'. Valid options are base, shoulder, humerus, forearm, wrist, gripper, gripper_tip."
|
||||
)
|
||||
|
||||
# Base frame does not rely on joint angles.
|
||||
if frame == "base":
|
||||
return self.X_WoBo @ self.X_BoBc @ self.base_X0
|
||||
|
||||
robot_pos_rad = robot_pos_deg / 180 * np.pi
|
||||
|
||||
# Extract joint angles (note the sign convention for shoulder lift).
|
||||
theta_shoulder_pan = robot_pos_rad[0]
|
||||
theta_shoulder_lift = -robot_pos_rad[1]
|
||||
theta_elbow_flex = robot_pos_rad[2]
|
||||
theta_wrist_flex = robot_pos_rad[3]
|
||||
theta_wrist_roll = robot_pos_rad[4]
|
||||
|
||||
# Start with the world-to-base transform; incrementally add successive links.
|
||||
transformation_matrix = self.X_WoBo @ screw_axis_to_transform(self.S_BS, theta_shoulder_pan)
|
||||
if frame == "shoulder":
|
||||
return transformation_matrix @ self.X_SoSc @ self.X_BS
|
||||
|
||||
transformation_matrix = transformation_matrix @ screw_axis_to_transform(
|
||||
self.S_BH, theta_shoulder_lift
|
||||
)
|
||||
if frame == "humerus":
|
||||
return transformation_matrix @ self.X_HoHc @ self.X_BH
|
||||
|
||||
transformation_matrix = transformation_matrix @ screw_axis_to_transform(self.S_BF, theta_elbow_flex)
|
||||
if frame == "forearm":
|
||||
return transformation_matrix @ self.X_ForearmFc @ self.X_BF
|
||||
|
||||
transformation_matrix = transformation_matrix @ screw_axis_to_transform(self.S_BR, theta_wrist_flex)
|
||||
if frame == "wrist":
|
||||
return transformation_matrix @ self.X_RoRc @ self.X_BR @ self.wrist_X0
|
||||
|
||||
transformation_matrix = transformation_matrix @ screw_axis_to_transform(self.S_BG, theta_wrist_roll)
|
||||
if frame == "gripper":
|
||||
return transformation_matrix @ self._fk_gripper_post
|
||||
else: # frame == "gripper_tip"
|
||||
return transformation_matrix @ self.X_GoGt @ self.X_BoGo @ self.gripper_X0
|
||||
|
||||
def compute_jacobian(
|
||||
self, robot_pos_deg: NDArray[np.float32], frame: str = "gripper_tip"
|
||||
) -> NDArray[np.float32]:
|
||||
"""Finite differences to compute the Jacobian.
|
||||
J(i, j) represents how the ith component of the end-effector's velocity changes wrt a small change
|
||||
in the jth joint's velocity.
|
||||
Initialize placo-based kinematics solver.
|
||||
|
||||
Args:
|
||||
robot_pos_deg: Current joint positions in degrees
|
||||
fk_func: Forward kinematics function to use (defaults to fk_gripper)
|
||||
urdf_path: Path to the robot URDF file
|
||||
target_frame_name: Name of the end-effector frame in the URDF
|
||||
joint_names: List of joint names to use for the kinematics solver
|
||||
"""
|
||||
try:
|
||||
import placo
|
||||
except ImportError as e:
|
||||
raise ImportError(
|
||||
"placo is required for RobotKinematics. "
|
||||
"Please install the optional dependencies of `kinematics` in the package."
|
||||
) from e
|
||||
|
||||
self.robot = placo.RobotWrapper(urdf_path)
|
||||
self.solver = placo.KinematicsSolver(self.robot)
|
||||
self.solver.mask_fbase(True) # Fix the base
|
||||
|
||||
self.target_frame_name = target_frame_name
|
||||
|
||||
# Set joint names
|
||||
self.joint_names = list(self.robot.joint_names()) if joint_names is None else joint_names
|
||||
|
||||
# Initialize frame task for IK
|
||||
self.tip_frame = self.solver.add_frame_task(self.target_frame_name, np.eye(4))
|
||||
|
||||
def forward_kinematics(self, joint_pos_deg):
|
||||
"""
|
||||
Compute forward kinematics for given joint configuration given the target frame name in the constructor.
|
||||
|
||||
Args:
|
||||
joint_pos_deg: Joint positions in degrees (numpy array)
|
||||
|
||||
Returns:
|
||||
4x4 transformation matrix of the end-effector pose
|
||||
"""
|
||||
|
||||
eps = 1e-8
|
||||
jac = np.zeros(shape=(6, 5))
|
||||
delta = np.zeros(len(robot_pos_deg[:-1]), dtype=np.float64)
|
||||
for el_ix in range(len(robot_pos_deg[:-1])):
|
||||
delta *= 0
|
||||
delta[el_ix] = eps / 2
|
||||
sdot = (
|
||||
pose_difference_se3(
|
||||
self.forward_kinematics(robot_pos_deg[:-1] + delta, frame),
|
||||
self.forward_kinematics(robot_pos_deg[:-1] - delta, frame),
|
||||
)
|
||||
/ eps
|
||||
)
|
||||
jac[:, el_ix] = sdot
|
||||
return jac
|
||||
# Convert degrees to radians
|
||||
joint_pos_rad = np.deg2rad(joint_pos_deg[: len(self.joint_names)])
|
||||
|
||||
def compute_positional_jacobian(
|
||||
self, robot_pos_deg: NDArray[np.float32], frame: str = "gripper_tip"
|
||||
) -> NDArray[np.float32]:
|
||||
"""Finite differences to compute the positional Jacobian.
|
||||
J(i, j) represents how the ith component of the end-effector's position changes wrt a small change
|
||||
in the jth joint's velocity.
|
||||
# Update joint positions in placo robot
|
||||
for i, joint_name in enumerate(self.joint_names):
|
||||
self.robot.set_joint(joint_name, joint_pos_rad[i])
|
||||
|
||||
Args:
|
||||
robot_pos_deg: Current joint positions in degrees
|
||||
fk_func: Forward kinematics function to use (defaults to fk_gripper)
|
||||
# Update kinematics
|
||||
self.robot.update_kinematics()
|
||||
|
||||
# Get the transformation matrix
|
||||
return self.robot.get_T_world_frame(self.target_frame_name)
|
||||
|
||||
def inverse_kinematics(
|
||||
self, current_joint_pos, desired_ee_pose, position_weight=1.0, orientation_weight=0.01
|
||||
):
|
||||
"""
|
||||
eps = 1e-8
|
||||
jac = np.zeros(shape=(3, 5))
|
||||
delta = np.zeros(len(robot_pos_deg[:-1]), dtype=np.float64)
|
||||
for el_ix in range(len(robot_pos_deg[:-1])):
|
||||
delta *= 0
|
||||
delta[el_ix] = eps / 2
|
||||
sdot = (
|
||||
self.forward_kinematics(robot_pos_deg[:-1] + delta, frame)[:3, 3]
|
||||
- self.forward_kinematics(robot_pos_deg[:-1] - delta, frame)[:3, 3]
|
||||
) / eps
|
||||
jac[:, el_ix] = sdot
|
||||
return jac
|
||||
|
||||
def ik(
|
||||
self,
|
||||
current_joint_pos: NDArray[np.float32],
|
||||
desired_ee_pose: NDArray[np.float32],
|
||||
position_only: bool = True,
|
||||
frame: str = "gripper_tip",
|
||||
max_iterations: int = 5,
|
||||
learning_rate: float = 1,
|
||||
) -> NDArray[np.float32]:
|
||||
"""Inverse kinematics using gradient descent.
|
||||
Compute inverse kinematics using placo solver.
|
||||
|
||||
Args:
|
||||
current_joint_state: Initial joint positions in degrees
|
||||
current_joint_pos: Current joint positions in degrees (used as initial guess)
|
||||
desired_ee_pose: Target end-effector pose as a 4x4 transformation matrix
|
||||
position_only: If True, only match end-effector position, not orientation
|
||||
frame: Target frame. One of
|
||||
``{"base", "shoulder", "humerus", "forearm", "wrist", "gripper", "gripper_tip"}``.
|
||||
max_iterations: Maximum number of iterations to run
|
||||
learning_rate: Learning rate for gradient descent
|
||||
position_weight: Weight for position constraint in IK
|
||||
orientation_weight: Weight for orientation constraint in IK, set to 0.0 to only constrain position
|
||||
|
||||
Returns:
|
||||
Joint positions in degrees that achieve the desired end-effector pose
|
||||
"""
|
||||
# Do gradient descent.
|
||||
current_joint_state = current_joint_pos.copy()
|
||||
for _ in range(max_iterations):
|
||||
current_ee_pose = self.forward_kinematics(current_joint_state, frame)
|
||||
if not position_only:
|
||||
error = se3_error(desired_ee_pose, current_ee_pose)
|
||||
jac = self.compute_jacobian(current_joint_state, frame)
|
||||
else:
|
||||
error = desired_ee_pose[:3, 3] - current_ee_pose[:3, 3]
|
||||
jac = self.compute_positional_jacobian(current_joint_state, frame)
|
||||
delta_angles = np.linalg.pinv(jac) @ error
|
||||
current_joint_state[:-1] += learning_rate * delta_angles
|
||||
|
||||
if np.linalg.norm(error) < 5e-3:
|
||||
return current_joint_state
|
||||
return current_joint_state
|
||||
# Convert current joint positions to radians for initial guess
|
||||
current_joint_rad = np.deg2rad(current_joint_pos[: len(self.joint_names)])
|
||||
|
||||
# Set current joint positions as initial guess
|
||||
for i, joint_name in enumerate(self.joint_names):
|
||||
self.robot.set_joint(joint_name, current_joint_rad[i])
|
||||
|
||||
# Update the target pose for the frame task
|
||||
self.tip_frame.T_world_frame = desired_ee_pose
|
||||
|
||||
# Configure the task based on position_only flag
|
||||
self.tip_frame.configure(self.target_frame_name, "soft", position_weight, orientation_weight)
|
||||
|
||||
# Solve IK
|
||||
self.solver.solve(True)
|
||||
self.robot.update_kinematics()
|
||||
|
||||
# Extract joint positions
|
||||
joint_pos_rad = []
|
||||
for joint_name in self.joint_names:
|
||||
joint = self.robot.get_joint(joint_name)
|
||||
joint_pos_rad.append(joint)
|
||||
|
||||
# Convert back to degrees
|
||||
joint_pos_deg = np.rad2deg(joint_pos_rad)
|
||||
|
||||
# Preserve gripper position if present in current_joint_pos
|
||||
if len(current_joint_pos) > len(self.joint_names):
|
||||
result = np.zeros_like(current_joint_pos)
|
||||
result[: len(self.joint_names)] = joint_pos_deg
|
||||
result[len(self.joint_names) :] = current_joint_pos[len(self.joint_names) :]
|
||||
return result
|
||||
else:
|
||||
return joint_pos_deg
|
||||
|
||||
@@ -40,9 +40,7 @@ import time
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
|
||||
import numpy as np
|
||||
import rerun as rr
|
||||
from typing import List
|
||||
|
||||
from lerobot.cameras import ( # noqa: F401
|
||||
CameraConfig, # noqa: F401
|
||||
@@ -72,6 +70,7 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
so100_leader,
|
||||
so101_leader,
|
||||
)
|
||||
from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop
|
||||
from lerobot.utils.control_utils import (
|
||||
init_keyboard_listener,
|
||||
is_headless,
|
||||
@@ -85,7 +84,7 @@ from lerobot.utils.utils import (
|
||||
init_logging,
|
||||
log_say,
|
||||
)
|
||||
from lerobot.utils.visualization_utils import _init_rerun
|
||||
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -165,7 +164,7 @@ def record_loop(
|
||||
events: dict,
|
||||
fps: int,
|
||||
dataset: LeRobotDataset | None = None,
|
||||
teleop: Teleoperator | None = None,
|
||||
teleop: Teleoperator | List[Teleoperator] | None = None,
|
||||
policy: PreTrainedPolicy | None = None,
|
||||
control_time_s: int | None = None,
|
||||
single_task: str | None = None,
|
||||
@@ -174,6 +173,23 @@ def record_loop(
|
||||
if dataset is not None and dataset.fps != fps:
|
||||
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
|
||||
|
||||
teleop_arm = teleop_keyboard = None
|
||||
if isinstance(teleop, list):
|
||||
teleop_keyboard = next((t for t in teleop if isinstance(t, KeyboardTeleop)), None)
|
||||
teleop_arm = next(
|
||||
(
|
||||
t
|
||||
for t in teleop
|
||||
if isinstance(t, (so100_leader.SO100Leader, so101_leader.SO101Leader, koch_leader.KochLeader))
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
if not (teleop_arm and teleop_keyboard and len(teleop) == 2 and robot.name == "lekiwi_client"):
|
||||
raise ValueError(
|
||||
"For multi-teleop, the list must contain exactly one KeyboardTeleop and one arm teleoperator. Currently only supported for LeKiwi robot."
|
||||
)
|
||||
|
||||
# if policy is given it needs cleaning up
|
||||
if policy is not None:
|
||||
policy.reset()
|
||||
@@ -202,8 +218,17 @@ def record_loop(
|
||||
robot_type=robot.robot_type,
|
||||
)
|
||||
action = {key: action_values[i].item() for i, key in enumerate(robot.action_features)}
|
||||
elif policy is None and teleop is not None:
|
||||
elif policy is None and isinstance(teleop, Teleoperator):
|
||||
action = teleop.get_action()
|
||||
elif policy is None and isinstance(teleop, list):
|
||||
# TODO(pepijn, steven): clean the record loop for use of multiple robots (possibly with pipeline)
|
||||
arm_action = teleop_arm.get_action()
|
||||
arm_action = {f"arm_{k}": v for k, v in arm_action.items()}
|
||||
|
||||
keyboard_action = teleop_keyboard.get_action()
|
||||
base_action = robot._from_keyboard_to_base_action(keyboard_action)
|
||||
|
||||
action = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
|
||||
else:
|
||||
logging.info(
|
||||
"No policy or teleoperator provided, skipping action generation."
|
||||
@@ -222,14 +247,7 @@ def record_loop(
|
||||
dataset.add_frame(frame)
|
||||
|
||||
if display_data:
|
||||
for obs, val in observation.items():
|
||||
if isinstance(val, float):
|
||||
rr.log(f"observation.{obs}", rr.Scalar(val))
|
||||
elif isinstance(val, np.ndarray):
|
||||
rr.log(f"observation.{obs}", rr.Image(val), static=True)
|
||||
for act, val in action.items():
|
||||
if isinstance(val, float):
|
||||
rr.log(f"action.{act}", rr.Scalar(val))
|
||||
log_rerun_data(observation, action)
|
||||
|
||||
dt_s = time.perf_counter() - start_loop_t
|
||||
busy_wait(1 / fps - dt_s)
|
||||
|
||||
@@ -22,7 +22,6 @@ from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
import torch
|
||||
import zmq
|
||||
|
||||
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
@@ -195,26 +194,23 @@ class LeKiwiClient(Robot):
|
||||
self, observation: Dict[str, Any]
|
||||
) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]:
|
||||
"""Extracts frames, and state from the parsed observation."""
|
||||
flat_state = {key: value for key, value in observation.items() if key in self._state_ft}
|
||||
|
||||
state_vec = np.array(
|
||||
[flat_state.get(k, 0.0) for k in self._state_order],
|
||||
dtype=np.float32,
|
||||
)
|
||||
flat_state = {key: observation.get(key, 0.0) for key in self._state_order}
|
||||
|
||||
state_vec = np.array([flat_state[key] for key in self._state_order], dtype=np.float32)
|
||||
|
||||
obs_dict: Dict[str, Any] = {**flat_state, "observation.state": state_vec}
|
||||
|
||||
# Decode images
|
||||
image_observation = {
|
||||
f"observation.images.{key}": value
|
||||
for key, value in observation.items()
|
||||
if key in self._cameras_ft
|
||||
}
|
||||
current_frames: Dict[str, np.ndarray] = {}
|
||||
for cam_name, image_b64 in image_observation.items():
|
||||
for cam_name, image_b64 in observation.items():
|
||||
if cam_name not in self._cameras_ft:
|
||||
continue
|
||||
frame = self._decode_image_from_b64(image_b64)
|
||||
if frame is not None:
|
||||
current_frames[cam_name] = frame
|
||||
|
||||
return current_frames, {"observation.state": state_vec}
|
||||
return current_frames, obs_dict
|
||||
|
||||
def _get_data(self) -> Tuple[Dict[str, np.ndarray], Dict[str, Any], Dict[str, Any]]:
|
||||
"""
|
||||
@@ -267,7 +263,7 @@ class LeKiwiClient(Robot):
|
||||
if frame is None:
|
||||
logging.warning("Frame is None")
|
||||
frame = np.zeros((640, 480, 3), dtype=np.uint8)
|
||||
obs_dict[cam_name] = torch.from_numpy(frame)
|
||||
obs_dict[cam_name] = frame
|
||||
|
||||
return obs_dict
|
||||
|
||||
@@ -327,7 +323,10 @@ class LeKiwiClient(Robot):
|
||||
|
||||
# TODO(Steven): Remove the np conversion when it is possible to record a non-numpy array value
|
||||
actions = np.array([action.get(k, 0.0) for k in self._state_order], dtype=np.float32)
|
||||
return {"action": actions}
|
||||
|
||||
action_sent = {key: actions[i] for i, key in enumerate(self._state_order)}
|
||||
action_sent["action"] = actions
|
||||
return action_sent
|
||||
|
||||
def disconnect(self):
|
||||
"""Cleans ZMQ comms"""
|
||||
|
||||
@@ -44,6 +44,14 @@ class SO100FollowerConfig(RobotConfig):
|
||||
class SO100FollowerEndEffectorConfig(SO100FollowerConfig):
|
||||
"""Configuration for the SO100FollowerEndEffector robot."""
|
||||
|
||||
# Path to URDF file for kinematics
|
||||
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
|
||||
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
|
||||
urdf_path: str | None = None
|
||||
|
||||
# End-effector frame name in the URDF
|
||||
target_frame_name: str = "gripper_frame_link"
|
||||
|
||||
# Default bounds for the end-effector position (in meters)
|
||||
end_effector_bounds: dict[str, list[float]] = field(
|
||||
default_factory=lambda: {
|
||||
|
||||
@@ -30,7 +30,6 @@ from . import SO100Follower
|
||||
from .config_so100_follower import SO100FollowerEndEffectorConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
EE_FRAME = "gripper_tip"
|
||||
|
||||
|
||||
class SO100FollowerEndEffector(SO100Follower):
|
||||
@@ -64,7 +63,16 @@ class SO100FollowerEndEffector(SO100Follower):
|
||||
self.config = config
|
||||
|
||||
# Initialize the kinematics module for the so100 robot
|
||||
self.kinematics = RobotKinematics(robot_type="so_new_calibration")
|
||||
if self.config.urdf_path is None:
|
||||
raise ValueError(
|
||||
"urdf_path must be provided in the configuration for end-effector control. "
|
||||
"Please set urdf_path in your SO100FollowerEndEffectorConfig."
|
||||
)
|
||||
|
||||
self.kinematics = RobotKinematics(
|
||||
urdf_path=self.config.urdf_path,
|
||||
target_frame_name=self.config.target_frame_name,
|
||||
)
|
||||
|
||||
# Store the bounds for end-effector position
|
||||
self.end_effector_bounds = self.config.end_effector_bounds
|
||||
@@ -126,7 +134,7 @@ class SO100FollowerEndEffector(SO100Follower):
|
||||
|
||||
# Calculate current end-effector position using forward kinematics
|
||||
if self.current_ee_pos is None:
|
||||
self.current_ee_pos = self.kinematics.forward_kinematics(self.current_joint_pos, frame=EE_FRAME)
|
||||
self.current_ee_pos = self.kinematics.forward_kinematics(self.current_joint_pos)
|
||||
|
||||
# Set desired end-effector position by adding delta
|
||||
desired_ee_pos = np.eye(4)
|
||||
@@ -142,11 +150,10 @@ class SO100FollowerEndEffector(SO100Follower):
|
||||
)
|
||||
|
||||
# Compute inverse kinematics to get joint positions
|
||||
target_joint_values_in_degrees = self.kinematics.ik(
|
||||
self.current_joint_pos, desired_ee_pos, position_only=True, frame=EE_FRAME
|
||||
target_joint_values_in_degrees = self.kinematics.inverse_kinematics(
|
||||
self.current_joint_pos, desired_ee_pos
|
||||
)
|
||||
|
||||
target_joint_values_in_degrees = np.clip(target_joint_values_in_degrees, -180.0, 180.0)
|
||||
# Create joint space action dictionary
|
||||
joint_action = {
|
||||
f"{key}.pos": target_joint_values_in_degrees[i] for i, key in enumerate(self.bus.motors.keys())
|
||||
|
||||
@@ -50,6 +50,7 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
make_teleoperator_from_config,
|
||||
so100_leader,
|
||||
)
|
||||
from lerobot.utils.robot_utils import busy_wait
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -76,12 +77,12 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
|
||||
# Note to be compatible with the rest of the codebase,
|
||||
# we are using the new calibration method for so101 and so100
|
||||
robot_type = "so_new_calibration"
|
||||
kinematics = RobotKinematics(robot_type=robot_type)
|
||||
kinematics = RobotKinematics(cfg.robot.urdf_path, cfg.robot.target_frame_name)
|
||||
|
||||
# Initialize min/max values
|
||||
observation = robot.get_observation()
|
||||
joint_positions = np.array([observation[f"{key}.pos"] for key in robot.bus.motors])
|
||||
ee_pos = kinematics.forward_kinematics(joint_positions, frame="gripper_tip")[:3, 3]
|
||||
ee_pos = kinematics.forward_kinematics(joint_positions)[:3, 3]
|
||||
|
||||
max_pos = joint_positions.copy()
|
||||
min_pos = joint_positions.copy()
|
||||
@@ -94,7 +95,7 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
|
||||
|
||||
observation = robot.get_observation()
|
||||
joint_positions = np.array([observation[f"{key}.pos"] for key in robot.bus.motors])
|
||||
ee_pos = kinematics.forward_kinematics(joint_positions, frame="gripper_tip")[:3, 3]
|
||||
ee_pos = kinematics.forward_kinematics(joint_positions)[:3, 3]
|
||||
|
||||
# Skip initial warmup period
|
||||
if (time.perf_counter() - start_episode_t) < 5:
|
||||
@@ -113,6 +114,8 @@ def find_joint_and_ee_bounds(cfg: FindJointLimitsConfig):
|
||||
print(f"Min joint pos position {np.round(min_pos, 4).tolist()}")
|
||||
break
|
||||
|
||||
busy_wait(0.01)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
find_joint_and_ee_bounds()
|
||||
|
||||
@@ -21,8 +21,7 @@ from pathlib import Path
|
||||
from typing import Dict, Tuple
|
||||
|
||||
import cv2
|
||||
|
||||
# import torch.nn.functional as F # noqa: N812
|
||||
import torch
|
||||
import torchvision.transforms.functional as F # type: ignore # noqa: N812
|
||||
from tqdm import tqdm # type: ignore
|
||||
|
||||
@@ -224,7 +223,8 @@ def convert_lerobot_dataset_to_cropper_lerobot_dataset(
|
||||
cropped = F.crop(value, top, left, height, width)
|
||||
value = F.resize(cropped, resize_size)
|
||||
value = value.clamp(0, 1)
|
||||
|
||||
if key.startswith("complementary_info") and isinstance(value, torch.Tensor) and value.dim() == 0:
|
||||
value = value.unsqueeze(0)
|
||||
new_frame[key] = value
|
||||
|
||||
new_frame["task"] = task
|
||||
@@ -266,8 +266,7 @@ if __name__ == "__main__":
|
||||
)
|
||||
parser.add_argument(
|
||||
"--push-to-hub",
|
||||
type=bool,
|
||||
default=False,
|
||||
action="store_true",
|
||||
help="Whether to push the new dataset to the hub.",
|
||||
)
|
||||
parser.add_argument(
|
||||
|
||||
@@ -254,20 +254,19 @@ class RobotEnv(gym.Env):
|
||||
self._joint_names = [f"{key}.pos" for key in self.robot.bus.motors]
|
||||
self._image_keys = self.robot.cameras.keys()
|
||||
|
||||
# Read initial joint positions using the bus
|
||||
self.current_joint_positions = self._get_observation()["agent_pos"]
|
||||
self.current_observation = None
|
||||
|
||||
self.use_gripper = use_gripper
|
||||
|
||||
self._setup_spaces()
|
||||
|
||||
def _get_observation(self) -> np.ndarray:
|
||||
def _get_observation(self) -> dict[str, np.ndarray]:
|
||||
"""Helper to convert a dictionary from bus.sync_read to an ordered numpy array."""
|
||||
obs_dict = self.robot.get_observation()
|
||||
joint_positions = np.array([obs_dict[name] for name in self._joint_names], dtype=np.float32)
|
||||
joint_positions = np.array([obs_dict[name] for name in self._joint_names])
|
||||
|
||||
images = {key: obs_dict[key] for key in self._image_keys}
|
||||
return {"agent_pos": joint_positions, "pixels": images}
|
||||
self.current_observation = {"agent_pos": joint_positions, "pixels": images}
|
||||
|
||||
def _setup_spaces(self):
|
||||
"""
|
||||
@@ -281,24 +280,24 @@ class RobotEnv(gym.Env):
|
||||
- The action space is defined as a Box space representing joint position commands. It is defined as relative (delta)
|
||||
or absolute, based on the configuration.
|
||||
"""
|
||||
example_obs = self._get_observation()
|
||||
self._get_observation()
|
||||
|
||||
observation_spaces = {}
|
||||
|
||||
# Define observation spaces for images and other states.
|
||||
if "pixels" in example_obs:
|
||||
prefix = "observation.images" if len(example_obs["pixels"]) > 1 else "observation.image"
|
||||
if "pixels" in self.current_observation:
|
||||
prefix = "observation.images"
|
||||
observation_spaces = {
|
||||
f"{prefix}.{key}": gym.spaces.Box(
|
||||
low=0, high=255, shape=example_obs["pixels"][key].shape, dtype=np.uint8
|
||||
low=0, high=255, shape=self.current_observation["pixels"][key].shape, dtype=np.uint8
|
||||
)
|
||||
for key in example_obs["pixels"]
|
||||
for key in self.current_observation["pixels"]
|
||||
}
|
||||
|
||||
observation_spaces["observation.state"] = gym.spaces.Box(
|
||||
low=0,
|
||||
high=10,
|
||||
shape=example_obs["agent_pos"].shape,
|
||||
shape=self.current_observation["agent_pos"].shape,
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
@@ -340,14 +339,12 @@ class RobotEnv(gym.Env):
|
||||
|
||||
self.robot.reset()
|
||||
|
||||
# Capture the initial observation.
|
||||
observation = self._get_observation()
|
||||
|
||||
# Reset episode tracking variables.
|
||||
self.current_step = 0
|
||||
self.episode_data = None
|
||||
|
||||
return observation, {"is_intervention": False}
|
||||
self.current_observation = None
|
||||
self._get_observation()
|
||||
return self.current_observation, {"is_intervention": False}
|
||||
|
||||
def step(self, action) -> tuple[dict[str, np.ndarray], float, bool, bool, dict[str, Any]]:
|
||||
"""
|
||||
@@ -367,8 +364,6 @@ class RobotEnv(gym.Env):
|
||||
- truncated (bool): True if the episode was truncated (e.g., time constraints).
|
||||
- info (dict): Additional debugging information including intervention status.
|
||||
"""
|
||||
self.current_joint_positions = self._get_observation()["agent_pos"]
|
||||
|
||||
action_dict = {"delta_x": action[0], "delta_y": action[1], "delta_z": action[2]}
|
||||
|
||||
# 1.0 action corresponds to no-op action
|
||||
@@ -376,6 +371,8 @@ class RobotEnv(gym.Env):
|
||||
|
||||
self.robot.send_action(action_dict)
|
||||
|
||||
self._get_observation()
|
||||
|
||||
if self.display_cameras:
|
||||
self.render()
|
||||
|
||||
@@ -386,7 +383,7 @@ class RobotEnv(gym.Env):
|
||||
truncated = False
|
||||
|
||||
return (
|
||||
self._get_observation(),
|
||||
self.current_observation,
|
||||
reward,
|
||||
terminated,
|
||||
truncated,
|
||||
@@ -399,11 +396,10 @@ class RobotEnv(gym.Env):
|
||||
"""
|
||||
import cv2
|
||||
|
||||
observation = self._get_observation()
|
||||
image_keys = [key for key in observation if "image" in key]
|
||||
image_keys = [key for key in self.current_observation if "image" in key]
|
||||
|
||||
for key in image_keys:
|
||||
cv2.imshow(key, cv2.cvtColor(observation[key].numpy(), cv2.COLOR_RGB2BGR))
|
||||
cv2.imshow(key, cv2.cvtColor(self.current_observation[key].numpy(), cv2.COLOR_RGB2BGR))
|
||||
cv2.waitKey(1)
|
||||
|
||||
def close(self):
|
||||
@@ -520,7 +516,10 @@ class AddCurrentToObservation(gym.ObservationWrapper):
|
||||
Returns:
|
||||
The modified observation with current values.
|
||||
"""
|
||||
present_current_observation = self.unwrapped._get_observation()["agent_pos"]
|
||||
present_current_dict = self.env.unwrapped.robot.bus.sync_read("Present_Current")
|
||||
present_current_observation = np.array(
|
||||
[present_current_dict[name] for name in self.env.unwrapped.robot.bus.motors]
|
||||
)
|
||||
observation["agent_pos"] = np.concatenate(
|
||||
[observation["agent_pos"], present_current_observation], axis=-1
|
||||
)
|
||||
@@ -1090,13 +1089,10 @@ class EEObservationWrapper(gym.ObservationWrapper):
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
# Initialize kinematics instance for the appropriate robot type
|
||||
robot_type = getattr(env.unwrapped.robot.config, "robot_type", "so101")
|
||||
if "so100" in robot_type or "so101" in robot_type:
|
||||
# Note to be compatible with the rest of the codebase,
|
||||
# we are using the new calibration method for so101 and so100
|
||||
robot_type = "so_new_calibration"
|
||||
self.kinematics = RobotKinematics(robot_type)
|
||||
self.kinematics = RobotKinematics(
|
||||
urdf_path=env.unwrapped.robot.config.urdf_path,
|
||||
target_frame_name=env.unwrapped.robot.config.target_frame_name,
|
||||
)
|
||||
|
||||
def observation(self, observation):
|
||||
"""
|
||||
@@ -1108,9 +1104,9 @@ class EEObservationWrapper(gym.ObservationWrapper):
|
||||
Returns:
|
||||
Enhanced observation with end-effector pose information.
|
||||
"""
|
||||
current_joint_pos = self.unwrapped._get_observation()["agent_pos"]
|
||||
current_joint_pos = self.unwrapped.current_observation["agent_pos"]
|
||||
|
||||
current_ee_pos = self.kinematics.forward_kinematics(current_joint_pos, frame="gripper_tip")[:3, 3]
|
||||
current_ee_pos = self.kinematics.forward_kinematics(current_joint_pos)[:3, 3]
|
||||
observation["agent_pos"] = np.concatenate([observation["agent_pos"], current_ee_pos], -1)
|
||||
return observation
|
||||
|
||||
@@ -1157,12 +1153,10 @@ class BaseLeaderControlWrapper(gym.Wrapper):
|
||||
self.event_lock = Lock() # Thread-safe access to events
|
||||
|
||||
# Initialize robot control
|
||||
robot_type = getattr(env.unwrapped.robot.config, "robot_type", "so101")
|
||||
if "so100" in robot_type or "so101" in robot_type:
|
||||
# Note to be compatible with the rest of the codebase,
|
||||
# we are using the new calibration method for so101 and so100
|
||||
robot_type = "so_new_calibration"
|
||||
self.kinematics = RobotKinematics(robot_type)
|
||||
self.kinematics = RobotKinematics(
|
||||
urdf_path=env.unwrapped.robot.config.urdf_path,
|
||||
target_frame_name=env.unwrapped.robot.config.target_frame_name,
|
||||
)
|
||||
self.leader_torque_enabled = True
|
||||
self.prev_leader_gripper = None
|
||||
|
||||
@@ -1260,14 +1254,14 @@ class BaseLeaderControlWrapper(gym.Wrapper):
|
||||
leader_pos_dict = self.robot_leader.bus.sync_read("Present_Position")
|
||||
follower_pos_dict = self.robot_follower.bus.sync_read("Present_Position")
|
||||
|
||||
leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict], dtype=np.float32)
|
||||
follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict], dtype=np.float32)
|
||||
leader_pos = np.array([leader_pos_dict[name] for name in leader_pos_dict])
|
||||
follower_pos = np.array([follower_pos_dict[name] for name in follower_pos_dict])
|
||||
|
||||
self.leader_tracking_error_queue.append(np.linalg.norm(follower_pos[:-1] - leader_pos[:-1]))
|
||||
|
||||
# [:3, 3] Last column of the transformation matrix corresponds to the xyz translation
|
||||
leader_ee = self.kinematics.forward_kinematics(leader_pos, frame="gripper_tip")[:3, 3]
|
||||
follower_ee = self.kinematics.forward_kinematics(follower_pos, frame="gripper_tip")[:3, 3]
|
||||
leader_ee = self.kinematics.forward_kinematics(leader_pos)[:3, 3]
|
||||
follower_ee = self.kinematics.forward_kinematics(follower_pos)[:3, 3]
|
||||
|
||||
action = np.clip(leader_ee - follower_ee, -self.end_effector_step_sizes, self.end_effector_step_sizes)
|
||||
# Normalize the action to the range [-1, 1]
|
||||
@@ -1341,6 +1335,9 @@ class BaseLeaderControlWrapper(gym.Wrapper):
|
||||
# NOTE:
|
||||
obs, reward, terminated, truncated, info = self.env.step(action)
|
||||
|
||||
if isinstance(action, np.ndarray):
|
||||
action = torch.from_numpy(action)
|
||||
|
||||
# Add intervention info
|
||||
info["is_intervention"] = is_intervention
|
||||
info["action_intervention"] = action
|
||||
@@ -1877,7 +1874,6 @@ def make_robot_env(cfg: EnvConfig) -> gym.Env:
|
||||
if cfg.robot is None:
|
||||
raise ValueError("RobotConfig (cfg.robot) must be provided for gym_manipulator environment.")
|
||||
robot = make_robot_from_config(cfg.robot)
|
||||
|
||||
teleop_device = make_teleoperator_from_config(cfg.teleop)
|
||||
teleop_device.connect()
|
||||
|
||||
|
||||
@@ -36,7 +36,6 @@ from dataclasses import asdict, dataclass
|
||||
from pprint import pformat
|
||||
|
||||
import draccus
|
||||
import numpy as np
|
||||
import rerun as rr
|
||||
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
|
||||
@@ -60,11 +59,12 @@ from lerobot.teleoperators import ( # noqa: F401
|
||||
)
|
||||
from lerobot.utils.robot_utils import busy_wait
|
||||
from lerobot.utils.utils import init_logging, move_cursor_up
|
||||
from lerobot.utils.visualization_utils import _init_rerun
|
||||
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data
|
||||
|
||||
|
||||
@dataclass
|
||||
class TeleoperateConfig:
|
||||
# TODO: pepijn, steven: if more robots require multiple teleoperators (like lekiwi) its good to make this possibele in teleop.py and record.py with List[Teleoperator]
|
||||
teleop: TeleoperatorConfig
|
||||
robot: RobotConfig
|
||||
# Limit the maximum frames per second.
|
||||
@@ -84,14 +84,7 @@ def teleop_loop(
|
||||
action = teleop.get_action()
|
||||
if display_data:
|
||||
observation = robot.get_observation()
|
||||
for obs, val in observation.items():
|
||||
if isinstance(val, float):
|
||||
rr.log(f"observation_{obs}", rr.Scalar(val))
|
||||
elif isinstance(val, np.ndarray):
|
||||
rr.log(f"observation_{obs}", rr.Image(val), static=True)
|
||||
for act, val in action.items():
|
||||
if isinstance(val, float):
|
||||
rr.log(f"action_{act}", rr.Scalar(val))
|
||||
log_rerun_data(observation, action)
|
||||
|
||||
robot.send_action(action)
|
||||
dt_s = time.perf_counter() - loop_start
|
||||
|
||||
@@ -295,8 +295,8 @@ class GamepadController(InputController):
|
||||
try:
|
||||
# Read joystick axes
|
||||
# Left stick X and Y (typically axes 0 and 1)
|
||||
x_input = self.joystick.get_axis(0) # Left/Right
|
||||
y_input = self.joystick.get_axis(1) # Up/Down (often inverted)
|
||||
y_input = self.joystick.get_axis(0) # Left/Right
|
||||
x_input = self.joystick.get_axis(1) # Up/Down (often inverted)
|
||||
|
||||
# Right stick Y (typically axis 3 or 4)
|
||||
z_input = self.joystick.get_axis(3) # Up/Down for Z
|
||||
@@ -307,8 +307,8 @@ class GamepadController(InputController):
|
||||
z_input = 0 if abs(z_input) < self.deadzone else z_input
|
||||
|
||||
# Calculate deltas (note: may need to invert axes depending on controller)
|
||||
delta_x = -y_input * self.y_step_size # Forward/backward
|
||||
delta_y = -x_input * self.x_step_size # Left/right
|
||||
delta_x = -x_input * self.x_step_size # Forward/backward
|
||||
delta_y = -y_input * self.y_step_size # Left/right
|
||||
delta_z = -z_input * self.z_step_size # Up/down
|
||||
|
||||
return delta_x, delta_y, delta_z
|
||||
@@ -424,14 +424,14 @@ class GamepadControllerHID(InputController):
|
||||
# These offsets are for the Logitech RumblePad 2
|
||||
if data and len(data) >= 8:
|
||||
# Normalize joystick values from 0-255 to -1.0-1.0
|
||||
self.left_x = (data[1] - 128) / 128.0
|
||||
self.left_y = (data[2] - 128) / 128.0
|
||||
self.left_y = (data[1] - 128) / 128.0
|
||||
self.left_x = (data[2] - 128) / 128.0
|
||||
self.right_x = (data[3] - 128) / 128.0
|
||||
self.right_y = (data[4] - 128) / 128.0
|
||||
|
||||
# Apply deadzone
|
||||
self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x
|
||||
self.left_y = 0 if abs(self.left_y) < self.deadzone else self.left_y
|
||||
self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x
|
||||
self.right_x = 0 if abs(self.right_x) < self.deadzone else self.right_x
|
||||
self.right_y = 0 if abs(self.right_y) < self.deadzone else self.right_y
|
||||
|
||||
@@ -465,8 +465,8 @@ class GamepadControllerHID(InputController):
|
||||
def get_deltas(self):
|
||||
"""Get the current movement deltas from gamepad state."""
|
||||
# Calculate deltas - invert as needed based on controller orientation
|
||||
delta_x = -self.left_y * self.x_step_size # Forward/backward
|
||||
delta_y = -self.left_x * self.y_step_size # Left/right
|
||||
delta_x = -self.left_x * self.x_step_size # Forward/backward
|
||||
delta_y = -self.left_y * self.y_step_size # Left/right
|
||||
delta_z = -self.right_y * self.z_step_size # Up/down
|
||||
|
||||
return delta_x, delta_y, delta_z
|
||||
|
||||
@@ -196,17 +196,18 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
|
||||
delta_x = 0.0
|
||||
delta_y = 0.0
|
||||
delta_z = 0.0
|
||||
gripper_action = 1.0
|
||||
|
||||
# Generate action based on current key states
|
||||
for key, val in self.current_pressed.items():
|
||||
if key == keyboard.Key.up:
|
||||
delta_x = int(val)
|
||||
elif key == keyboard.Key.down:
|
||||
delta_x = -int(val)
|
||||
elif key == keyboard.Key.left:
|
||||
delta_y = int(val)
|
||||
elif key == keyboard.Key.right:
|
||||
delta_y = -int(val)
|
||||
elif key == keyboard.Key.down:
|
||||
delta_y = int(val)
|
||||
elif key == keyboard.Key.left:
|
||||
delta_x = int(val)
|
||||
elif key == keyboard.Key.right:
|
||||
delta_x = -int(val)
|
||||
elif key == keyboard.Key.shift:
|
||||
delta_z = -int(val)
|
||||
elif key == keyboard.Key.shift_r:
|
||||
@@ -230,7 +231,6 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
|
||||
"delta_z": delta_z,
|
||||
}
|
||||
|
||||
gripper_action = 1 # default gripper action is to stay
|
||||
if self.config.use_gripper:
|
||||
action_dict["gripper"] = gripper_action
|
||||
|
||||
|
||||
@@ -13,7 +13,9 @@
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
import rerun as rr
|
||||
|
||||
|
||||
@@ -24,3 +26,21 @@ def _init_rerun(session_name: str = "lerobot_control_loop") -> None:
|
||||
rr.init(session_name)
|
||||
memory_limit = os.getenv("LEROBOT_RERUN_MEMORY_LIMIT", "10%")
|
||||
rr.spawn(memory_limit=memory_limit)
|
||||
|
||||
|
||||
def log_rerun_data(observation: dict[str | Any], action: dict[str | Any]):
|
||||
for obs, val in observation.items():
|
||||
if isinstance(val, float):
|
||||
rr.log(f"observation.{obs}", rr.Scalar(val))
|
||||
elif isinstance(val, np.ndarray):
|
||||
if val.ndim == 1:
|
||||
for i, v in enumerate(val):
|
||||
rr.log(f"observation.{obs}_{i}", rr.Scalar(float(v)))
|
||||
else:
|
||||
rr.log(f"observation.{obs}", rr.Image(val), static=True)
|
||||
for act, val in action.items():
|
||||
if isinstance(val, float):
|
||||
rr.log(f"action.{act}", rr.Scalar(val))
|
||||
elif isinstance(val, np.ndarray):
|
||||
for i, v in enumerate(val):
|
||||
rr.log(f"action.{act}_{i}", rr.Scalar(float(v)))
|
||||
|
||||
Reference in New Issue
Block a user