Files
lerobot-clone/src/lerobot/scripts/lerobot_record.py

611 lines
24 KiB
Python

# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Records a dataset. Actions for the robot can be either generated by teleoperation or by a policy.
Example:
```shell
lerobot-record \
--robot.type=so100_follower \
--robot.port=/dev/tty.usbmodem58760431541 \
--robot.cameras="{laptop: {type: opencv, index_or_path: 0, width: 640, height: 480, fps: 30}}" \
--robot.id=black \
--dataset.repo_id=<my_username>/<my_dataset_name> \
--dataset.num_episodes=2 \
--dataset.single_task="Grab the cube" \
--display_data=true
# <- Optional: specify video codec (h264, hevc, libsvtav1). Default is libsvtav1. \
# --dataset.vcodec=h264 \
# <- Teleop optional if you want to teleoperate to record or in between episodes with a policy \
# --teleop.type=so100_leader \
# --teleop.port=/dev/tty.usbmodem58760431551 \
# --teleop.id=blue \
# <- Policy optional if you want to record with a policy \
# --policy.path=${HF_USER}/my_policy \
```
Example recording with bimanual so100:
```shell
lerobot-record \
--robot.type=bi_so_follower \
--robot.left_arm_config.port=/dev/tty.usbmodem5A460822851 \
--robot.right_arm_config.port=/dev/tty.usbmodem5A460814411 \
--robot.id=bimanual_follower \
--robot.left_arm_config.cameras='{
wrist: {"type": "opencv", "index_or_path": 1, "width": 640, "height": 480, "fps": 30},
top: {"type": "opencv", "index_or_path": 3, "width": 640, "height": 480, "fps": 30},
}' --robot.right_arm_config.cameras='{
wrist: {"type": "opencv", "index_or_path": 2, "width": 640, "height": 480, "fps": 30},
front: {"type": "opencv", "index_or_path": 4, "width": 640, "height": 480, "fps": 30},
}' \
--teleop.type=bi_so_leader \
--teleop.left_arm_config.port=/dev/tty.usbmodem5A460852721 \
--teleop.right_arm_config.port=/dev/tty.usbmodem5A460819811 \
--teleop.id=bimanual_leader \
--display_data=true \
--dataset.repo_id=${HF_USER}/bimanual-so-handover-cube \
--dataset.num_episodes=25 \
--dataset.single_task="Grab and handover the red cube to the other arm"
```
"""
import logging
import time
from copy import copy
from dataclasses import asdict, dataclass, field
from pathlib import Path
from pprint import pformat
from typing import Any
import numpy as np
from lerobot.cameras import ( # noqa: F401
CameraConfig, # noqa: F401
)
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.reachy2_camera.configuration_reachy2_camera import Reachy2CameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
from lerobot.cameras.zmq.configuration_zmq import ZMQCameraConfig # noqa: F401
from lerobot.configs import parser
from lerobot.configs.policies import PreTrainedConfig
from lerobot.datasets.image_writer import safe_stop_image_writer
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import DEFAULT_AUDIO_CHUNK_DURATION, build_dataset_frame, combine_feature_dicts
from lerobot.datasets.video_utils import VideoEncodingManager
from lerobot.microphones.utils import (
async_microphones_start_recording,
async_microphones_stop_recording,
)
from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.utils import make_robot_action
from lerobot.processor import (
PolicyAction,
PolicyProcessorPipeline,
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_processors,
)
from lerobot.processor.rename_processor import rename_stats
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_so_follower,
earthrover_mini_plus,
hope_jr,
koch_follower,
make_robot_from_config,
omx_follower,
reachy2,
so_follower,
unitree_g1,
)
from lerobot.teleoperators import ( # noqa: F401
Teleoperator,
TeleoperatorConfig,
bi_so_leader,
homunculus,
koch_leader,
make_teleoperator_from_config,
omx_leader,
reachy2_teleoperator,
so_leader,
)
from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop
from lerobot.utils.constants import ACTION, OBS_STR
from lerobot.utils.control_utils import (
init_keyboard_listener,
is_headless,
predict_action,
sanity_check_dataset_name,
sanity_check_dataset_robot_compatibility,
)
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.utils import (
get_safe_torch_device,
init_logging,
log_say,
)
from lerobot.utils.visualization_utils import init_rerun, log_rerun_data
@dataclass
class DatasetRecordConfig:
# Dataset identifier. By convention it should match '{hf_username}/{dataset_name}' (e.g. `lerobot/test`).
repo_id: str
# A short but accurate description of the task performed during the recording (e.g. "Pick the Lego block and drop it in the box on the right.")
single_task: str
# Root directory where the dataset will be stored (e.g. 'dataset/path').
root: str | Path | None = None
# Limit the frames per second.
fps: int = 30
# Number of seconds for data recording for each episode.
episode_time_s: int | float = 60
# Number of seconds for resetting the environment after each episode.
reset_time_s: int | float = 60
# Number of episodes to record.
num_episodes: int = 50
# Encode frames in the dataset into video
video: bool = True
# Upload dataset to Hugging Face hub.
push_to_hub: bool = True
# Upload on private repository on the Hugging Face hub.
private: bool = False
# Add tags to your dataset on the hub.
tags: list[str] | None = None
# Number of subprocesses handling the saving of frames as PNG. Set to 0 to use threads only;
# set to ≥1 to use subprocesses, each using threads to write images. The best number of processes
# and threads depends on your system. We recommend 4 threads per camera with 0 processes.
# If fps is unstable, adjust the thread count. If still unstable, try using 1 or more subprocesses.
num_image_writer_processes: int = 0
# Number of threads writing the frames as png images on disk, per camera.
# Too many threads might cause unstable teleoperation fps due to main thread being blocked.
# Not enough threads might cause low camera fps.
num_image_writer_threads_per_camera: int = 4
# Number of episodes to record before batch encoding videos
# Set to 1 for immediate encoding (default behavior), or higher for batched encoding
video_encoding_batch_size: int = 1
# Video codec for encoding videos. Options: 'h264', 'hevc', 'libsvtav1'.
# Use 'h264' for faster encoding on systems where AV1 encoding is CPU-heavy.
vcodec: str = "libsvtav1"
# Rename map for the observation to override the image and state keys
rename_map: dict[str, str] = field(default_factory=dict)
def __post_init__(self):
if self.single_task is None:
raise ValueError("You need to provide a task as argument in `single_task`.")
@dataclass
class RecordConfig:
robot: RobotConfig
dataset: DatasetRecordConfig
# Whether to control the robot with a teleoperator
teleop: TeleoperatorConfig | None = None
# Whether to control the robot with a policy
policy: PreTrainedConfig | None = None
# Display all cameras on screen
display_data: bool = False
# Display data on a remote Rerun server
display_ip: str | None = None
# Port of the remote Rerun server
display_port: int | None = None
# Whether to display compressed images in Rerun
display_compressed_images: bool = False
# Use vocal synthesis to read events.
play_sounds: bool = True
# Resume recording on an existing dataset.
resume: bool = False
def __post_init__(self):
# HACK: We parse again the cli args here to get the pretrained path if there was one.
policy_path = parser.get_path_arg("policy")
if policy_path:
cli_overrides = parser.get_cli_overrides("policy")
self.policy = PreTrainedConfig.from_pretrained(policy_path, cli_overrides=cli_overrides)
self.policy.pretrained_path = policy_path
if self.teleop is None and self.policy is None:
raise ValueError("Choose a policy, a teleoperator or both to control the robot")
@classmethod
def __get_path_fields__(cls) -> list[str]:
"""This enables the parser to load config from the policy using `--policy.path=local/dir`"""
return ["policy"]
""" --------------- record_loop() data flow --------------------------
[ Robot ]
V
[ robot.get_observation() ] ---> raw_obs
V
[ robot_observation_processor ] ---> processed_obs
V
.-----( ACTION LOGIC )------------------.
V V
[ From Teleoperator ] [ From Policy ]
| |
| [teleop.get_action] -> raw_action | [predict_action]
| | | |
| V | V
| [teleop_action_processor] | |
| | | |
'---> processed_teleop_action '---> processed_policy_action
| |
'-------------------------.-------------'
V
[ robot_action_processor ] --> robot_action_to_send
V
[ robot.send_action() ] -- (Robot Executes)
V
( Save to Dataset )
V
( Rerun Log / Loop Wait )
"""
@safe_stop_image_writer
def record_loop(
robot: Robot,
events: dict,
fps: int,
teleop_action_processor: RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
], # runs after teleop
robot_action_processor: RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
], # runs before robot
robot_observation_processor: RobotProcessorPipeline[
RobotObservation, RobotObservation
], # runs after robot
dataset: LeRobotDataset | None = None,
teleop: Teleoperator | list[Teleoperator] | None = None,
policy: PreTrainedPolicy | None = None,
preprocessor: PolicyProcessorPipeline[dict[str, Any], dict[str, Any]] | None = None,
postprocessor: PolicyProcessorPipeline[PolicyAction, PolicyAction] | None = None,
control_time_s: int | None = None,
single_task: str | None = None,
display_data: bool = False,
display_compressed_images: bool = False,
):
if display_data:
init_rerun(
session_name="recording",
robot=robot,
reset_time=True,
)
if dataset is not None and dataset.fps != fps:
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
teleop_arm = teleop_keyboard = None
if isinstance(teleop, list):
teleop_keyboard = next((t for t in teleop if isinstance(t, KeyboardTeleop)), None)
teleop_arm = next(
(
t
for t in teleop
if isinstance(
t,
(
so_leader.SO100Leader
| so_leader.SO101Leader
| koch_leader.KochLeader
| omx_leader.OmxLeader
),
)
),
None,
)
if not (teleop_arm and teleop_keyboard and len(teleop) == 2 and robot.name == "lekiwi_client"):
raise ValueError(
"For multi-teleop, the list must contain exactly one KeyboardTeleop and one arm teleoperator. Currently only supported for LeKiwi robot."
)
# Reset policy and processor if they are provided
if policy is not None and preprocessor is not None and postprocessor is not None:
policy.reset()
preprocessor.reset()
postprocessor.reset()
if dataset is not None and robot.name != "lekiwi":
for microphone_key, microphone in robot.microphones.items():
dataset.add_microphone_recording(microphone, microphone_key)
else:
async_microphones_start_recording(robot.microphones)
# Create a buffer for audio observations (shifting window of fixed size over audio samples)
audio_buffer = {
microphone_name: np.zeros(
(int(microphone.sample_rate * DEFAULT_AUDIO_CHUNK_DURATION), len(microphone.channels))
)
for microphone_name, microphone in robot.microphones.items()
}
timestamp = 0
start_episode_t = time.perf_counter()
while timestamp < control_time_s:
start_loop_t = time.perf_counter()
if events["exit_early"]:
events["exit_early"] = False
break
# Get robot observation
obs = robot.get_observation()
# Applies a pipeline to the raw robot observation, default is IdentityProcessor
obs_processed = robot_observation_processor(obs)
if policy is not None or dataset is not None:
observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix=OBS_STR)
# Get action from either policy or teleop
if policy is not None and preprocessor is not None and postprocessor is not None:
# Transform instantaneous audio samples into a buffer of fixed size
buffered_observation_frame = copy(observation_frame)
for name in audio_buffer:
# Remove as many old audio samples as needed
audio_buffer[name] = audio_buffer[name][len(buffered_observation_frame[name]) :]
# Add new audio samples
audio_buffer[name] = np.vstack((audio_buffer[name], buffered_observation_frame[name]))
# Add the audio buffer to the observation
buffered_observation_frame[name] = audio_buffer[name]
action_values = predict_action(
observation=buffered_observation_frame,
policy=policy,
device=get_safe_torch_device(policy.config.device),
preprocessor=preprocessor,
postprocessor=postprocessor,
use_amp=policy.config.use_amp,
task=single_task,
robot_type=robot.robot_type,
)
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
elif policy is None and isinstance(teleop, Teleoperator):
act = teleop.get_action()
# Applies a pipeline to the raw teleop action, default is IdentityProcessor
act_processed_teleop = teleop_action_processor((act, obs))
elif policy is None and isinstance(teleop, list):
arm_action = teleop_arm.get_action()
arm_action = {f"arm_{k}": v for k, v in arm_action.items()}
keyboard_action = teleop_keyboard.get_action()
base_action = robot._from_keyboard_to_base_action(keyboard_action)
act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
act_processed_teleop = teleop_action_processor((act, obs))
else:
logging.info(
"No policy or teleoperator provided, skipping action generation."
"This is likely to happen when resetting the environment without a teleop device."
"The robot won't be at its rest position at the start of the next episode."
)
continue
# Applies a pipeline to the action, default is IdentityProcessor
if policy is not None and act_processed_policy is not None:
action_values = act_processed_policy
robot_action_to_send = robot_action_processor((act_processed_policy, obs))
else:
action_values = act_processed_teleop
robot_action_to_send = robot_action_processor((act_processed_teleop, obs))
# Send action to robot
# Action can eventually be clipped using `max_relative_target`,
# so action actually sent is saved in the dataset. action = postprocessor.process(action)
# TODO(steven, pepijn, adil): we should use a pipeline step to clip the action, so the sent action is the action that we input to the robot.
_sent_action = robot.send_action(robot_action_to_send)
# Write to dataset
if dataset is not None:
action_frame = build_dataset_frame(dataset.features, action_values, prefix=ACTION)
frame = {**observation_frame, **action_frame, "task": single_task}
dataset.add_frame(frame)
if display_data:
log_rerun_data(
observation=obs_processed, action=action_values, compress_images=display_compressed_images
)
dt_s = time.perf_counter() - start_loop_t
precise_sleep(max(1 / fps - dt_s, 0.0))
timestamp = time.perf_counter() - start_episode_t
async_microphones_stop_recording(robot.microphones)
@parser.wrap()
def record(cfg: RecordConfig) -> LeRobotDataset:
init_logging()
logging.info(pformat(asdict(cfg)))
if cfg.display_data:
init_rerun(session_name="recording", ip=cfg.display_ip, port=cfg.display_port)
display_compressed_images = (
True
if (cfg.display_data and cfg.display_ip is not None and cfg.display_port is not None)
else cfg.display_compressed_images
)
robot = make_robot_from_config(cfg.robot)
teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
dataset_features = combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=teleop_action_processor,
initial_features=create_initial_features(
action=robot.action_features
), # TODO(steven, pepijn): in future this should be come from teleop or policy
use_videos=cfg.dataset.video,
),
aggregate_pipeline_dataset_features(
pipeline=robot_observation_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=cfg.dataset.video,
),
)
dataset = None
listener = None
try:
if cfg.resume:
dataset = LeRobotDataset(
cfg.dataset.repo_id,
root=cfg.dataset.root,
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
vcodec=cfg.dataset.vcodec,
)
if hasattr(robot, "cameras") and len(robot.cameras) > 0:
dataset.start_image_writer(
num_processes=cfg.dataset.num_image_writer_processes,
num_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras),
)
sanity_check_dataset_robot_compatibility(dataset, robot, cfg.dataset.fps, dataset_features)
else:
# Create empty dataset or load existing saved episodes
sanity_check_dataset_name(cfg.dataset.repo_id, cfg.policy)
dataset = LeRobotDataset.create(
cfg.dataset.repo_id,
cfg.dataset.fps,
root=cfg.dataset.root,
robot_type=robot.name,
features=dataset_features,
use_videos=cfg.dataset.video,
image_writer_processes=cfg.dataset.num_image_writer_processes,
image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras),
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
vcodec=cfg.dataset.vcodec,
)
# Load pretrained policy
policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta)
preprocessor = None
postprocessor = None
if cfg.policy is not None:
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg.policy,
pretrained_path=cfg.policy.pretrained_path,
dataset_stats=rename_stats(dataset.meta.stats, cfg.dataset.rename_map),
preprocessor_overrides={
"device_processor": {"device": cfg.policy.device},
"rename_observations_processor": {"rename_map": cfg.dataset.rename_map},
},
)
robot.connect()
if teleop is not None:
teleop.connect()
listener, events = init_keyboard_listener()
with VideoEncodingManager(dataset):
recorded_episodes = 0
while recorded_episodes < cfg.dataset.num_episodes and not events["stop_recording"]:
log_say(f"Recording episode {dataset.num_episodes}", cfg.play_sounds)
record_loop(
robot=robot,
events=events,
fps=cfg.dataset.fps,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
teleop=teleop,
policy=policy,
preprocessor=preprocessor,
postprocessor=postprocessor,
dataset=dataset,
control_time_s=cfg.dataset.episode_time_s,
single_task=cfg.dataset.single_task,
display_data=cfg.display_data,
display_compressed_images=display_compressed_images,
)
# Execute a few seconds without recording to give time to manually reset the environment
# Skip reset for the last episode to be recorded
if not events["stop_recording"] and (
(recorded_episodes < cfg.dataset.num_episodes - 1) or events["rerecord_episode"]
):
log_say("Reset the environment", cfg.play_sounds)
# reset g1 robot
if robot.name == "unitree_g1":
robot.reset()
record_loop(
robot=robot,
events=events,
fps=cfg.dataset.fps,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
teleop=teleop,
control_time_s=cfg.dataset.reset_time_s,
single_task=cfg.dataset.single_task,
display_data=cfg.display_data,
)
if events["rerecord_episode"]:
log_say("Re-record episode", cfg.play_sounds)
events["rerecord_episode"] = False
events["exit_early"] = False
dataset.clear_episode_buffer()
continue
dataset.save_episode()
recorded_episodes += 1
finally:
log_say("Stop recording", cfg.play_sounds, blocking=True)
if dataset:
dataset.finalize()
if robot.is_connected:
robot.disconnect()
if teleop and teleop.is_connected:
teleop.disconnect()
if not is_headless() and listener:
listener.stop()
if cfg.dataset.push_to_hub:
dataset.push_to_hub(tags=cfg.dataset.tags, private=cfg.dataset.private)
log_say("Exiting", cfg.play_sounds)
return dataset
def main():
register_third_party_plugins()
record()
if __name__ == "__main__":
main()