Compare commits

...

32 Commits

Author SHA1 Message Date
Pepijn
bf03414b38 also have pipeline for feedback_features and action_features 2026-03-02 22:07:13 +01:00
Pepijn
9860f794cf also refactor and remove use of aggregate_pipeline_dataset_features() as we already aggregate expected features on robot and teleop classes 2026-03-02 21:05:58 +01:00
Pepijn
7940bfad52 make lerobot record easier 2026-03-02 20:17:50 +01:00
Pepijn
a2246a650b pipeline architecture changes 2026-03-02 13:09:35 +01:00
Steven Palma
095856b06a chore: add AI policy (#3055) 2026-02-28 14:41:28 +01:00
Steven Palma
563f42bdb1 chore(dependencies): Bump lerobot to 0.4.5 (#3051) 2026-02-27 19:29:35 +01:00
Caroline Pascal
8fff0fde7c chore(docstrings): fixing deprecated root argument description in LeRobotDataset class (#3035)
* chore(docstrings): fixing deprecated `root` argument docstrings in LeRobotDataset class

* chore(draccus): updating draccus CLI help

* chore(revert): reverting changes in lerobot_dataset_viz.py

---------

Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-27 18:22:44 +01:00
Pepijn
04de496547 fix(logging): avoid double-counting samples across processes (#3045) 2026-02-27 17:45:19 +01:00
Khalil Meftah
baf9b50365 Fix(diffusion): enforce no-crop behavior when crop_ratio=1.0 (#3046)
* refactor(diffusion): replace crop_shape with resize_shape and crop_ratio

* fix(diffusion): address review feedback on resize/crop backward compat

* test: regenerate diffusion artifacts for updated default config

* fix: disable crop when resize path uses crop_ratio=1.0

---------

Co-authored-by: starlitxiling <1754165401@qq.com>
2026-02-27 17:44:53 +01:00
Jade Choghari
a0fdbf037a feat(policies): add Smolvla torch compile support (#3043)
* Change LIBERO init_state_id when reset.

Signed-off-by: Aoqun Jin <aojiaojiao@foxmail.com>

* Change LIBERO init_state_id when reset.

Signed-off-by: Aoqun Jin <aojiaojiao@foxmail.com>

* pre-commit run

* Add torch.compile for smolvla

Signed-off-by: Aoqun Jin <aojiaojiao@foxmail.com>

* Add torch.compile for smolvla

Add model compilation option for improved performance.

Signed-off-by: Aoqun Jin <aojiaojiao@foxmail.com>

* first

---------

Signed-off-by: Aoqun Jin <aojiaojiao@foxmail.com>
Co-authored-by: Aoqun Jin <aojiaojiao@foxmail.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-27 18:58:36 +03:00
Khalil Meftah
c085531b17 fix: add missing openarm_mini import to CLI scripts (#3028) 2026-02-27 15:46:31 +01:00
Steven Palma
c7c6205332 chore(scripts): no spam log when no action (#3042) 2026-02-27 15:26:56 +01:00
Michio Sun
4e54be1334 fix(datasets): skip warning when MultiLeRobotDataset features are identical (#3019)
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-26 17:42:22 +01:00
Damien LaRocque
fde9d08281 feat(async_inference) Enable plugins with async inference (#2425)
* feat(async-inference) Try using async inference server with plugins

* Fix import

* Fix import error in Robot Client

---------

Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-26 14:41:32 +01:00
Khalil Meftah
46044fed75 Fix: remove device_map from SmolVLA model loading (#3029)
* Fix SmolVLA meta tensor error by removing device_map

- Remove device_map parameter from VLM model loading
- Change torch_dtype from string to torch.bfloat16
- Add explicit .to(device) calls after initialization

This resolves NotImplementedError when training SmolVLA policy.
Fixes meta tensor copy issue in factory.py:418.

* fix: remove manual device movement logic and fix dtype handling

---------

Co-authored-by: Highsky7 <albert31115@gmail.com>
2026-02-26 13:28:46 +01:00
Khalil Meftah
975dcad918 Feat(teleoperators): add OpenArm Mini teleoperator (#3022)
* add OpenArm Mini config and module init

* add OpenArm Mini teleoperator implementation

* add OpenArm Mini into factory and setup motors

---------

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
2026-02-25 18:46:55 +01:00
Cotton Hu
d0b58190da fix(policies): support dp train when n_obs_steps=1 (#2430)
Co-authored-by: hukongtao <hukongtao@agibot.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-25 17:36:31 +01:00
Mishig
9a5ab8ffab feat: add visualization badge to card template and update dataset card creation with repo_id (#3005)
* feat: add visualization badge to card template and update dataset card creation with repo_id

* Update src/lerobot/datasets/card_template.md

* Update src/lerobot/datasets/card_template.md

---------

Signed-off-by: Mishig <dmishig@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-25 16:02:40 +01:00
Khalil Meftah
7541d72130 Fix SARM dense_only mode: always load episodes_df for target computation (#3021)
* fix annotation mode check

* fix: SARM dense_only mode always load episodes_df for target computation

---------

Co-authored-by: John Newsom <jackmnewsom@gmail.com>
Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
2026-02-25 13:28:01 +01:00
Jash Shah
0317a15bf1 fix(video): replace assertions with proper exceptions in video frame decoding (#3016)
Replaced assert statements with FrameTimestampError exceptions in
decode_video_frames_torchvision and decode_video_frames_torchcodec.

Assertions are unsuitable for runtime validation because they can be
silently disabled with python -O, and they produce unhelpful
AssertionError tracebacks. The codebase already defines
FrameTimestampError for this exact purpose but it was only used
in one of the three validation sites.

Also removed AssertionError from the except clause in
LeRobotDataset.__init__, which was masking video timestamp errors
by silently triggering a dataset re-download instead of surfacing
the actual problem.
2026-02-25 12:29:22 +01:00
Jash Shah
f138e5948a Fix metaworld_config.json not bundled in pip installs and AttributeError crash (#3017)
1. Include metaworld_config.json in package distributions by adding it to
   both MANIFEST.in (for sdist) and pyproject.toml package-data (for wheels).
   Without this, pip-installed lerobot raises FileNotFoundError when
   importing the metaworld environment.

2. Fix crash in sanity_check_dataset_name where the error message accesses
   policy_cfg.type when policy_cfg is None, raising AttributeError instead
   of the intended ValueError.

Fixes #2958
2026-02-25 12:29:10 +01:00
Martin Kiefel
8fef4ddab8 fix(dataset): Fix reindexing bug for videos on splits (#2548)
* fix(dataset): Reindex videos based on frame and not on time

Sometimes during split operations the frame timestamp floating
precision leads to frame ending up in the wrong split.

This changes fixes the issues by directly working with frame indices
instead.

* Fix formatting
2026-02-25 11:57:07 +01:00
Steven Palma
18d9cb5ac4 feat(scripts): Integrate tqdm for training progress visualization (#3010) 2026-02-24 19:10:43 +01:00
Steven Palma
5095ab0845 fix(ci): permissions triton (#3011) 2026-02-24 19:09:34 +01:00
Jash Shah
dac1efd13d feat: Enable torch.compile for DiffusionPolicy inference (#2486)
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-24 17:29:08 +01:00
Dominik Paľo
7fd71c83a3 docs: add WSL evdev installation note (#2855)
Add a note in the installation guide explaining that users on WSL need to install evdev to avoid build issues.
See: https://github.com/huggingface/lerobot/issues/2528

Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-23 20:41:20 +01:00
Yuan Haokuan
0f44adbeec docs: fix HF_USER export command to correctly parse username (#2932)
* Fix HF_USER extraction command in documentation

Updated command to extract the username from hf auth output.

Signed-off-by: Yuan Haokuan <138340416+WilbertYuan@users.noreply.github.com>

* Correct HF_USER variable assignment in documentation

Fix the variable extraction from hf auth output.

Signed-off-by: Yuan Haokuan <138340416+WilbertYuan@users.noreply.github.com>

* Update docs/source/il_robots.mdx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Yuan Haokuan <138340416+WilbertYuan@users.noreply.github.com>

---------

Signed-off-by: Yuan Haokuan <138340416+WilbertYuan@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-23 17:51:13 +01:00
Guilherme Miotto
7dbbaa3727 Small comment fix (#2990)
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-23 17:11:55 +01:00
Yuta Nakagawa
fcabfd32a5 chore(docs): update the document for Phone teleop to clarify how to use the examples (#2991)
* update the document for Phone teleope to clarify how to use the examples

* Update docs/source/phone_teleop.mdx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Yuta Nakagawa <ytnkgw@gmail.com>

---------

Signed-off-by: Yuta Nakagawa <ytnkgw@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-23 17:11:46 +01:00
Steven Palma
544cbc5f38 feat(motors): add RobStride CAN implementation (#2821)
* feat(motors): add initial implementation of robstride

Co-authored-by: Virgile <virgilebatto@gmail.com>

* chore(motors): solve some linter

* remove kp/kd attribute

* code uniformisation between damiao and robstride

* remove normalization warning

* remove non valid baudrates and small docstring update

* remove all useless files. Only keeping robstride.py and table.py

* typing for mypy

* reduce NameOrId usage

* align signature with damiao

* put the same helper than in the damiao implementation

* bug correction : expect a response after each bus.send

---------

Co-authored-by: Virgile <virgilebatto@gmail.com>
2026-02-23 16:39:04 +01:00
Yueci Deng
a0c5d19391 add metadata_buffer_size to dataset creation (#2998)
Signed-off-by: Steven Palma <imstevenpmwork@ieee.org>
Co-authored-by: Steven Palma <imstevenpmwork@ieee.org>
2026-02-23 16:32:59 +01:00
Steven Palma
e96339a3b4 feat(dataset): add streaming video encoding + HW encoder support (#2974)
* feat(dataset): init stream encoding

* feat(dataset): use threads to fix frame pickle latency

* refactor(dataset): remove HW encoded related changes

* add lp (#2977)

* feat(dataset): add Hw encoding + log drop frames (#2978)

* chore(docs): add streaming video encoding guide

* fix(dataset): style docs + testing

* chore(docs): simplify sttreaming video encoding guide

* chore(dataset): add commands + streaming encoding default false + print note if false + queue default is now 30

* chore(docs): add verification note advice

* chore(dataset): adjusting defaults & docs for streaming encoding

* docs(scripts): improve docstrings

* test(dataset): polish streaming encoding tests

* chore(dataset): move FYI log related to streaming

* chore(dataset): add arg vcodec to suggestions

* refactor(dataset): better handling for auto and available vcodec

* chore(dataset): change log level

* docs(dataset): add note related to training performance vcodec

* docs(dataset): add more notes to streaming encoding

---------

Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
Co-authored-by: Pepijn <pepijn@huggingface.co>
2026-02-23 13:57:43 +01:00
107 changed files with 5279 additions and 1199 deletions

View File

@@ -173,6 +173,8 @@ jobs:
shell: bash
working-directory: /lerobot
steps:
- name: Fix ptxas permissions
run: chmod +x /lerobot/.venv/lib/python3.10/site-packages/triton/backends/nvidia/bin/ptxas
- name: Run pytest on GPU
run: pytest tests -vv --maxfail=10
- name: Run end-to-end tests

25
AI_POLICY.md Normal file
View File

@@ -0,0 +1,25 @@
# AI Usage Policy
The LeRobot project welcomes contributions from everyone, and we have a few guidelines regarding AI usage to ensure high code quality, clear communication, and a healthy open-source ecosystem:
- **Please disclose significant AI assistance.** If you used AI tools (e.g., Copilot, Claude, Cursor, ChatGPT) to generate a substantial portion of your code or text, let us know in your PR description. Transparency helps us review your changes more effectively.
- **Own your code (The Human-in-the-Loop).** You must fully understand all the changes you are proposing. If you cannot explain what your AI-assisted code does or how it interacts with LeRobot's broader architecture, please take the time to learn and test it before submitting.
- **Keep issues and discussions focused.** You are welcome to use AI to help draft issues or PR descriptions, but please review and edit them carefully before posting. AI can often be overly verbose; trimming the noise and getting straight to the point helps our maintainers address your needs faster.
Our core maintainers also use AI tools to aid their workflows, but they do so while bringing deep contextual knowledge of the LeRobot codebase to validate the output. We ask all contributors to apply that same level of rigor.
## Remember the Human Maintainers
Please remember that LeRobot is maintained by a dedicated team of humans.
Every discussion, issue, and pull request is read and reviewed by real people. While AI tools can generate thousands of lines of code in seconds, reviewing that code still takes human time and energy. Submitting unverified or low-effort AI output puts an unfair burden on our maintainers.
Today, the quality of the AI output still heavily depends on the developer driving the tool. We ask that you respect our maintainers' time by thoroughly vetting, testing, and refining your submissions.
## AI is Welcome Here
LeRobot operates at the cutting edge of AI and robotics, and many of our maintainers actively embrace AI coding assistants as valuable productivity tools. We are a pro-AI project!
Our reason for having an AI policy is not an anti-AI stance. Rather, it exists to ensure that AI is used to enhance human contributions, not replace them with unverified noise. It's about how the tools are used, not the tools themselves.
We value the unique human insight you bring to the LeRobot community. Let AI empower your workflow, but always let your own judgment take the wheel.

View File

@@ -2,7 +2,7 @@
Everyone is welcome to contribute, and we value everybody's contribution. Code is not the only way to help the community. Answering questions, helping others, reaching out, and improving the documentation are immensely valuable.
Whichever way you choose to contribute, please be mindful to respect our [code of conduct](./CODE_OF_CONDUCT.md).
Whichever way you choose to contribute, please be mindful to respect our [code of conduct](./CODE_OF_CONDUCT.md) and our [AI policy](./AI_POLICY.md).
## Ways to Contribute

View File

@@ -1,2 +1,3 @@
include src/lerobot/templates/lerobot_modelcard_template.md
include src/lerobot/datasets/card_template.md
include src/lerobot/envs/metaworld_config.json

View File

@@ -85,6 +85,8 @@ RUN if [ "$UNBOUND_DEPS" = "true" ]; then \
RUN uv pip install --no-cache ".[all]"
RUN chmod +x /lerobot/.venv/lib/python${PYTHON_VERSION}/site-packages/triton/backends/nvidia/bin/ptxas
# Copy the rest of the application source code
# Make sure to have the git-LFS files for testing
COPY --chown=user_lerobot:user_lerobot . .

View File

@@ -29,6 +29,8 @@
title: Using the Dataset Tools
- local: dataset_subtask
title: Using Subtasks in the Dataset
- local: streaming_video_encoding
title: Streaming Video Encoding
title: "Datasets"
- sections:
- local: act

View File

@@ -88,5 +88,8 @@ lerobot-record \
--dataset.repo_id=${HF_USER}/eval_act_your_dataset \
--dataset.num_episodes=10 \
--dataset.single_task="Your task description" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
--policy.path=${HF_USER}/act_policy
```

View File

@@ -192,6 +192,9 @@ lerobot-record \
--dataset.num_episodes=2 \
--dataset.fps=10 \
--dataset.single_task="Navigate around obstacles" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
--display_data=true
```

View File

@@ -120,9 +120,12 @@ lerobot-record \
--display_data=true \
--dataset.repo_id=<user>/eval_groot-bimanual \
--dataset.num_episodes=10 \
--dataset.single_task="Grab and handover the red cube to the other arm"
--policy.path=<user>/groot-bimanual # your trained model
--dataset.episode_time_s=30
--dataset.single_task="Grab and handover the red cube to the other arm" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
--policy.path=<user>/groot-bimanual \ # your trained model
--dataset.episode_time_s=30 \
--dataset.reset_time_s=10
```

View File

@@ -230,6 +230,9 @@ lerobot-record \
--dataset.episode_time_s=5 \
--dataset.push_to_hub=true \
--dataset.private=true \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
--display_data=true
```
@@ -273,5 +276,8 @@ lerobot-record \
--dataset.repo_id=<USER>/eval_hopejr \
--dataset.single_task="Evaluate hopejr hand policy" \
--dataset.num_episodes=10 \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
--policy.path=outputs/train/hopejr_hand/checkpoints/last/pretrained_model
```

View File

@@ -165,7 +165,7 @@ huggingface-cli login --token ${HUGGINGFACE_TOKEN} --add-to-git-credential
Then store your Hugging Face repository name in a variable:
```bash
HF_USER=$(hf auth whoami | head -n 1)
HF_USER=$(hf auth whoami | awk -F': *' 'NR==1 {print $2}')
echo $HF_USER
```
@@ -185,7 +185,10 @@ lerobot-record \
--display_data=true \
--dataset.repo_id=${HF_USER}/record-test \
--dataset.num_episodes=5 \
--dataset.single_task="Grab the black cube"
--dataset.single_task="Grab the black cube" \
--dataset.streaming_encoding=true \
# --dataset.vcodec=auto \
--dataset.encoder_threads=2
```
</hfoption>
<hfoption id="API example">
@@ -515,6 +518,9 @@ lerobot-record \
--display_data=false \
--dataset.repo_id=${HF_USER}/eval_so100 \
--dataset.single_task="Put lego brick into the transparent box" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
# <- Teleop optional if you want to teleoperate in between episodes \
# --teleop.type=so100_leader \
# --teleop.port=/dev/ttyACM0 \

View File

@@ -40,6 +40,13 @@ conda install ffmpeg -c conda-forge
>
> - _[On Linux only]_ If you want to bring your own ffmpeg: Install [ffmpeg build dependencies](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#GettheDependencies) and [compile ffmpeg from source with libsvtav1](https://trac.ffmpeg.org/wiki/CompilationGuide/Ubuntu#libsvtav1), and make sure you use the corresponding ffmpeg binary to your install with `which ffmpeg`.
> [!NOTE]
> When installing LeRobot inside WSL (Windows Subsystem for Linux), make sure to install `evdev` with the following command:
>
> ```bash
> conda install evdev -c conda-forge
> ```
## Step 3: Install LeRobot 🤗
### From Source

View File

@@ -41,7 +41,10 @@ lerobot-record \
--display_data=true \
--dataset.repo_id=${HF_USER}/record-test \
--dataset.num_episodes=5 \
--dataset.single_task="Grab the black cube"
--dataset.single_task="Grab the black cube" \
--dataset.streaming_encoding=true \
# --dataset.vcodec=auto \
--dataset.encoder_threads=2
```
See the [recording guide](./il_robots#record-a-dataset) for more details.

View File

@@ -66,12 +66,13 @@ Run on of the examples scripts to teleoperate, record a dataset, replay a datase
All scripts assume you configured your robot (e.g., SO-100 follower) and set the correct serial port.
Additionally you need to **copy the urdf of the robot to the examples folder**. For the examples in this tutorial (Using SO100/SO101) it is highly recommended to use the urdf in the [SO-ARM100 repo](https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf)
Additionally you need to **copy the URDF of the robot into the examples folder**. For the examples in this tutorial (using SO100/SO101), copy the `SO101` folder from the [SO-ARM100 repo](https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101) into the `examples/phone_to_so100/` directory, so that the URDF file path becomes `examples/phone_to_so100/SO101/so101_new_calib.urdf`.
- Run this example to teleoperate:
```bash
python examples/phone_to_so100/teleoperate.py
cd examples/phone_to_so100
python teleoperate.py
```
After running the example:
@@ -84,19 +85,22 @@ Additionally you can customize mapping or safety limits by editing the processor
- Run this example to record a dataset, which saves absolute end effector observations and actions:
```bash
python examples/phone_to_so100/record.py
cd examples/phone_to_so100
python record.py
```
- Run this example to replay recorded episodes:
```bash
python examples/phone_to_so100/replay.py
cd examples/phone_to_so100
python replay.py
```
- Run this example to evaluate a pretrained policy:
```bash
python examples/phone_to_so100/evaluate.py
cd examples/phone_to_so100
python evaluate.py
```
### Important pipeline steps and options

View File

@@ -159,6 +159,9 @@ lerobot-record \
--dataset.fps=15 \
--dataset.push_to_hub=true \
--dataset.private=true \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
--display_data=true
```
@@ -198,6 +201,9 @@ lerobot-record \
--dataset.fps=15 \
--dataset.push_to_hub=true \
--dataset.private=true \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
--display_data=true
```

View File

@@ -106,6 +106,9 @@ lerobot-record \
--dataset.repo_id=${HF_USER}/eval_DATASET_NAME_test \ # <- This will be the dataset name on HF Hub
--dataset.episode_time_s=50 \
--dataset.num_episodes=10 \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
# --dataset.vcodec=auto \
# <- Teleop optional if you want to teleoperate in between episodes \
# --teleop.type=so100_leader \
# --teleop.port=/dev/ttyACM0 \

View File

@@ -0,0 +1,155 @@
# Streaming Video Encoding Guide
## 1. Overview
Streaming video encoding eliminates the traditional PNG round-trip during video dataset recording. Instead of:
1. Capture frame -> write PNG to disk -> (at episode end) read PNG's -> encode to MP4 -> delete PNG's
Frames can be encoded in real-time during capture:
1. Capture frame -> queue to encoder thread -> encode to MP4 directly
This makes `save_episode()` near-instant (the video is already encoded by the time the episode ends) and removes the blocking wait that previously occurred between episodes, especially with multiple cameras in long episodes.
## 2. Tuning Parameters
| Parameter | CLI Flag | Type | Default | Description |
| ----------------------- | --------------------------------- | ------------- | ------------- | ----------------------------------------------------------------- |
| `streaming_encoding` | `--dataset.streaming_encoding` | `bool` | `True` | Enable real-time encoding during capture |
| `vcodec` | `--dataset.vcodec` | `str` | `"libsvtav1"` | Video codec. `"auto"` detects best HW encoder |
| `encoder_threads` | `--dataset.encoder_threads` | `int \| None` | `None` (auto) | Threads per encoder instance. `None` will leave the vcoded decide |
| `encoder_queue_maxsize` | `--dataset.encoder_queue_maxsize` | `int` | `60` | Max buffered frames per camera (~2s at 30fps). Consumes RAM |
## 3. Performance Considerations
Streaming encoding means the CPU is encoding video **during** the capture loop, not after. This creates a CPU budget that must be shared between:
- **Control loop** (reading cameras, control the robot, writing non-video data)
- **Encoder threads** (one pool per camera)
- **Rerun visualization** (if enabled)
- **OS and other processes**
### Resolution & Number of Cameras Impact
| Setup | Throughput (px/sec) | CPU Encoding Load | Notes |
| ------------------------- | ------------------- | ----------------- | ------------------------------ |
| 2camsx 640x480x3 @30fps | 55M | Low | Works on most systems |
| 2camsx 1280x720x3 @30fps | 165M | Moderate | Comfortable on modern systems |
| 2camsx 1920x1080x3 @30fps | 373M | High | Requires powerful high-end CPU |
### `encoder_threads` Tuning
This parameter controls how many threads each encoder instance uses internally:
- **Higher values** (e.g., 4-5): Faster encoding, but uses more CPU cores per camera. Good for high-end systems with many cores.
- **Lower values** (e.g., 1-2): Less CPU per camera, freeing cores for capture and visualization. Good for low-res images and capable CPUs.
- **`None` (default)**: Lets the codec decide. Information available in the codec logs.
### Backpressure and Frame Dropping
Each camera has a bounded queue (`encoder_queue_maxsize`, default 60 frames). When the encoder can't keep up:
1. The queue fills up (consuming RAM)
2. New frames are **dropped** (not blocked) — the capture loop continues uninterrupted
3. A warning is logged: `"Encoder queue full for {camera}, dropped N frame(s)"`
4. At episode end, total dropped frames per camera are reported
### Symptoms of Encoder Falling Behind
- **System feels laggy and freezes**: all CPUs are at 100%
- **Dropped frame warnings** in the log or lower frames/FPS than expected in the recorded dataset
- **Choppy robot movement**: If CPU is severely overloaded, even the capture loop may be affected
- **Accumulated rerun lag**: Visualization falls behind real-time
## 4. Hardware-Accelerated Encoding
### When to Use
Use HW encoding when:
- CPU is the bottleneck (dropped frames, choppy robot, rerun lag)
- You have compatible hardware (GPU or dedicated encoder)
- You're recording at high throughput (high resolution or with many cameras)
### Choosing a Codec
| Codec | CPU Usage | File Size | Quality | Notes |
| --------------------- | --------- | -------------- | ------- | ---------------------------------------------------------------- |
| `libsvtav1` (default) | High | Smallest | Best | Default. Best compression but most CPU-intensive |
| `h264` | Medium | ~30-50% larger | Good | Software H.264. Lower CPU |
| HW encoders | Very Low | Largest | Good | Offloads to dedicated hardware. Best for CPU-constrained systems |
### Available HW Encoders
| Encoder | Platform | Hardware | CLI Value |
| ------------------- | ------------- | ------------------------------------------------------------------------------------------------ | ------------------------------------ |
| `h264_videotoolbox` | macOS | Apple Silicon / Intel | `--dataset.vcodec=h264_videotoolbox` |
| `hevc_videotoolbox` | macOS | Apple Silicon / Intel | `--dataset.vcodec=hevc_videotoolbox` |
| `h264_nvenc` | Linux/Windows | NVIDIA GPU | `--dataset.vcodec=h264_nvenc` |
| `hevc_nvenc` | Linux/Windows | NVIDIA GPU | `--dataset.vcodec=hevc_nvenc` |
| `h264_vaapi` | Linux | Intel/AMD GPU | `--dataset.vcodec=h264_vaapi` |
| `h264_qsv` | Linux/Windows | Intel Quick Sync | `--dataset.vcodec=h264_qsv` |
| `auto` | Any | Probes the system for available HW encoders. Falls back to `libsvtav1` if no HW encoder is found | `--dataset.vcodec=auto` |
> [!NOTE]
> In order to use the HW accelerated encoders you might need to upgrade your GPU drivers.
> [!NOTE]
> `libsvtav1` is the default because it provides the best training performance; other vcodecs can reduce CPU usage and be faster, but they typically produce larger files and may affect training time.
## 5. Troubleshooting
| Symptom | Likely Cause | Fix |
| ------------------------------------------------------------------ | -------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| System freezes or choppy robot movement or Rerun visualization lag | CPU starved (100% load usage) | Close other apps, reduce encoding throughput, lower `encoder_threads`, use `h264`, use `display_data=False`. If the CPU continues to be at 100% then it might be insufficient for your setup, consider `--dataset.streaming_encoding=false` or HW encoding (`--dataset.vcodec=auto`) |
| "Encoder queue full" warnings or dropped frames in dataset | Encoder can't keep up (Queue overflow) | If CPU is not at 100%: Increase `encoder_threads`, increase `encoder_queue_maxsize` or use HW encoding (`--dataset.vcodec=auto`). |
| High RAM usage | Queue filling faster than encoding | `encoder_threads` too low or CPU insufficient. Reduce `encoder_queue_maxsize` or use HW encoding |
| Large video files | Using HW encoder or H.264 | Expected trade-off. Switch to `libsvtav1` if CPU allows |
| `save_episode()` still slow | `streaming_encoding` is `False` | Set `--dataset.streaming_encoding=true` |
| Encoder thread crash | Codec not available or invalid settings | Check `vcodec` is installed, try `--dataset.vcodec=auto` |
| Recorded dataset is missing frames | CPU/GPU starvation or occasional load spikes | If ~5% of frames are missing, your system is likely overloaded — follow the recommendations above. If fewer frames are missing (~2%), they are probably due to occasional transient load spikes (often at startup) and can be considered expected. |
## 6. Recommended Configurations
These estimates are conservative; we recommend testing them on your setup—start with a low load and increase it gradually.
### High-End Systems: modern 12+ cores (24+ threads)
A throughput between ~250-500M px/sec should be comfortable in CPU. For even better results try HW encoding if available.
```bash
# 3camsx 1280x720x3 @30fps: Defaults work well. Optionally increase encoder parallelism.
# 2camsx 1920x1080x3 @30fps: Defaults work well. Optionally increase encoder parallelism.
lerobot-record --dataset.encoder_threads=5 ...
# 3camsx 1920x1080x3 @30fps: Might require some tuning.
```
### Mid-Range Systems: modern 8+ cores (16+ threads) or Apple Silicon
A throughput between ~80-300M px/sec should be possible in CPU.
```bash
# 3camsx 640x480x3 @30fps: Defaults work well. Optionally decrease encoder parallelism.
# 2camsx 1280x720x3 @30fps: Defaults work well. Optionally decrease encoder parallelism.
lerobot-record --dataset.encoder_threads=2 ...
# 2camsx 1920x1080x3 @30fps: Might require some tuning.
```
### Low-Resource Systems: modern 4+ cores (8+ threads) or Raspberry Pi 5
On very constrained systems, streaming encoding may compete too heavily with the capture loop. Disabling it falls back to the PNG-based approach where encoding happens between episodes (blocking, but doesn't interfere with capture). Alternatively, record at a lower throughput to reduce both capture and encoding load. Consider also changing codec to `h264` and using batch encoding.
```bash
# 2camsx 640x480x3 @30fps: Requires some tuning.
# Use H.264, disable streaming, consider batching encoding
lerobot-record --dataset.vcodec=h264 --dataset.streaming_encoding=false ...
```
## 7. Closing note
Performance ultimately depends on your exact setup — frames-per-second, resolution, CPU cores and load, available memory, episode length, and the encoder you choose. Always test with your target workload, be mindful about your CPU & system capabilities and tune `encoder_threads`, `encoder_queue_maxsize`, and
`vcodec` reasonably. That said, a common practical configuration (for many applications) is three cameras at 640×480x3 @30fps; this usually runs fine with the default streaming video encoding settings in modern systems. Always verify your recorded dataset is healthy by comparing the video duration to the CLI episode duration and confirming the row count equals FPS × CLI duration.

View File

@@ -229,7 +229,10 @@ lerobot-record \
--dataset.num_episodes=2 \
--dataset.episode_time_s=5 \
--dataset.reset_time_s=5 \
--dataset.push_to_hub=true
--dataset.push_to_hub=true \
--dataset.streaming_encoding=true \
# --dataset.vcodec=auto \
--dataset.encoder_threads=2
```
Example simulation dataset: [nepyope/teleop_test_sim](https://huggingface.co/datasets/nepyope/teleop_test_sim)
@@ -279,7 +282,10 @@ lerobot-record \
--dataset.num_episodes=2 \
--dataset.episode_time_s=5 \
--dataset.reset_time_s=5 \
--dataset.push_to_hub=true
--dataset.push_to_hub=true \
--dataset.streaming_encoding=true \
# --dataset.vcodec=auto \
--dataset.encoder_threads=2
```
**Note**: Update `server_address` to match your robot's camera server IP.

View File

@@ -57,7 +57,7 @@ class DatasetReplayConfig:
repo_id: str
# Episode to replay.
episode: int
# Root directory where the dataset will be stored (e.g. 'dataset/path').
# Root directory where the dataset will be stored (e.g. 'dataset/path'). If None, defaults to $HF_LEROBOT_HOME/repo_id.
root: str | Path | None = None
# Limit the frames per second. By default, uses the policy fps.
fps: int = 30

View File

@@ -18,7 +18,6 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import make_default_processors
from lerobot.robots.lekiwi import LeKiwiClient, LeKiwiClientConfig
from lerobot.scripts.lerobot_record import record_loop
from lerobot.utils.constants import ACTION, OBS_STR
@@ -71,9 +70,6 @@ def main():
# To connect you already should have this script running on LeKiwi: `python -m lerobot.robots.lekiwi.lekiwi_host --robot.id=my_awesome_kiwi`
robot.connect()
# TODO(Steven): Update this example to use pipelines
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
# Initialize the keyboard listener and rerun visualization
listener, events = init_keyboard_listener()
init_rerun(session_name="lekiwi_evaluate")
@@ -99,9 +95,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
# Reset the environment if not stopping or re-recording
@@ -116,9 +109,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
if events["rerecord_episode"]:

View File

@@ -16,7 +16,6 @@
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.processor import make_default_processors
from lerobot.robots.lekiwi.config_lekiwi import LeKiwiClientConfig
from lerobot.robots.lekiwi.lekiwi_client import LeKiwiClient
from lerobot.scripts.lerobot_record import record_loop
@@ -46,9 +45,6 @@ def main():
leader_arm = SO100Leader(leader_arm_config)
keyboard = KeyboardTeleop(keyboard_config)
# TODO(Steven): Update this example to use pipelines
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
# Configure the dataset features
action_features = hw_to_dataset_features(robot.action_features, ACTION)
obs_features = hw_to_dataset_features(robot.observation_features, OBS_STR)
@@ -93,9 +89,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
# Reset the environment if not stopping or re-recording
@@ -111,9 +104,6 @@ def main():
control_time_s=RESET_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
)
if events["rerecord_episode"]:

View File

@@ -17,30 +17,16 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import (
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_teleop_action_processor,
)
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.robot_kinematic_processor import (
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
from lerobot.scripts.lerobot_record import record_loop
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import build_dataset_features
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -51,6 +37,10 @@ TASK_DESCRIPTION = "My task description"
HF_MODEL_ID = "<hf_username>/<model_repo_id>"
HF_DATASET_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot configuration & robot
@@ -64,68 +54,31 @@ def main():
robot = SO100Follower(robot_config)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Attach FK/IK pipelines so the robot works in EE space
motor_names = list(robot.bus.motors.keys())
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(robot.bus.motors.keys()),
)
# Build pipeline to convert EE action to joints action
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert joints observation to EE observation
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
)
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Create the dataset
# Create the dataset — obs auto-derived from FK pipeline, EE action spec explicit
dataset = LeRobotDataset.create(
repo_id=HF_DATASET_ID,
fps=FPS,
features=combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
),
# User for now should be explicit on the feature keys that were used for record
# Alternatively, the user can pass the processor step that has the right features
aggregate_pipeline_dataset_features(
pipeline=make_default_teleop_action_processor(),
initial_features=create_initial_features(
action={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
}
),
use_videos=True,
),
features=build_dataset_features(
robot,
use_videos=True,
action_features={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
},
),
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Build Policy Processors
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy,
@@ -151,21 +104,18 @@ def main():
for episode_idx in range(NUM_EPISODES):
log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Main record loop — pipelines applied internally by robot
record_loop(
robot=robot,
events=events,
fps=FPS,
policy=policy,
preprocessor=preprocessor, # Pass the pre and post policy processors
preprocessor=preprocessor,
postprocessor=postprocessor,
dataset=dataset,
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
# Reset the environment if not stopping or re-recording
@@ -180,9 +130,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
if events["rerecord_episode"]:

View File

@@ -16,21 +16,17 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
robot_action_to_transition,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.pipelines import make_so10x_fk_observation_pipeline
from lerobot.robots.so_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
EEReferenceAndDelta,
ForwardKinematicsJointsToEE,
GripperVelocityToJoint,
InverseKinematicsEEToJoints,
)
@@ -39,6 +35,7 @@ from lerobot.teleoperators.phone.config_phone import PhoneConfig, PhoneOS
from lerobot.teleoperators.phone.phone_processor import MapPhoneActionToRobotAction
from lerobot.teleoperators.phone.teleop_phone import Phone
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import build_dataset_features
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -49,6 +46,10 @@ RESET_TIME_SEC = 30
TASK_DESCRIPTION = "My task description"
HF_REPO_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot and teleoperator configurations
@@ -65,77 +66,59 @@ def main():
robot = SO100Follower(robot_config)
phone = Phone(teleop_config)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
motor_names = list(robot.bus.motors.keys())
from lerobot.model.kinematics import RobotKinematics
kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
urdf_path=URDF_PATH,
target_frame_name="gripper_frame_link",
joint_names=list(robot.bus.motors.keys()),
joint_names=motor_names,
)
# Build pipeline to convert phone action to EE action
phone_to_robot_ee_pose_processor = RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
](
steps=[
MapPhoneActionToRobotAction(platform=teleop_config.phone_os),
EEReferenceAndDelta(
kinematics=kinematics_solver,
end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
motor_names=list(robot.bus.motors.keys()),
use_latched_reference=True,
),
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.20,
),
GripperVelocityToJoint(speed_factor=20.0),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
# Phone output pipeline: map raw phone gesture to EE delta (no robot obs needed)
phone.set_output_pipeline(
RobotProcessorPipeline[RobotAction, RobotAction](
steps=[MapPhoneActionToRobotAction(platform=teleop_config.phone_os)],
to_transition=robot_action_to_transition,
to_output=transition_to_robot_action,
)
)
# Build pipeline to convert EE action to joints action
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
# Robot FK observation pipeline: joints → EE pose
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
# Robot input pipeline: EE delta + current robot obs → joint commands
robot.set_input_pipeline(
RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
EEReferenceAndDelta(
kinematics=kinematics_solver,
end_effector_step_sizes={"x": 0.5, "y": 0.5, "z": 0.5},
motor_names=motor_names,
use_latched_reference=True,
),
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.20,
),
GripperVelocityToJoint(speed_factor=20.0),
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=motor_names,
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
)
# Build pipeline to convert joint observation to EE observation
robot_joints_to_ee_pose = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
)
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Create the dataset
# Dataset features auto-derived from robot's FK obs pipeline and phone's mapped action pipeline
dataset = LeRobotDataset.create(
repo_id=HF_REPO_ID,
fps=FPS,
features=combine_feature_dicts(
# Run the feature contract of the pipelines
# This tells you how the features would look like after the pipeline steps
aggregate_pipeline_dataset_features(
pipeline=phone_to_robot_ee_pose_processor,
initial_features=create_initial_features(action=phone.action_features),
use_videos=True,
),
aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
),
),
features=build_dataset_features(robot, phone, use_videos=True),
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
@@ -158,7 +141,7 @@ def main():
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Main record loop — pipelines applied internally by robot and phone
record_loop(
robot=robot,
events=events,
@@ -168,9 +151,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=phone_to_robot_ee_pose_processor,
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose,
)
# Reset the environment if not stopping or re-recording
@@ -186,9 +166,6 @@ def main():
control_time_s=RESET_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=phone_to_robot_ee_pose_processor,
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose,
)
if events["rerecord_episode"]:

View File

@@ -87,8 +87,8 @@ from lerobot.policies.rtc.action_queue import ActionQueue
from lerobot.policies.rtc.configuration_rtc import RTCConfig
from lerobot.policies.rtc.latency_tracker import LatencyTracker
from lerobot.processor.factory import (
make_default_robot_action_processor,
make_default_robot_observation_processor,
_make_identity_observation_pipeline as make_default_robot_observation_processor,
_make_identity_robot_action_pipeline as make_default_robot_action_processor,
)
from lerobot.rl.process import ProcessSignalHandler
from lerobot.robots import ( # noqa: F401

View File

@@ -17,30 +17,16 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.configs.types import FeatureType, PolicyFeature
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.policies.factory import make_pre_post_processors
from lerobot.processor import (
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_teleop_action_processor,
)
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.robot_kinematic_processor import (
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
from lerobot.scripts.lerobot_record import record_loop
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import build_dataset_features
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -51,6 +37,10 @@ TASK_DESCRIPTION = "My task description"
HF_MODEL_ID = "<hf_username>/<model_repo_id>"
HF_DATASET_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot configuration & robot
@@ -64,68 +54,31 @@ def main():
robot = SO100Follower(robot_config)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Attach FK/IK pipelines so the robot works in EE space
motor_names = list(robot.bus.motors.keys())
robot.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
robot.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(robot.bus.motors.keys()),
)
# Build pipeline to convert EE action to joints action
robot_ee_to_joints_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
InverseKinematicsEEToJoints(
kinematics=kinematics_solver,
motor_names=list(robot.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert joints observation to EE observation
robot_joints_to_ee_pose_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())
)
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Create the dataset
# Create the dataset — obs auto-derived from FK pipeline, EE action spec explicit
dataset = LeRobotDataset.create(
repo_id=HF_DATASET_ID,
fps=FPS,
features=combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=robot_joints_to_ee_pose_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=True,
),
# User for now should be explicit on the feature keys that were used for record
# Alternatively, the user can pass the processor step that has the right features
aggregate_pipeline_dataset_features(
pipeline=make_default_teleop_action_processor(),
initial_features=create_initial_features(
action={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
}
),
use_videos=True,
),
features=build_dataset_features(
robot,
use_videos=True,
action_features={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
},
),
robot_type=robot.name,
use_videos=True,
image_writer_threads=4,
)
# Create policy
policy = ACTPolicy.from_pretrained(HF_MODEL_ID)
# Build Policy Processors
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=policy,
@@ -135,7 +88,7 @@ def main():
preprocessor_overrides={"device_processor": {"device": str(policy.config.device)}},
)
# Connect the robot and teleoperator
# Connect the robot
robot.connect()
# Initialize the keyboard listener and rerun visualization
@@ -151,21 +104,18 @@ def main():
for episode_idx in range(NUM_EPISODES):
log_say(f"Running inference, recording eval episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Main record loop — pipelines applied internally by robot
record_loop(
robot=robot,
events=events,
fps=FPS,
policy=policy,
preprocessor=preprocessor, # Pass the pre and post policy processors
preprocessor=preprocessor,
postprocessor=postprocessor,
dataset=dataset,
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
# Reset the environment if not stopping or re-recording
@@ -180,9 +130,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=make_default_teleop_action_processor(),
robot_action_processor=robot_ee_to_joints_processor,
robot_observation_processor=robot_joints_to_ee_pose_processor,
)
if events["rerecord_episode"]:

View File

@@ -17,25 +17,20 @@
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import combine_feature_dicts
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
from lerobot.scripts.lerobot_record import record_loop
from lerobot.teleoperators.so_leader import SO100Leader, SO100LeaderConfig
from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline
from lerobot.utils.control_utils import init_keyboard_listener
from lerobot.utils.pipeline_utils import (
build_dataset_features,
check_action_space_compatibility,
check_observation_space_compatibility,
)
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import init_rerun
@@ -46,6 +41,10 @@ RESET_TIME_SEC = 30
TASK_DESCRIPTION = "My task description"
HF_REPO_ID = "<hf_username>/<dataset_repo_id>"
# NOTE: Use the URDF from the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Create the robot and teleoperator configurations
@@ -62,77 +61,17 @@ def main():
follower = SO100Follower(follower_config)
leader = SO100Leader(leader_config)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
follower_kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(follower.bus.motors.keys()),
)
# Attach EE-space pipelines to the objects
motor_names = list(follower.bus.motors.keys())
follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, list(leader.bus.motors.keys())))
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
leader_kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(leader.bus.motors.keys()),
)
# Build pipeline to convert follower joints to EE observation
follower_joints_to_ee = RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[
ForwardKinematicsJointsToEE(
kinematics=follower_kinematics_solver, motor_names=list(follower.bus.motors.keys())
),
],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
# Build pipeline to convert leader joints to EE action
leader_joints_to_ee = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
ForwardKinematicsJointsToEE(
kinematics=leader_kinematics_solver, motor_names=list(leader.bus.motors.keys())
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Build pipeline to convert EE action to follower joints
ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
[
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.10,
),
InverseKinematicsEEToJoints(
kinematics=follower_kinematics_solver,
motor_names=list(follower.bus.motors.keys()),
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Create the dataset
# Dataset features are derived automatically from robot/teleop pipelines
dataset = LeRobotDataset.create(
repo_id=HF_REPO_ID,
fps=FPS,
features=combine_feature_dicts(
# Run the feature contract of the pipelines
# This tells you how the features would look like after the pipeline steps
aggregate_pipeline_dataset_features(
pipeline=leader_joints_to_ee,
initial_features=create_initial_features(action=leader.action_features),
use_videos=True,
),
aggregate_pipeline_dataset_features(
pipeline=follower_joints_to_ee,
initial_features=create_initial_features(observation=follower.observation_features),
use_videos=True,
),
),
features=build_dataset_features(follower, leader, use_videos=True),
robot_type=follower.name,
use_videos=True,
image_writer_threads=4,
@@ -142,9 +81,13 @@ def main():
leader.connect()
follower.connect()
# Verify action/observation space alignment (warns on mismatch)
check_action_space_compatibility(leader, follower)
check_observation_space_compatibility(follower, leader)
# Initialize the keyboard listener and rerun visualization
listener, events = init_keyboard_listener()
init_rerun(session_name="recording_phone")
init_rerun(session_name="recording_ee")
try:
if not leader.is_connected or not follower.is_connected:
@@ -155,7 +98,8 @@ def main():
while episode_idx < NUM_EPISODES and not events["stop_recording"]:
log_say(f"Recording episode {episode_idx + 1} of {NUM_EPISODES}")
# Main record loop
# Pipelines applied automatically inside robot.get_observation(),
# teleop.get_action(), and robot.send_action()
record_loop(
robot=follower,
events=events,
@@ -165,9 +109,6 @@ def main():
control_time_s=EPISODE_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=leader_joints_to_ee,
robot_action_processor=ee_to_follower_joints,
robot_observation_processor=follower_joints_to_ee,
)
# Reset the environment if not stopping or re-recording
@@ -183,9 +124,6 @@ def main():
control_time_s=RESET_TIME_SEC,
single_task=TASK_DESCRIPTION,
display_data=True,
teleop_action_processor=leader_joints_to_ee,
robot_action_processor=ee_to_follower_joints,
robot_observation_processor=follower_joints_to_ee,
)
if events["rerecord_episode"]:

View File

@@ -14,27 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import time
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import (
robot_action_observation_to_transition,
robot_action_to_transition,
transition_to_robot_action,
)
from lerobot.robots.so_follower import SO100Follower, SO100FollowerConfig
from lerobot.robots.so_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
from lerobot.scripts.lerobot_teleoperate import teleop_loop
from lerobot.teleoperators.so_leader import SO100Leader, SO100LeaderConfig
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.visualization_utils import init_rerun, log_rerun_data
from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline
from lerobot.utils.pipeline_utils import check_action_space_compatibility
from lerobot.utils.visualization_utils import init_rerun
FPS = 30
# NOTE: Use the URDF from the SO-ARM100 repo:
# https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
URDF_PATH = "./SO101/so101_new_calib.urdf"
def main():
# Initialize the robot and teleoperator config
@@ -47,47 +43,14 @@ def main():
follower = SO100Follower(follower_config)
leader = SO100Leader(leader_config)
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
follower_kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(follower.bus.motors.keys()),
)
# Attach EE-space pipelines to the objects
motor_names = list(follower.bus.motors.keys())
follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, list(leader.bus.motors.keys())))
# NOTE: It is highly recommended to use the urdf in the SO-ARM100 repo: https://github.com/TheRobotStudio/SO-ARM100/blob/main/Simulation/SO101/so101_new_calib.urdf
leader_kinematics_solver = RobotKinematics(
urdf_path="./SO101/so101_new_calib.urdf",
target_frame_name="gripper_frame_link",
joint_names=list(leader.bus.motors.keys()),
)
# Build pipeline to convert teleop joints to EE action
leader_to_ee = RobotProcessorPipeline[RobotAction, RobotAction](
steps=[
ForwardKinematicsJointsToEE(
kinematics=leader_kinematics_solver, motor_names=list(leader.bus.motors.keys())
),
],
to_transition=robot_action_to_transition,
to_output=transition_to_robot_action,
)
# build pipeline to convert EE action to robot joints
ee_to_follower_joints = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
[
EEBoundsAndSafety(
end_effector_bounds={"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]},
max_ee_step_m=0.10,
),
InverseKinematicsEEToJoints(
kinematics=follower_kinematics_solver,
motor_names=list(follower.bus.motors.keys()),
initial_guess_current_joints=False,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
# Verify action space alignment (warns if leader EE ≠ follower action_features)
check_action_space_compatibility(leader, follower)
# Connect to the robot and teleoperator
follower.connect()
@@ -97,28 +60,12 @@ def main():
init_rerun(session_name="so100_so100_EE_teleop")
print("Starting teleop loop...")
while True:
t0 = time.perf_counter()
# Get robot observation
robot_obs = follower.get_observation()
# Get teleop observation
leader_joints_obs = leader.get_action()
# teleop joints -> teleop EE action
leader_ee_act = leader_to_ee(leader_joints_obs)
# teleop EE -> robot joints
follower_joints_act = ee_to_follower_joints((leader_ee_act, robot_obs))
# Send action to robot
_ = follower.send_action(follower_joints_act)
# Visualize
log_rerun_data(observation=leader_ee_act, action=follower_joints_act)
precise_sleep(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
try:
# Pipelines applied automatically inside teleop.get_action() and robot.send_action()
teleop_loop(teleop=leader, robot=follower, fps=FPS, display_data=True)
finally:
follower.disconnect()
leader.disconnect()
if __name__ == "__main__":

View File

@@ -25,7 +25,7 @@ discord = "https://discord.gg/s3KuuzsPFb"
[project]
name = "lerobot"
version = "0.4.4"
version = "0.4.5"
description = "🤗 LeRobot: State-of-the-art Machine Learning for Real-World Robotics in Pytorch"
dynamic = ["readme"]
license = { text = "Apache-2.0" }
@@ -98,11 +98,13 @@ pygame-dep = ["pygame>=2.5.1,<2.7.0"]
placo-dep = ["placo>=0.9.6,<0.10.0"]
transformers-dep = ["transformers>=4.57.1,<5.0.0"]
grpcio-dep = ["grpcio==1.73.1", "protobuf>=6.31.1,<6.32.0"]
can-dep = ["python-can>=4.2.0,<5.0.0"]
# Motors
feetech = ["feetech-servo-sdk>=1.0.0,<2.0.0"]
dynamixel = ["dynamixel-sdk>=3.7.31,<3.9.0"]
damiao = ["python-can>=4.2.0,<5.0.0"]
damiao = ["lerobot[can-dep]"]
robstride = ["lerobot[can-dep]"]
# Robots
openarms = ["lerobot[damiao]"]
@@ -212,6 +214,9 @@ lerobot-edit-dataset="lerobot.scripts.lerobot_edit_dataset:main"
lerobot-setup-can="lerobot.scripts.lerobot_setup_can:main"
# ---------------- Tool Configurations ----------------
[tool.setuptools.package-data]
lerobot = ["envs/*.json"]
[tool.setuptools.packages.find]
where = ["src"]

View File

@@ -49,23 +49,18 @@ import torch
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
bi_so_follower,
koch_follower,
from lerobot.robots import (
RobotConfig, # noqa: F401
make_robot_from_config,
omx_follower,
so_follower,
)
from lerobot.transport import (
services_pb2, # type: ignore
services_pb2_grpc, # type: ignore
)
from lerobot.transport.utils import grpc_channel_options, send_bytes_in_chunks
from lerobot.utils.import_utils import register_third_party_plugins
from .configs import RobotClientConfig
from .constants import SUPPORTED_ROBOTS
from .helpers import (
Action,
FPSTracker,
@@ -485,8 +480,9 @@ class RobotClient:
def async_client(cfg: RobotClientConfig):
logging.info(pformat(asdict(cfg)))
if cfg.robot.type not in SUPPORTED_ROBOTS:
raise ValueError(f"Robot {cfg.robot.type} not yet supported!")
# TODO: Assert if checking robot support is still needed with the plugin system
# if cfg.robot.type not in SUPPORTED_ROBOTS:
# raise ValueError(f"Robot {cfg.robot.type} not yet supported!")
client = RobotClient(cfg)
@@ -512,4 +508,5 @@ def async_client(cfg: RobotClientConfig):
if __name__ == "__main__":
register_third_party_plugins()
async_client() # run the client

View File

@@ -27,7 +27,7 @@ class DatasetConfig:
# "dataset_index" into the returned item. The index mapping is made according to the order in which the
# datasets are provided.
repo_id: str
# Root directory where the dataset will be stored (e.g. 'dataset/path').
# Root directory where the dataset will be stored (e.g. 'dataset/path'). If None, defaults to $HF_LEROBOT_HOME/repo_id.
root: str | None = None
episodes: list[int] | None = None
image_transforms: ImageTransformsConfig = field(default_factory=ImageTransformsConfig)

View File

@@ -7,6 +7,13 @@
This dataset was created using [LeRobot](https://github.com/huggingface/lerobot).
{% if repo_id is defined and repo_id %}
<a class="flex" href="https://huggingface.co/spaces/lerobot/visualize_dataset?path={{ repo_id }}">
<img class="block dark:hidden" src="https://huggingface.co/datasets/huggingface/badges/resolve/main/visualize-this-dataset-xl.svg"/>
<img class="hidden dark:block" src="https://huggingface.co/datasets/huggingface/badges/resolve/main/visualize-this-dataset-xl-dark.svg"/>
</a>
{% endif %}
## Dataset Description
{{ dataset_description | default("", true) }}

View File

@@ -567,20 +567,22 @@ def _copy_and_reindex_data(
def _keep_episodes_from_video_with_av(
input_path: Path,
output_path: Path,
episodes_to_keep: list[tuple[float, float]],
episodes_to_keep: list[tuple[int, int]],
fps: float,
vcodec: str = "libsvtav1",
pix_fmt: str = "yuv420p",
) -> None:
"""Keep only specified episodes from a video file using PyAV.
This function decodes frames from specified time ranges and re-encodes them with
This function decodes frames from specified frame ranges and re-encodes them with
properly reset timestamps to ensure monotonic progression.
Args:
input_path: Source video file path.
output_path: Destination video file path.
episodes_to_keep: List of (start_time, end_time) tuples for episodes to keep.
episodes_to_keep: List of (start_frame, end_frame) tuples for episodes to keep.
Ranges are half-open intervals: [start_frame, end_frame), where start_frame
is inclusive and end_frame is exclusive.
fps: Frame rate of the video.
vcodec: Video codec to use for encoding.
pix_fmt: Pixel format for output video.
@@ -622,9 +624,10 @@ def _keep_episodes_from_video_with_av(
# Create set of (start, end) ranges for fast lookup.
# Convert to a sorted list for efficient checking.
time_ranges = sorted(episodes_to_keep)
frame_ranges = sorted(episodes_to_keep)
# Track frame index for setting PTS and current range being processed.
src_frame_count = 0
frame_count = 0
range_idx = 0
@@ -634,21 +637,20 @@ def _keep_episodes_from_video_with_av(
if frame is None:
continue
# Get frame timestamp.
frame_time = float(frame.pts * frame.time_base) if frame.pts is not None else 0.0
# Check if frame is in any of our desired time ranges.
# Check if frame is in any of our desired frame ranges.
# Skip ranges that have already passed.
while range_idx < len(time_ranges) and frame_time >= time_ranges[range_idx][1]:
while range_idx < len(frame_ranges) and src_frame_count >= frame_ranges[range_idx][1]:
range_idx += 1
# If we've passed all ranges, stop processing.
if range_idx >= len(time_ranges):
if range_idx >= len(frame_ranges):
break
# Check if frame is in current range.
start_ts, end_ts = time_ranges[range_idx]
if frame_time < start_ts:
start_frame = frame_ranges[range_idx][0]
if src_frame_count < start_frame:
src_frame_count += 1
continue
# Frame is in range - create a new frame with reset timestamps.
@@ -661,6 +663,7 @@ def _keep_episodes_from_video_with_av(
for pkt in v_out.encode(new_frame):
out.mux(pkt)
src_frame_count += 1
frame_count += 1
# Flush encoder.
@@ -749,15 +752,17 @@ def _copy_and_reindex_videos(
f"videos/{video_key}/to_timestamp"
]
else:
# Build list of time ranges to keep, in sorted order.
# Build list of frame ranges to keep, in sorted order.
sorted_keep_episodes = sorted(episodes_in_file, key=lambda x: episode_mapping[x])
episodes_to_keep_ranges: list[tuple[float, float]] = []
episodes_to_keep_ranges: list[tuple[int, int]] = []
for old_idx in sorted_keep_episodes:
src_ep = src_dataset.meta.episodes[old_idx]
from_ts = src_ep[f"videos/{video_key}/from_timestamp"]
to_ts = src_ep[f"videos/{video_key}/to_timestamp"]
episodes_to_keep_ranges.append((from_ts, to_ts))
from_frame = round(src_ep[f"videos/{video_key}/from_timestamp"] * src_dataset.meta.fps)
to_frame = round(src_ep[f"videos/{video_key}/to_timestamp"] * src_dataset.meta.fps)
assert src_ep["length"] == to_frame - from_frame, (
f"Episode length mismatch: {src_ep['length']} vs {to_frame - from_frame}"
)
episodes_to_keep_ranges.append((from_frame, to_frame))
# Use PyAV filters to efficiently re-encode only the desired segments.
assert src_dataset.meta.video_path is not None

View File

@@ -68,6 +68,7 @@ from lerobot.datasets.utils import (
write_tasks,
)
from lerobot.datasets.video_utils import (
StreamingVideoEncoder,
VideoFrame,
concatenate_video_files,
decode_video_frames,
@@ -75,11 +76,11 @@ from lerobot.datasets.video_utils import (
get_safe_default_codec,
get_video_duration_in_s,
get_video_info,
resolve_vcodec,
)
from lerobot.utils.constants import HF_LEROBOT_HOME
CODEBASE_VERSION = "v3.0"
VALID_VIDEO_CODECS = {"h264", "hevc", "libsvtav1"}
class LeRobotDatasetMetadata:
@@ -545,12 +546,19 @@ class LeRobotDatasetMetadata:
def _encode_video_worker(
video_key: str, episode_index: int, root: Path, fps: int, vcodec: str = "libsvtav1"
video_key: str,
episode_index: int,
root: Path,
fps: int,
vcodec: str = "libsvtav1",
encoder_threads: int | None = None,
) -> Path:
temp_path = Path(tempfile.mkdtemp(dir=root)) / f"{video_key}_{episode_index:03d}.mp4"
fpath = DEFAULT_IMAGE_PATH.format(image_key=video_key, episode_index=episode_index, frame_index=0)
img_dir = (root / fpath).parent
encode_video_frames(img_dir, temp_path, fps, vcodec=vcodec, overwrite=True)
encode_video_frames(
img_dir, temp_path, fps, vcodec=vcodec, overwrite=True, encoder_threads=encoder_threads
)
shutil.rmtree(img_dir)
return temp_path
@@ -570,6 +578,9 @@ class LeRobotDataset(torch.utils.data.Dataset):
video_backend: str | None = None,
batch_encoding_size: int = 1,
vcodec: str = "libsvtav1",
streaming_encoding: bool = False,
encoder_queue_maxsize: int = 30,
encoder_threads: int | None = None,
):
"""
2 modes are available for instantiating this class, depending on 2 different use cases:
@@ -653,11 +664,11 @@ class LeRobotDataset(torch.utils.data.Dataset):
for the README).
Args:
repo_id (str): This is the repo id that will be used to fetch the dataset. Locally, the dataset
will be stored under root/repo_id.
root (Path | None, optional): Local directory to use for downloading/writing files. You can also
set the HF_LEROBOT_HOME environment variable to point to a different location. Defaults to
'~/.cache/huggingface/lerobot'.
repo_id (str): This is the repo id that will be used to fetch the dataset.
root (Path | None, optional): Local directory where the dataset will be downloaded and
stored. If set, all dataset files will be stored directly under this path. If not set, the
dataset files will be stored under $HF_LEROBOT_HOME/repo_id (configurable via the
HF_LEROBOT_HOME environment variable).
episodes (list[int] | None, optional): If specified, this will only load episodes specified by
their episode_index in this list. Defaults to None.
image_transforms (Callable | None, optional): You can pass standard v2 image transforms from
@@ -683,12 +694,17 @@ class LeRobotDataset(torch.utils.data.Dataset):
batch_encoding_size (int, optional): Number of episodes to accumulate before batch encoding videos.
Set to 1 for immediate encoding (default), or higher for batched encoding. Defaults to 1.
vcodec (str, optional): Video codec for encoding videos during recording. Options: 'h264', 'hevc',
'libsvtav1'. Defaults to 'libsvtav1'. Use 'h264' for faster encoding on systems where AV1
encoding is CPU-heavy.
'libsvtav1', 'auto', or hardware-specific codecs like 'h264_videotoolbox', 'h264_nvenc'.
Defaults to 'libsvtav1'. Use 'auto' to auto-detect the best available hardware encoder.
streaming_encoding (bool, optional): If True, encode video frames in real-time during capture
instead of writing PNG images first. This makes save_episode() near-instant. Defaults to False.
encoder_queue_maxsize (int, optional): Maximum number of frames to buffer per camera when using
streaming encoding. Defaults to 30 (~1s at 30fps).
encoder_threads (int | None, optional): Number of threads per encoder instance. None lets the
codec auto-detect (default). Lower values reduce CPU usage per encoder. Maps to 'lp' (via svtav1-params) for
libsvtav1 and 'threads' for h264/hevc.
"""
super().__init__()
if vcodec not in VALID_VIDEO_CODECS:
raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}")
self.repo_id = repo_id
self.root = Path(root) if root else HF_LEROBOT_HOME / repo_id
self.image_transforms = image_transforms
@@ -700,7 +716,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.delta_indices = None
self.batch_encoding_size = batch_encoding_size
self.episodes_since_last_encoding = 0
self.vcodec = vcodec
self.vcodec = resolve_vcodec(vcodec)
self._encoder_threads = encoder_threads
# Unused attributes
self.image_writer = None
@@ -708,6 +725,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.writer = None
self.latest_episode = None
self._current_file_start_frame = None # Track the starting frame index of the current parquet file
self._streaming_encoder = None
self.root.mkdir(exist_ok=True, parents=True)
@@ -729,7 +747,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
# Check if cached dataset contains all requested episodes
if not self._check_cached_episodes_sufficient():
raise FileNotFoundError("Cached dataset doesn't contain all requested episodes")
except (AssertionError, FileNotFoundError, NotADirectoryError):
except (FileNotFoundError, NotADirectoryError):
if is_valid_version(self.revision):
self.revision = get_safe_version(self.repo_id, self.revision)
self.download(download_videos)
@@ -749,6 +767,19 @@ class LeRobotDataset(torch.utils.data.Dataset):
check_delta_timestamps(self.delta_timestamps, self.fps, self.tolerance_s)
self.delta_indices = get_delta_indices(self.delta_timestamps, self.fps)
# Initialize streaming encoder for resumed recording
if streaming_encoding and len(self.meta.video_keys) > 0:
self._streaming_encoder = StreamingVideoEncoder(
fps=self.meta.fps,
vcodec=self.vcodec,
pix_fmt="yuv420p",
g=2,
crf=30,
preset=None,
queue_maxsize=encoder_queue_maxsize,
encoder_threads=encoder_threads,
)
def _close_writer(self) -> None:
"""Close and cleanup the parquet writer if it exists."""
writer = getattr(self, "writer", None)
@@ -808,7 +839,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
hub_api.upload_folder(**upload_kwargs)
card = create_lerobot_dataset_card(
tags=tags, dataset_info=self.meta.info, license=license, **card_kwargs
tags=tags, dataset_info=self.meta.info, license=license, repo_id=self.repo_id, **card_kwargs
)
card.push_to_hub(repo_id=self.repo_id, repo_type="dataset", revision=branch)
@@ -1104,6 +1135,8 @@ class LeRobotDataset(torch.utils.data.Dataset):
"""
self._close_writer()
self.meta._close_writer()
if self._streaming_encoder is not None:
self._streaming_encoder.close()
def create_episode_buffer(self, episode_index: int | None = None) -> dict:
current_ep_idx = self.meta.total_episodes if episode_index is None else episode_index
@@ -1158,6 +1191,13 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.episode_buffer["timestamp"].append(timestamp)
self.episode_buffer["task"].append(frame.pop("task")) # Remove task from frame after processing
# Start streaming encoder on first frame of episode (once, before iterating keys)
if frame_index == 0 and self._streaming_encoder is not None:
self._streaming_encoder.start_episode(
video_keys=list(self.meta.video_keys),
temp_dir=self.root,
)
# Add frame features to episode_buffer
for key in frame:
if key not in self.features:
@@ -1165,7 +1205,10 @@ class LeRobotDataset(torch.utils.data.Dataset):
f"An element of the frame is not in the features. '{key}' not in '{self.features.keys()}'."
)
if self.features[key]["dtype"] in ["image", "video"]:
if self.features[key]["dtype"] == "video" and self._streaming_encoder is not None:
self._streaming_encoder.feed_frame(key, frame[key])
self.episode_buffer[key].append(None) # Placeholder (video keys are skipped in parquet)
elif self.features[key]["dtype"] in ["image", "video"]:
img_path = self._get_image_file_path(
episode_index=self.episode_buffer["episode_index"], image_key=key, frame_index=frame_index
)
@@ -1226,13 +1269,38 @@ class LeRobotDataset(torch.utils.data.Dataset):
# Wait for image writer to end, so that episode stats over images can be computed
self._wait_image_writer()
ep_stats = compute_episode_stats(episode_buffer, self.features)
ep_metadata = self._save_episode_data(episode_buffer)
has_video_keys = len(self.meta.video_keys) > 0
use_streaming = self._streaming_encoder is not None and has_video_keys
use_batched_encoding = self.batch_encoding_size > 1
if has_video_keys and not use_batched_encoding:
if use_streaming:
# Compute stats for non-video features only (video stats come from encoder)
non_video_buffer = {
k: v
for k, v in episode_buffer.items()
if self.features.get(k, {}).get("dtype") not in ("video",)
}
non_video_features = {k: v for k, v in self.features.items() if v["dtype"] != "video"}
ep_stats = compute_episode_stats(non_video_buffer, non_video_features)
else:
ep_stats = compute_episode_stats(episode_buffer, self.features)
ep_metadata = self._save_episode_data(episode_buffer)
if use_streaming:
# Finish streaming encoding and collect results
streaming_results = self._streaming_encoder.finish_episode()
for video_key in self.meta.video_keys:
temp_path, video_stats = streaming_results[video_key]
if video_stats is not None:
# Format stats same as compute_episode_stats: normalize to [0,1], reshape to (C,1,1)
ep_stats[video_key] = {
k: v if k == "count" else np.squeeze(v.reshape(1, -1, 1, 1) / 255.0, axis=0)
for k, v in video_stats.items()
}
ep_metadata.update(self._save_episode_video(video_key, episode_index, temp_path=temp_path))
elif has_video_keys and not use_batched_encoding:
num_cameras = len(self.meta.video_keys)
if parallel_encoding and num_cameras > 1:
# TODO(Steven): Ideally we would like to control the number of threads per encoding such that:
@@ -1246,6 +1314,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
self.root,
self.fps,
self.vcodec,
self._encoder_threads,
): video_key
for video_key in self.meta.video_keys
}
@@ -1514,6 +1583,10 @@ class LeRobotDataset(torch.utils.data.Dataset):
return metadata
def clear_episode_buffer(self, delete_images: bool = True) -> None:
# Cancel streaming encoder if active
if self._streaming_encoder is not None:
self._streaming_encoder.cancel_episode()
# Clean up image files for the current episode buffer
if delete_images:
# Wait for the async image writer to finish
@@ -1561,7 +1634,9 @@ class LeRobotDataset(torch.utils.data.Dataset):
Note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speedup encoding,
since video encoding with ffmpeg is already using multithreading.
"""
return _encode_video_worker(video_key, episode_index, self.root, self.fps, self.vcodec)
return _encode_video_worker(
video_key, episode_index, self.root, self.fps, self.vcodec, self._encoder_threads
)
@classmethod
def create(
@@ -1578,10 +1653,13 @@ class LeRobotDataset(torch.utils.data.Dataset):
video_backend: str | None = None,
batch_encoding_size: int = 1,
vcodec: str = "libsvtav1",
metadata_buffer_size: int = 10,
streaming_encoding: bool = False,
encoder_queue_maxsize: int = 30,
encoder_threads: int | None = None,
) -> "LeRobotDataset":
"""Create a LeRobot Dataset from scratch in order to record data."""
if vcodec not in VALID_VIDEO_CODECS:
raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}")
vcodec = resolve_vcodec(vcodec)
obj = cls.__new__(cls)
obj.meta = LeRobotDatasetMetadata.create(
repo_id=repo_id,
@@ -1590,6 +1668,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
features=features,
root=root,
use_videos=use_videos,
metadata_buffer_size=metadata_buffer_size,
)
obj.repo_id = obj.meta.repo_id
obj.root = obj.meta.root
@@ -1599,6 +1678,7 @@ class LeRobotDataset(torch.utils.data.Dataset):
obj.batch_encoding_size = batch_encoding_size
obj.episodes_since_last_encoding = 0
obj.vcodec = vcodec
obj._encoder_threads = encoder_threads
if image_writer_processes or image_writer_threads:
obj.start_image_writer(image_writer_processes, image_writer_threads)
@@ -1620,6 +1700,22 @@ class LeRobotDataset(torch.utils.data.Dataset):
obj._lazy_loading = False
obj._recorded_frames = 0
obj._writer_closed_for_reading = False
# Initialize streaming encoder
if streaming_encoding and len(obj.meta.video_keys) > 0:
obj._streaming_encoder = StreamingVideoEncoder(
fps=fps,
vcodec=vcodec,
pix_fmt="yuv420p",
g=2,
crf=30,
preset=None,
queue_maxsize=encoder_queue_maxsize,
encoder_threads=encoder_threads,
)
else:
obj._streaming_encoder = None
return obj
@@ -1675,11 +1771,12 @@ class MultiLeRobotDataset(torch.utils.data.Dataset):
)
for repo_id, ds in zip(self.repo_ids, self._datasets, strict=True):
extra_keys = set(ds.features).difference(intersection_features)
logging.warning(
f"keys {extra_keys} of {repo_id} were disabled as they are not contained in all the "
"other datasets."
)
self.disabled_features.update(extra_keys)
if extra_keys:
logging.warning(
f"keys {extra_keys} of {repo_id} were disabled as they are not contained in all the "
"other datasets."
)
self.disabled_features.update(extra_keys)
self.image_transforms = image_transforms
self.delta_timestamps = delta_timestamps

View File

@@ -12,14 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from collections.abc import Sequence
from typing import Any
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from lerobot.configs.types import PipelineFeatureType
from lerobot.datasets.utils import hw_to_dataset_features
from lerobot.processor import DataProcessorPipeline, RobotAction, RobotObservation
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR
if TYPE_CHECKING:
from lerobot.processor import RobotAction, RobotObservation
def create_initial_features(
@@ -41,99 +41,3 @@ def create_initial_features(
if observation:
features[PipelineFeatureType.OBSERVATION] = observation
return features
# Helper to filter state/action keys based on regex patterns.
def should_keep(key: str, patterns: tuple[str]) -> bool:
if patterns is None:
return True
return any(re.search(pat, key) for pat in patterns)
def strip_prefix(key: str, prefixes_to_strip: tuple[str]) -> str:
for prefix in prefixes_to_strip:
if key.startswith(prefix):
return key[len(prefix) :]
return key
# Define prefixes to strip from feature keys for clean names.
# Handles both fully qualified (e.g., "action.state") and short (e.g., "state") forms.
PREFIXES_TO_STRIP = tuple(
f"{token}." for const in (ACTION, OBS_STATE, OBS_IMAGES) for token in (const, const.split(".")[-1])
)
def aggregate_pipeline_dataset_features(
pipeline: DataProcessorPipeline,
initial_features: dict[PipelineFeatureType, dict[str, Any]],
*,
use_videos: bool = True,
patterns: Sequence[str] | None = None,
) -> dict[str, dict]:
"""
Aggregates and filters pipeline features to create a dataset-ready features dictionary.
This function transforms initial features using the pipeline, categorizes them as action or observations
(image or state), filters them based on `use_videos` and `patterns`, and finally
formats them for use with a Hugging Face LeRobot Dataset.
Args:
pipeline: The DataProcessorPipeline to apply.
initial_features: A dictionary of raw feature specs for actions and observations.
use_videos: If False, image features are excluded.
patterns: A sequence of regex patterns to filter action and state features.
Image features are not affected by this filter.
Returns:
A dictionary of features formatted for a Hugging Face LeRobot Dataset.
"""
all_features = pipeline.transform_features(initial_features)
# Intermediate storage for categorized and filtered features.
processed_features: dict[str, dict[str, Any]] = {
ACTION: {},
OBS_STR: {},
}
images_token = OBS_IMAGES.split(".")[-1]
# Iterate through all features transformed by the pipeline.
for ptype, feats in all_features.items():
if ptype not in [PipelineFeatureType.ACTION, PipelineFeatureType.OBSERVATION]:
continue
for key, value in feats.items():
# 1. Categorize the feature.
is_action = ptype == PipelineFeatureType.ACTION
# Observations are classified as images if their key matches image-related tokens or if the shape of the feature is 3.
# All other observations are treated as state.
is_image = not is_action and (
(isinstance(value, tuple) and len(value) == 3)
or (
key.startswith(f"{OBS_IMAGES}.")
or key.startswith(f"{images_token}.")
or f".{images_token}." in key
)
)
# 2. Apply filtering rules.
if is_image and not use_videos:
continue
if not is_image and not should_keep(key, patterns):
continue
# 3. Add the feature to the appropriate group with a clean name.
name = strip_prefix(key, PREFIXES_TO_STRIP)
if is_action:
processed_features[ACTION][name] = value
else:
processed_features[OBS_STR][name] = value
# Convert the processed features into the final dataset format.
dataset_features = {}
if processed_features[ACTION]:
dataset_features.update(hw_to_dataset_features(processed_features[ACTION], ACTION, use_videos))
if processed_features[OBS_STR]:
dataset_features.update(hw_to_dataset_features(processed_features[OBS_STR], OBS_STR, use_videos))
return dataset_features

View File

@@ -13,25 +13,106 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import glob
import importlib
import logging
import queue
import shutil
import tempfile
import threading
import warnings
from dataclasses import dataclass, field
from fractions import Fraction
from pathlib import Path
from threading import Lock
from typing import Any, ClassVar
import av
import fsspec
import numpy as np
import pyarrow as pa
import torch
import torchvision
from datasets.features.features import register_feature
from PIL import Image
# List of hardware encoders to probe for auto-selection. Availability depends on the platform and FFmpeg build.
# Determines the order of preference for auto-selection when vcodec="auto" is used.
HW_ENCODERS = [
"h264_videotoolbox", # macOS
"hevc_videotoolbox", # macOS
"h264_nvenc", # NVIDIA GPU
"hevc_nvenc", # NVIDIA GPU
"h264_vaapi", # Linux Intel/AMD
"h264_qsv", # Intel Quick Sync
]
VALID_VIDEO_CODECS = {"h264", "hevc", "libsvtav1", "auto"} | set(HW_ENCODERS)
def _get_codec_options(
vcodec: str,
g: int | None = 2,
crf: int | None = 30,
preset: int | None = None,
) -> dict:
"""Build codec-specific options dict for video encoding."""
options = {}
# GOP size (keyframe interval) - supported by VideoToolbox and software encoders
if g is not None and (vcodec in ("h264_videotoolbox", "hevc_videotoolbox") or vcodec not in HW_ENCODERS):
options["g"] = str(g)
# Quality control (codec-specific parameter names)
if crf is not None:
if vcodec in ("h264", "hevc", "libsvtav1"):
options["crf"] = str(crf)
elif vcodec in ("h264_videotoolbox", "hevc_videotoolbox"):
quality = max(1, min(100, int(100 - crf * 2)))
options["q:v"] = str(quality)
elif vcodec in ("h264_nvenc", "hevc_nvenc"):
options["rc"] = "constqp"
options["qp"] = str(crf)
elif vcodec in ("h264_vaapi",):
options["qp"] = str(crf)
elif vcodec in ("h264_qsv",):
options["global_quality"] = str(crf)
# Preset (only for libsvtav1)
if vcodec == "libsvtav1":
options["preset"] = str(preset) if preset is not None else "12"
return options
def detect_available_hw_encoders() -> list[str]:
"""Probe PyAV/FFmpeg for available hardware video encoders."""
available = []
for codec_name in HW_ENCODERS:
try:
av.codec.Codec(codec_name, "w")
available.append(codec_name)
except Exception: # nosec B110
pass # nosec B110
return available
def resolve_vcodec(vcodec: str) -> str:
"""Validate vcodec and resolve 'auto' to best available HW encoder, fallback to libsvtav1."""
if vcodec not in VALID_VIDEO_CODECS:
raise ValueError(f"Invalid vcodec '{vcodec}'. Must be one of: {sorted(VALID_VIDEO_CODECS)}")
if vcodec != "auto":
logging.info(f"Using video codec: {vcodec}")
return vcodec
available = detect_available_hw_encoders()
for encoder in HW_ENCODERS:
if encoder in available:
logging.info(f"Auto-selected video codec: {encoder}")
return encoder
logging.info("No hardware encoder available, falling back to software encoder 'libsvtav1'")
return "libsvtav1"
def get_safe_default_codec():
if importlib.util.find_spec("torchcodec"):
@@ -146,16 +227,17 @@ def decode_video_frames_torchvision(
min_, argmin_ = dist.min(1)
is_within_tol = min_ < tolerance_s
assert is_within_tol.all(), (
f"One or several query timestamps unexpectedly violate the tolerance ({min_[~is_within_tol]} > {tolerance_s=})."
"It means that the closest frame that can be loaded from the video is too far away in time."
"This might be due to synchronization issues with timestamps during data collection."
"To be safe, we advise to ignore this item during training."
f"\nqueried timestamps: {query_ts}"
f"\nloaded timestamps: {loaded_ts}"
f"\nvideo: {video_path}"
f"\nbackend: {backend}"
)
if not is_within_tol.all():
raise FrameTimestampError(
f"One or several query timestamps unexpectedly violate the tolerance ({min_[~is_within_tol]} > {tolerance_s=})."
" It means that the closest frame that can be loaded from the video is too far away in time."
" This might be due to synchronization issues with timestamps during data collection."
" To be safe, we advise to ignore this item during training."
f"\nqueried timestamps: {query_ts}"
f"\nloaded timestamps: {loaded_ts}"
f"\nvideo: {video_path}"
f"\nbackend: {backend}"
)
# get closest frames to the query timestamps
closest_frames = torch.stack([loaded_frames[idx] for idx in argmin_])
@@ -167,7 +249,11 @@ def decode_video_frames_torchvision(
# convert to the pytorch format which is float32 in [0,1] range (and channel first)
closest_frames = closest_frames.type(torch.float32) / 255
assert len(timestamps) == len(closest_frames)
if len(timestamps) != len(closest_frames):
raise FrameTimestampError(
f"Number of retrieved frames ({len(closest_frames)}) does not match "
f"number of queried timestamps ({len(timestamps)})"
)
return closest_frames
@@ -272,15 +358,16 @@ def decode_video_frames_torchcodec(
min_, argmin_ = dist.min(1)
is_within_tol = min_ < tolerance_s
assert is_within_tol.all(), (
f"One or several query timestamps unexpectedly violate the tolerance ({min_[~is_within_tol]} > {tolerance_s=})."
"It means that the closest frame that can be loaded from the video is too far away in time."
"This might be due to synchronization issues with timestamps during data collection."
"To be safe, we advise to ignore this item during training."
f"\nqueried timestamps: {query_ts}"
f"\nloaded timestamps: {loaded_ts}"
f"\nvideo: {video_path}"
)
if not is_within_tol.all():
raise FrameTimestampError(
f"One or several query timestamps unexpectedly violate the tolerance ({min_[~is_within_tol]} > {tolerance_s=})."
" It means that the closest frame that can be loaded from the video is too far away in time."
" This might be due to synchronization issues with timestamps during data collection."
" To be safe, we advise to ignore this item during training."
f"\nqueried timestamps: {query_ts}"
f"\nloaded timestamps: {loaded_ts}"
f"\nvideo: {video_path}"
)
# get closest frames to the query timestamps
closest_frames = torch.stack([loaded_frames[idx] for idx in argmin_])
@@ -309,14 +396,13 @@ def encode_video_frames(
g: int | None = 2,
crf: int | None = 30,
fast_decode: int = 0,
log_level: int | None = av.logging.ERROR,
log_level: int | None = av.logging.WARNING,
overwrite: bool = False,
preset: int | None = None,
encoder_threads: int | None = None,
) -> None:
"""More info on ffmpeg arguments tuning on `benchmark/video/README.md`"""
# Check encoder availability
if vcodec not in ["h264", "hevc", "libsvtav1"]:
raise ValueError(f"Unsupported video codec: {vcodec}. Supported codecs are: h264, hevc, libsvtav1.")
vcodec = resolve_vcodec(vcodec)
video_path = Path(video_path)
imgs_dir = Path(imgs_dir)
@@ -347,21 +433,22 @@ def encode_video_frames(
width, height = dummy_image.size
# Define video codec options
video_options = {}
if g is not None:
video_options["g"] = str(g)
if crf is not None:
video_options["crf"] = str(crf)
video_options = _get_codec_options(vcodec, g, crf, preset)
if fast_decode:
key = "svtav1-params" if vcodec == "libsvtav1" else "tune"
value = f"fast-decode={fast_decode}" if vcodec == "libsvtav1" else "fastdecode"
video_options[key] = value
if vcodec == "libsvtav1":
video_options["preset"] = str(preset) if preset is not None else "12"
if encoder_threads is not None:
if vcodec == "libsvtav1":
lp_param = f"lp={encoder_threads}"
if "svtav1-params" in video_options:
video_options["svtav1-params"] += f":{lp_param}"
else:
video_options["svtav1-params"] = lp_param
else:
video_options["threads"] = str(encoder_threads)
# Set logging level
if log_level is not None:
@@ -480,6 +567,348 @@ def concatenate_video_files(
Path(tmp_concatenate_path).unlink()
class _CameraEncoderThread(threading.Thread):
"""A thread that encodes video frames streamed via a queue into an MP4 file.
One instance is created per camera per episode. Frames are received as numpy arrays
from the main thread, encoded in real-time using PyAV (which releases the GIL during
encoding), and written to disk. Stats are computed incrementally using
RunningQuantileStats and returned via result_queue.
"""
def __init__(
self,
video_path: Path,
fps: int,
vcodec: str,
pix_fmt: str,
g: int | None,
crf: int | None,
preset: int | None,
frame_queue: queue.Queue,
result_queue: queue.Queue,
stop_event: threading.Event,
encoder_threads: int | None = None,
):
super().__init__(daemon=True)
self.video_path = video_path
self.fps = fps
self.vcodec = vcodec
self.pix_fmt = pix_fmt
self.g = g
self.crf = crf
self.preset = preset
self.frame_queue = frame_queue
self.result_queue = result_queue
self.stop_event = stop_event
self.encoder_threads = encoder_threads
def run(self) -> None:
from lerobot.datasets.compute_stats import RunningQuantileStats, auto_downsample_height_width
container = None
output_stream = None
stats_tracker = RunningQuantileStats()
frame_count = 0
try:
logging.getLogger("libav").setLevel(av.logging.WARNING)
while True:
try:
frame_data = self.frame_queue.get(timeout=1)
except queue.Empty:
if self.stop_event.is_set():
break
continue
if frame_data is None:
# Sentinel: flush and close
break
# Ensure HWC uint8 numpy array
if isinstance(frame_data, np.ndarray):
if frame_data.ndim == 3 and frame_data.shape[0] == 3:
# CHW -> HWC
frame_data = frame_data.transpose(1, 2, 0)
if frame_data.dtype != np.uint8:
frame_data = (frame_data * 255).astype(np.uint8)
# Open container on first frame (to get width/height)
if container is None:
height, width = frame_data.shape[:2]
video_options = _get_codec_options(self.vcodec, self.g, self.crf, self.preset)
if self.encoder_threads is not None:
if self.vcodec == "libsvtav1":
lp_param = f"lp={self.encoder_threads}"
if "svtav1-params" in video_options:
video_options["svtav1-params"] += f":{lp_param}"
else:
video_options["svtav1-params"] = lp_param
else:
video_options["threads"] = str(self.encoder_threads)
Path(self.video_path).parent.mkdir(parents=True, exist_ok=True)
container = av.open(str(self.video_path), "w")
output_stream = container.add_stream(self.vcodec, self.fps, options=video_options)
output_stream.pix_fmt = self.pix_fmt
output_stream.width = width
output_stream.height = height
output_stream.time_base = Fraction(1, self.fps)
# Encode frame with explicit timestamps
pil_img = Image.fromarray(frame_data)
video_frame = av.VideoFrame.from_image(pil_img)
video_frame.pts = frame_count
video_frame.time_base = Fraction(1, self.fps)
packet = output_stream.encode(video_frame)
if packet:
container.mux(packet)
# Update stats with downsampled frame (per-channel stats like compute_episode_stats)
img_chw = frame_data.transpose(2, 0, 1) # HWC -> CHW
img_downsampled = auto_downsample_height_width(img_chw)
# Reshape CHW to (H*W, C) for per-channel stats
channels = img_downsampled.shape[0]
img_for_stats = img_downsampled.transpose(1, 2, 0).reshape(-1, channels)
stats_tracker.update(img_for_stats)
frame_count += 1
# Flush encoder
if output_stream is not None:
packet = output_stream.encode()
if packet:
container.mux(packet)
if container is not None:
container.close()
av.logging.restore_default_callback()
# Get stats and put on result queue
if frame_count >= 2:
stats = stats_tracker.get_statistics()
self.result_queue.put(("ok", stats))
else:
self.result_queue.put(("ok", None))
except Exception as e:
logging.error(f"Encoder thread error: {e}")
if container is not None:
with contextlib.suppress(Exception):
container.close()
self.result_queue.put(("error", str(e)))
class StreamingVideoEncoder:
"""Manages per-camera encoder threads for real-time video encoding during recording.
Instead of writing frames as PNG images and then encoding to MP4 at episode end,
this class streams frames directly to encoder threads, eliminating the
PNG round-trip and making save_episode() near-instant.
Uses threading instead of multiprocessing to avoid the overhead of pickling large
numpy arrays through multiprocessing.Queue. PyAV's encode() releases the GIL,
so encoding runs in parallel with the main recording loop.
"""
def __init__(
self,
fps: int,
vcodec: str = "libsvtav1",
pix_fmt: str = "yuv420p",
g: int | None = 2,
crf: int | None = 30,
preset: int | None = None,
queue_maxsize: int = 30,
encoder_threads: int | None = None,
):
self.fps = fps
self.vcodec = resolve_vcodec(vcodec)
self.pix_fmt = pix_fmt
self.g = g
self.crf = crf
self.preset = preset
self.queue_maxsize = queue_maxsize
self.encoder_threads = encoder_threads
self._frame_queues: dict[str, queue.Queue] = {}
self._result_queues: dict[str, queue.Queue] = {}
self._threads: dict[str, _CameraEncoderThread] = {}
self._stop_events: dict[str, threading.Event] = {}
self._video_paths: dict[str, Path] = {}
self._dropped_frames: dict[str, int] = {}
self._episode_active = False
def start_episode(self, video_keys: list[str], temp_dir: Path) -> None:
"""Start encoder threads for a new episode.
Args:
video_keys: List of video feature keys (e.g. ["observation.images.laptop"])
temp_dir: Base directory for temporary MP4 files
"""
if self._episode_active:
self.cancel_episode()
self._dropped_frames.clear()
for video_key in video_keys:
frame_queue: queue.Queue = queue.Queue(maxsize=self.queue_maxsize)
result_queue: queue.Queue = queue.Queue(maxsize=1)
stop_event = threading.Event()
temp_video_dir = Path(tempfile.mkdtemp(dir=temp_dir))
video_path = temp_video_dir / f"{video_key.replace('/', '_')}_streaming.mp4"
encoder_thread = _CameraEncoderThread(
video_path=video_path,
fps=self.fps,
vcodec=self.vcodec,
pix_fmt=self.pix_fmt,
g=self.g,
crf=self.crf,
preset=self.preset,
frame_queue=frame_queue,
result_queue=result_queue,
stop_event=stop_event,
encoder_threads=self.encoder_threads,
)
encoder_thread.start()
self._frame_queues[video_key] = frame_queue
self._result_queues[video_key] = result_queue
self._threads[video_key] = encoder_thread
self._stop_events[video_key] = stop_event
self._video_paths[video_key] = video_path
self._episode_active = True
def feed_frame(self, video_key: str, image: np.ndarray) -> None:
"""Feed a frame to the encoder for a specific camera.
A copy of the image is made before enqueueing to prevent race conditions
with camera drivers that may reuse buffers. If the encoder queue is full
(encoder can't keep up), the frame is dropped with a warning instead of
crashing the recording session.
Args:
video_key: The video feature key
image: numpy array in (H,W,C) or (C,H,W) format, uint8 or float
Raises:
RuntimeError: If the encoder thread has crashed
"""
if not self._episode_active:
raise RuntimeError("No active episode. Call start_episode() first.")
thread = self._threads[video_key]
if not thread.is_alive():
# Check for error
try:
status, msg = self._result_queues[video_key].get_nowait()
if status == "error":
raise RuntimeError(f"Encoder thread for {video_key} crashed: {msg}")
except queue.Empty:
pass
raise RuntimeError(f"Encoder thread for {video_key} is not alive")
try:
self._frame_queues[video_key].put(image.copy(), timeout=0.1)
except queue.Full:
self._dropped_frames[video_key] = self._dropped_frames.get(video_key, 0) + 1
count = self._dropped_frames[video_key]
# Log periodically to avoid spam (1st, then every 10th)
if count == 1 or count % 10 == 0:
logging.warning(
f"Encoder queue full for {video_key}, dropped {count} frame(s). "
f"Consider using vcodec='auto' for hardware encoding or increasing encoder_queue_maxsize."
)
def finish_episode(self) -> dict[str, tuple[Path, dict | None]]:
"""Finish encoding the current episode.
Sends sentinel values, waits for encoder threads to complete,
and collects results.
Returns:
Dict mapping video_key to (mp4_path, stats_dict_or_None)
"""
if not self._episode_active:
raise RuntimeError("No active episode to finish.")
results = {}
# Report dropped frames
for video_key, count in self._dropped_frames.items():
if count > 0:
logging.warning(f"Episode finished with {count} dropped frame(s) for {video_key}.")
# Send sentinel to all queues
for video_key in self._frame_queues:
self._frame_queues[video_key].put(None)
# Wait for all threads and collect results
for video_key in self._threads:
self._threads[video_key].join(timeout=120)
if self._threads[video_key].is_alive():
logging.error(f"Encoder thread for {video_key} did not finish in time")
self._stop_events[video_key].set()
self._threads[video_key].join(timeout=5)
results[video_key] = (self._video_paths[video_key], None)
continue
try:
status, data = self._result_queues[video_key].get(timeout=5)
if status == "error":
raise RuntimeError(f"Encoder thread for {video_key} failed: {data}")
results[video_key] = (self._video_paths[video_key], data)
except queue.Empty:
logging.error(f"No result from encoder thread for {video_key}")
results[video_key] = (self._video_paths[video_key], None)
self._cleanup()
self._episode_active = False
return results
def cancel_episode(self) -> None:
"""Cancel the current episode, stopping encoder threads and cleaning up."""
if not self._episode_active:
return
# Signal all threads to stop
for video_key in self._stop_events:
self._stop_events[video_key].set()
# Wait for threads to finish
for video_key in self._threads:
self._threads[video_key].join(timeout=5)
# Clean up temp MP4 files
video_path = self._video_paths.get(video_key)
if video_path is not None and video_path.exists():
shutil.rmtree(str(video_path.parent), ignore_errors=True)
self._cleanup()
self._episode_active = False
def close(self) -> None:
"""Close the encoder, canceling any in-progress episode."""
if self._episode_active:
self.cancel_episode()
def _cleanup(self) -> None:
"""Clean up queues and thread tracking dicts."""
for q in self._frame_queues.values():
with contextlib.suppress(Exception):
while not q.empty():
q.get_nowait()
self._frame_queues.clear()
self._result_queues.clear()
self._threads.clear()
self._stop_events.clear()
self._video_paths.clear()
@dataclass
class VideoFrame:
# TODO(rcadene, lhoestq): move to Hugging Face `datasets` repo
@@ -514,7 +943,7 @@ with warnings.catch_warnings():
def get_audio_info(video_path: Path | str) -> dict:
# Set logging level
logging.getLogger("libav").setLevel(av.logging.ERROR)
logging.getLogger("libav").setLevel(av.logging.WARNING)
# Getting audio stream information
audio_info = {}
@@ -546,7 +975,7 @@ def get_audio_info(video_path: Path | str) -> dict:
def get_video_info(video_path: Path | str) -> dict:
# Set logging level
logging.getLogger("libav").setLevel(av.logging.ERROR)
logging.getLogger("libav").setLevel(av.logging.WARNING)
# Getting video stream information
video_info = {}
@@ -632,8 +1061,15 @@ class VideoEncodingManager:
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# Handle any remaining episodes that haven't been batch encoded
if self.dataset.episodes_since_last_encoding > 0:
streaming_encoder = getattr(self.dataset, "_streaming_encoder", None)
if streaming_encoder is not None:
# Handle streaming encoder cleanup
if exc_type is not None:
streaming_encoder.cancel_episode()
streaming_encoder.close()
elif self.dataset.episodes_since_last_encoding > 0:
# Handle any remaining episodes that haven't been batch encoded
if exc_type is not None:
logging.info("Exception occurred. Encoding remaining episodes before exit...")
else:
@@ -650,8 +1086,8 @@ class VideoEncodingManager:
# Finalize the dataset to properly close all writers
self.dataset.finalize()
# Clean up episode images if recording was interrupted
if exc_type is not None:
# Clean up episode images if recording was interrupted (only for non-streaming mode)
if exc_type is not None and streaming_encoder is None:
interrupted_episode_index = self.dataset.num_episodes
for key in self.dataset.meta.video_keys:
img_dir = self.dataset._get_image_file_path(
@@ -665,14 +1101,12 @@ class VideoEncodingManager:
# Clean up any remaining images directory if it's empty
img_dir = self.dataset.root / "images"
# Check for any remaining PNG files
png_files = list(img_dir.rglob("*.png"))
if len(png_files) == 0:
# Only remove the images directory if no PNG files remain
if img_dir.exists():
if img_dir.exists():
png_files = list(img_dir.rglob("*.png"))
if len(png_files) == 0:
shutil.rmtree(img_dir)
logging.debug("Cleaned up empty images directory")
else:
logging.debug(f"Images directory is not empty, containing {len(png_files)} PNG files")
else:
logging.debug(f"Images directory is not empty, containing {len(png_files)} PNG files")
return False # Don't suppress the original exception

View File

@@ -0,0 +1,18 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .robstride import RobstrideMotorsBus
from .tables import *

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,120 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Configuration tables for Damiao motors."""
from enum import IntEnum
# Motor type definitions
class MotorType(IntEnum):
O0 = 0
O1 = 1
O2 = 2
O3 = 3
O4 = 4
O5 = 5
ELO5 = 6
O6 = 7
class CommMode(IntEnum):
PrivateProtocole = 0
CANopen = 1
MIT = 2
# Control modes
class ControlMode(IntEnum):
MIT = 0
POS_VEL = 1
VEL = 2
# Motor limit parameters [PMAX, VMAX, TMAX]
# PMAX: Maximum position (rad)
# VMAX: Maximum velocity (rad/s)
# TMAX: Maximum torque (N·m)
MOTOR_LIMIT_PARAMS: dict[MotorType, tuple[float, float, float]] = {
MotorType.O0: (12.57, 33, 14),
MotorType.O1: (12.57, 44, 17),
MotorType.O2: (12.57, 33, 20),
MotorType.O3: (12.57, 33, 60),
MotorType.O4: (12.57, 33, 120),
MotorType.O5: (12.57, 50, 5.5),
MotorType.ELO5: (12.57, 50, 6),
MotorType.O6: (112.5, 50, 36),
}
# Motor model names
MODEL_NAMES = {
MotorType.O0: "O0",
MotorType.O1: "O1",
MotorType.O2: "O2",
MotorType.O3: "O3",
MotorType.O4: "O4",
MotorType.O5: "O5",
MotorType.ELO5: "ELO5",
MotorType.O6: "O6",
}
# Motor resolution table (encoder counts per revolution)
MODEL_RESOLUTION = {
"O0": 65536,
"O1": 65536,
"O2": 65536,
"O3": 65536,
"O4": 65536,
"O5": 65536,
"ELO5": 65536,
"O6": 65536,
}
# CAN baudrates supported by Robstride motors
AVAILABLE_BAUDRATES = [
1000000, # 4: 1 mbps (default)
]
DEFAULT_BAUDRATE = 1000000
# Default timeout in milliseconds
DEFAULT_TIMEOUT_MS = 0 # disabled by default, otherwise 20000 is 1s
# Data that should be normalized
NORMALIZED_DATA = ["Present_Position", "Goal_Position"]
# MIT control parameter ranges
MIT_KP_RANGE = (0.0, 500.0)
MIT_KD_RANGE = (0.0, 5.0)
# CAN frame command IDs
CAN_CMD_ENABLE = 0xFC
CAN_CMD_DISABLE = 0xFD
CAN_CMD_SET_ZERO = 0xFE
CAN_CMD_CLEAR_FAULT = 0xFB
CAN_CMD_QUERY_PARAM = 0x33
CAN_CMD_WRITE_PARAM = 0x55
CAN_CMD_SAVE_PARAM = 0xAA
# CAN ID for parameter operations
CAN_PARAM_ID = 0x7FF
RUNNING_TIMEOUT = 0.001
PARAM_TIMEOUT = 0.01
STATE_CACHE_TTL_S = 0.02

View File

@@ -55,10 +55,16 @@ class DiffusionConfig(PreTrainedConfig):
normalization_mapping: A dictionary that maps from a str value of FeatureType (e.g., "STATE", "VISUAL") to
a corresponding NormalizationMode (e.g., NormalizationMode.MIN_MAX)
vision_backbone: Name of the torchvision resnet backbone to use for encoding images.
crop_shape: (H, W) shape to crop images to as a preprocessing step for the vision backbone. Must fit
within the image size. If None, no cropping is done.
crop_is_random: Whether the crop should be random at training time (it's always a center crop in eval
mode).
resize_shape: (H, W) shape to resize images to as a preprocessing step for the vision
backbone. If None, no resizing is done and the original image resolution is used.
crop_ratio: Ratio in (0, 1] used to derive the crop size from resize_shape
(crop_h = int(resize_shape[0] * crop_ratio), likewise for width).
Set to 1.0 to disable cropping. Only takes effect when resize_shape is not None.
crop_shape: (H, W) shape to crop images to. When resize_shape is set and crop_ratio < 1.0,
this is computed automatically. Can also be set directly for legacy configs that use
crop-only (without resize). If None and no derivation applies, no cropping is done.
crop_is_random: Whether the crop should be random at training time (it's always a center
crop in eval mode).
pretrained_backbone_weights: Pretrained weights from torchvision to initialize the backbone.
`None` means no pretrained weights.
use_group_norm: Whether to replace batch normalization with group normalization in the backbone.
@@ -114,7 +120,9 @@ class DiffusionConfig(PreTrainedConfig):
# Architecture / modeling.
# Vision backbone.
vision_backbone: str = "resnet18"
crop_shape: tuple[int, int] | None = (84, 84)
resize_shape: tuple[int, int] | None = None
crop_ratio: float = 1.0
crop_shape: tuple[int, int] | None = None
crop_is_random: bool = True
pretrained_backbone_weights: str | None = None
use_group_norm: bool = True
@@ -139,6 +147,10 @@ class DiffusionConfig(PreTrainedConfig):
# Inference
num_inference_steps: int | None = None
# Optimization
compile_model: bool = False
compile_mode: str = "reduce-overhead"
# Loss computation
do_mask_loss_for_padding: bool = False
@@ -171,6 +183,25 @@ class DiffusionConfig(PreTrainedConfig):
f"Got {self.noise_scheduler_type}."
)
if self.resize_shape is not None and (
len(self.resize_shape) != 2 or any(d <= 0 for d in self.resize_shape)
):
raise ValueError(f"`resize_shape` must be a pair of positive integers. Got {self.resize_shape}.")
if not (0 < self.crop_ratio <= 1.0):
raise ValueError(f"`crop_ratio` must be in (0, 1]. Got {self.crop_ratio}.")
if self.resize_shape is not None:
if self.crop_ratio < 1.0:
self.crop_shape = (
int(self.resize_shape[0] * self.crop_ratio),
int(self.resize_shape[1] * self.crop_ratio),
)
else:
# Explicitly disable cropping for resize+ratio path when crop_ratio == 1.0.
self.crop_shape = None
if self.crop_shape is not None and (self.crop_shape[0] <= 0 or self.crop_shape[1] <= 0):
raise ValueError(f"`crop_shape` must have positive dimensions. Got {self.crop_shape}.")
# Check that the horizon size and U-Net downsampling is compatible.
# U-Net downsamples by 2 with each stage.
downsampling_factor = 2 ** len(self.down_dims)
@@ -198,13 +229,12 @@ class DiffusionConfig(PreTrainedConfig):
if len(self.image_features) == 0 and self.env_state_feature is None:
raise ValueError("You must provide at least one image or the environment state among the inputs.")
if self.crop_shape is not None:
if self.resize_shape is None and self.crop_shape is not None:
for key, image_ft in self.image_features.items():
if self.crop_shape[0] > image_ft.shape[1] or self.crop_shape[1] > image_ft.shape[2]:
raise ValueError(
f"`crop_shape` should fit within the images shapes. Got {self.crop_shape} "
f"for `crop_shape` and {image_ft.shape} for "
f"`{key}`."
f"`crop_shape` should fit within the image shapes. Got {self.crop_shape} "
f"for `crop_shape` and {image_ft.shape} for `{key}`."
)
# Check that all input images have the same shape.

View File

@@ -142,6 +142,9 @@ class DiffusionPolicy(PreTrainedPolicy):
"""Run the batch through the model and compute the loss for training or validation."""
if self.config.image_features:
batch = dict(batch) # shallow copy so that adding a key doesn't modify the original
for key in self.config.image_features:
if self.config.n_obs_steps == 1 and batch[key].ndim == 4:
batch[key] = batch[key].unsqueeze(1)
batch[OBS_IMAGES] = torch.stack([batch[key] for key in self.config.image_features], dim=-4)
loss = self.diffusion.compute_loss(batch)
# no output_dict so returning None
@@ -182,6 +185,11 @@ class DiffusionModel(nn.Module):
self.unet = DiffusionConditionalUnet1d(config, global_cond_dim=global_cond_dim * config.n_obs_steps)
if config.compile_model:
# Compile the U-Net. "reduce-overhead" is preferred for the small-batch repetitive loops
# common in diffusion inference.
self.unet = torch.compile(self.unet, mode=config.compile_mode)
self.noise_scheduler = _make_noise_scheduler(
config.noise_scheduler_type,
num_train_timesteps=config.num_train_timesteps,
@@ -446,12 +454,18 @@ class DiffusionRgbEncoder(nn.Module):
def __init__(self, config: DiffusionConfig):
super().__init__()
# Set up optional preprocessing.
if config.crop_shape is not None:
if config.resize_shape is not None:
self.resize = torchvision.transforms.Resize(config.resize_shape)
else:
self.resize = None
crop_shape = config.crop_shape
if crop_shape is not None:
self.do_crop = True
# Always use center crop for eval
self.center_crop = torchvision.transforms.CenterCrop(config.crop_shape)
self.center_crop = torchvision.transforms.CenterCrop(crop_shape)
if config.crop_is_random:
self.maybe_random_crop = torchvision.transforms.RandomCrop(config.crop_shape)
self.maybe_random_crop = torchvision.transforms.RandomCrop(crop_shape)
else:
self.maybe_random_crop = self.center_crop
else:
@@ -477,13 +491,16 @@ class DiffusionRgbEncoder(nn.Module):
# Set up pooling and final layers.
# Use a dry run to get the feature map shape.
# The dummy input should take the number of image channels from `config.image_features` and it should
# use the height and width from `config.crop_shape` if it is provided, otherwise it should use the
# height and width from `config.image_features`.
# The dummy shape mirrors the runtime preprocessing order: resize -> crop.
# Note: we have a check in the config class to make sure all images have the same shape.
images_shape = next(iter(config.image_features.values())).shape
dummy_shape_h_w = config.crop_shape if config.crop_shape is not None else images_shape[1:]
if config.crop_shape is not None:
dummy_shape_h_w = config.crop_shape
elif config.resize_shape is not None:
dummy_shape_h_w = config.resize_shape
else:
dummy_shape_h_w = images_shape[1:]
dummy_shape = (1, images_shape[0], *dummy_shape_h_w)
feature_map_shape = get_output_shape(self.backbone, dummy_shape)[1:]
@@ -499,7 +516,10 @@ class DiffusionRgbEncoder(nn.Module):
Returns:
(B, D) image feature.
"""
# Preprocess: maybe crop (if it was set up in the __init__).
# Preprocess: resize if configured, then crop if configured.
if self.resize is not None:
x = self.resize(x)
if self.do_crop:
if self.training: # noqa: SIM108
x = self.maybe_random_crop(x)

View File

@@ -277,9 +277,7 @@ class SARMEncodingProcessorStep(ProcessorStep):
# When language is perturbed, targets are zero so perturbed samples don't contribute to progress loss
if self.dataset_meta is not None:
episodes_df = None
if self.sparse_subtask_names != ["task"]:
episodes_df = self.dataset_meta.episodes.to_pandas()
episodes_df = self.dataset_meta.episodes.to_pandas()
# Generate sparse targets
if self.sparse_temporal_proportions is not None:

View File

@@ -85,7 +85,7 @@ class SmolVLAConfig(PreTrainedConfig):
scheduler_decay_lr: float = 2.5e-6
vlm_model_name: str = "HuggingFaceTB/SmolVLM2-500M-Video-Instruct" # Select the VLM backbone.
load_vlm_weights: bool = False # Set to True in case of training the expert from scratch. True when init from pretrained SmolVLA weights
load_vlm_weights: bool = False # Set to False in case of training the expert from scratch. True when init from pretrained SmolVLA weights
add_image_special_tokens: bool = False # Whether to use special image tokens around image features.
@@ -106,6 +106,9 @@ class SmolVLAConfig(PreTrainedConfig):
# Real-Time Chunking (RTC) configuration
rtc_config: RTCConfig | None = None
compile_model: bool = False # Whether to use torch.compile for model optimization
compile_mode: str = "max-autotune" # Torch compile mode
def __post_init__(self):
super().__post_init__()

View File

@@ -593,6 +593,12 @@ class VLAFlowMatching(nn.Module):
self.prefix_length = self.config.prefix_length
self.rtc_processor = rtc_processor
# Compile model if requested
if config.compile_model:
torch.set_float32_matmul_precision("high")
self.sample_actions = torch.compile(self.sample_actions, mode=config.compile_mode)
self.forward = torch.compile(self.forward, mode=config.compile_mode)
def _rtc_enabled(self):
return self.config.rtc_config is not None and self.config.rtc_config.enabled

View File

@@ -77,7 +77,6 @@ class SmolVLMWithExpertModel(nn.Module):
print(f"Loading {model_id} weights ...")
self.vlm = AutoModelForImageTextToText.from_pretrained(
model_id,
device_map=device,
torch_dtype="bfloat16",
low_cpu_mem_usage=True,
)

View File

@@ -30,12 +30,6 @@ from .core import (
)
from .delta_action_processor import MapDeltaActionToRobotActionStep, MapTensorToDeltaActionDictStep
from .device_processor import DeviceProcessorStep
from .factory import (
make_default_processors,
make_default_robot_action_processor,
make_default_robot_observation_processor,
make_default_teleop_action_processor,
)
from .gym_action_processor import (
Numpy2TorchActionProcessorStep,
Torch2NumpyActionProcessorStep,
@@ -95,11 +89,7 @@ __all__ = [
"ImageCropResizeProcessorStep",
"InfoProcessorStep",
"InterventionActionProcessorStep",
"make_default_processors",
"make_default_teleop_action_processor",
"make_default_robot_action_processor",
"make_default_robot_observation_processor",
"MapDeltaActionToRobotActionStep",
"MapDeltaActionToRobotActionStep",
"MapTensorToDeltaActionDictStep",
"NormalizerProcessorStep",
"Numpy2TorchActionProcessorStep",

View File

@@ -17,6 +17,7 @@
from .converters import (
observation_to_transition,
robot_action_observation_to_transition,
robot_action_to_transition,
transition_to_observation,
transition_to_robot_action,
)
@@ -24,39 +25,44 @@ from .core import RobotAction, RobotObservation
from .pipeline import IdentityProcessorStep, RobotProcessorPipeline
def make_default_teleop_action_processor() -> RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
]:
teleop_action_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[IdentityProcessorStep()],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
return teleop_action_processor
# ── Internal identity pipeline helpers (used by Robot/Teleoperator base classes) ──────────────────
def make_default_robot_action_processor() -> RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
]:
robot_action_processor = RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[IdentityProcessorStep()],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
return robot_action_processor
def make_default_robot_observation_processor() -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
robot_observation_processor = RobotProcessorPipeline[RobotObservation, RobotObservation](
def _make_identity_observation_pipeline() -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
"""Identity pipeline for robot observations (get_observation output pipeline)."""
return RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
def _make_identity_robot_action_pipeline() -> RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
]:
"""Identity pipeline for robot action input (send_action input pipeline, takes (action, obs) tuple)."""
return RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[IdentityProcessorStep()],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)
def _make_identity_teleop_action_pipeline() -> RobotProcessorPipeline[RobotAction, RobotAction]:
"""Identity pipeline for teleop action output (get_action output pipeline, takes just action)."""
return RobotProcessorPipeline[RobotAction, RobotAction](
steps=[IdentityProcessorStep()],
to_transition=robot_action_to_transition,
to_output=transition_to_robot_action,
)
def _make_identity_feedback_pipeline() -> RobotProcessorPipeline[dict, dict]:
"""Identity pipeline for teleop feedback input (send_feedback input pipeline)."""
return RobotProcessorPipeline[dict, dict](
steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
return robot_observation_processor
def make_default_processors():
teleop_action_processor = make_default_teleop_action_processor()
robot_action_processor = make_default_robot_action_processor()
robot_observation_processor = make_default_robot_observation_processor()
return (teleop_action_processor, robot_action_processor, robot_observation_processor)

View File

@@ -19,15 +19,17 @@ from __future__ import annotations
from copy import deepcopy
from dataclasses import dataclass, field
from typing import Any
from typing import TYPE_CHECKING, Any
import torch
from torch import Tensor
from lerobot.configs.types import FeatureType, NormalizationMode, PipelineFeatureType, PolicyFeature
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.utils.constants import ACTION
if TYPE_CHECKING:
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from .converters import from_tensor_to_numpy, to_tensor
from .core import EnvTransition, PolicyAction, TransitionKey
from .pipeline import PolicyProcessorPipeline, ProcessorStep, ProcessorStepRegistry, RobotObservation

View File

@@ -43,12 +43,9 @@ from lerobot.utils.import_utils import _transformers_available
from .core import EnvTransition, RobotObservation, TransitionKey
from .pipeline import ActionProcessorStep, ObservationProcessorStep, ProcessorStepRegistry
# Conditional import for type checking and lazy loading
if TYPE_CHECKING or _transformers_available:
# Type-checking only import — do NOT import transformers at module level (it loads TF which blocks)
if TYPE_CHECKING:
from transformers import AutoProcessor, AutoTokenizer
else:
AutoProcessor = None
AutoTokenizer = None
@dataclass
@@ -106,8 +103,7 @@ class TokenizerProcessorStep(ObservationProcessorStep):
# Use provided tokenizer object directly
self.input_tokenizer = self.tokenizer
elif self.tokenizer_name is not None:
if AutoTokenizer is None:
raise ImportError("AutoTokenizer is not available")
from transformers import AutoTokenizer # lazy import to avoid TF deadlock at module load
self.input_tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_name)
else:
raise ValueError(
@@ -370,12 +366,12 @@ class ActionTokenizerProcessorStep(ActionProcessorStep):
"Please install it with `pip install 'lerobot[transformers-dep]'` to use ActionTokenizerProcessorStep."
)
from transformers import AutoProcessor, AutoTokenizer # lazy import to avoid TF deadlock at module load
if self.action_tokenizer_input_object is not None:
self.action_tokenizer = self.action_tokenizer_input_object
elif self.action_tokenizer_name is not None:
if AutoProcessor is None:
raise ImportError("AutoProcessor is not available")
self.action_tokenizer = AutoProcessor.from_pretrained(
self.action_tokenizer_name, trust_remote_code=self.trust_remote_code
)

View File

@@ -102,11 +102,11 @@ class BiOpenArmFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -136,7 +136,7 @@ class BiOpenArmFollower(Robot):
)
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
obs_dict = {}
# Add "left_" prefix
@@ -150,7 +150,7 @@ class BiOpenArmFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(
def _send_action(
self,
action: RobotAction,
custom_kp: dict[str, float] | None = None,

View File

@@ -86,11 +86,11 @@ class BiSOFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -119,7 +119,7 @@ class BiSOFollower(Robot):
self.right_arm.setup_motors()
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
obs_dict = {}
# Add "left_" prefix
@@ -133,7 +133,7 @@ class BiSOFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
# Remove "left_" prefix
left_action = {
key.removeprefix("left_"): value for key, value in action.items() if key.startswith("left_")

View File

@@ -147,7 +147,7 @@ class EarthRoverMiniPlus(Robot):
pass
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
"""Define the observation space for dataset recording.
Returns:
@@ -184,7 +184,7 @@ class EarthRoverMiniPlus(Robot):
}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
"""Define the action space.
Returns:
@@ -198,7 +198,7 @@ class EarthRoverMiniPlus(Robot):
}
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
"""Get current robot observation from SDK.
Returns:
@@ -255,7 +255,7 @@ class EarthRoverMiniPlus(Robot):
return observation
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Send action to robot via SDK.
Args:

View File

@@ -71,11 +71,11 @@ class HopeJrArm(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -128,7 +128,7 @@ class HopeJrArm(Robot):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position", self.other_motors)
@@ -147,7 +147,7 @@ class HopeJrArm(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
# Cap goal position when too far away from present position.

View File

@@ -107,11 +107,11 @@ class HopeJrHand(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -158,7 +158,7 @@ class HopeJrHand(Robot):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
obs_dict = {}
# Read hand position
@@ -178,7 +178,7 @@ class HopeJrHand(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
goal_pos = {key.removesuffix(".pos"): val for key, val in action.items() if key.endswith(".pos")}
self.bus.sync_write("Goal_Position", goal_pos)
return action

View File

@@ -73,11 +73,11 @@ class KochFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -182,7 +182,7 @@ class KochFollower(Robot):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position")
@@ -200,7 +200,7 @@ class KochFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter

View File

@@ -98,11 +98,11 @@ class LeKiwi(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._state_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._state_ft
@property
@@ -338,7 +338,7 @@ class LeKiwi(Robot):
} # m/s and deg/s
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
# Read actuators position for arm and vel for base
start = time.perf_counter()
arm_pos = self.bus.sync_read("Present_Position", self.arm_motors)
@@ -367,7 +367,7 @@ class LeKiwi(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Command lekiwi to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter

View File

@@ -98,11 +98,11 @@ class LeKiwiClient(Robot):
return {name: (cfg.height, cfg.width, 3) for name, cfg in self.config.cameras.items()}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._state_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._state_ft
@property
@@ -250,7 +250,7 @@ class LeKiwiClient(Robot):
return new_frames, new_state
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
"""
Capture observations from the remote robot: current follower arm positions,
present wheel speeds (converted to body-frame velocities: x, y, theta),
@@ -304,7 +304,7 @@ class LeKiwiClient(Robot):
pass
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Command lekiwi to move to a target joint configuration. Translates to motor space + sends over ZMQ
Args:

View File

@@ -73,11 +73,11 @@ class OmxFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -165,7 +165,7 @@ class OmxFollower(Robot):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position")
@@ -183,7 +183,7 @@ class OmxFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter

View File

@@ -105,12 +105,12 @@ class OpenArmFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
"""Combined observation features from motors and cameras."""
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
"""Action features."""
return self._motors_ft
@@ -219,7 +219,7 @@ class OpenArmFollower(Robot):
)
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
"""
Get current observation from robot including position, velocity, and torque.
@@ -251,7 +251,7 @@ class OpenArmFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(
def _send_action(
self,
action: RobotAction,
custom_kp: dict[str, float] | None = None,

View File

@@ -95,11 +95,11 @@ class Reachy2Robot(Robot):
self.joints_dict: dict[str, str] = self._generate_joints_dict()
@property
def observation_features(self) -> dict[str, Any]:
def raw_observation_features(self) -> dict[str, Any]:
return {**self.motors_features, **self.camera_features}
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self.motors_features
@property
@@ -170,7 +170,7 @@ class Reachy2Robot(Robot):
else:
return {}
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
obs_dict: RobotObservation = {}
# Read Reachy 2 state
@@ -184,7 +184,7 @@ class Reachy2Robot(Robot):
return obs_dict
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
if self.reachy is not None:
if not self.is_connected:
raise ConnectionError()

View File

@@ -18,8 +18,11 @@ from pathlib import Path
import draccus
from lerobot.configs.types import PipelineFeatureType
from lerobot.motors import MotorCalibration
from lerobot.processor import RobotAction, RobotObservation
from lerobot.processor.core import RobotAction, RobotObservation
from lerobot.processor.factory import _make_identity_observation_pipeline, _make_identity_robot_action_pipeline
from lerobot.processor.pipeline import RobotProcessorPipeline
from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, ROBOTS
from .config import RobotConfig
@@ -34,6 +37,10 @@ class Robot(abc.ABC):
This class provides a standardized interface for interacting with physical robots.
Subclasses must implement all abstract methods and properties to be usable.
Pipelines are first-class citizens: every robot carries an optional output pipeline
(applied in get_observation()) and an optional input pipeline (applied in send_action()).
Both default to identity (no-op), so existing robots work without any changes.
Attributes:
config_class (RobotConfig): The expected configuration class for this robot.
name (str): The unique robot name used to identify this robot type.
@@ -55,6 +62,12 @@ class Robot(abc.ABC):
if self.calibration_fpath.is_file():
self._load_calibration()
# Pipeline interface — default to identity (no-op), swap via set_output/input_pipeline()
self._output_pipeline: RobotProcessorPipeline = _make_identity_observation_pipeline()
self._input_pipeline: RobotProcessorPipeline = _make_identity_robot_action_pipeline()
# Cache of most recent raw observation; used by input_pipeline for IK initial guess
self._last_raw_obs: RobotObservation = {}
def __str__(self) -> str:
return f"{self.id} {self.__class__.__name__}"
@@ -84,40 +97,117 @@ class Robot(abc.ABC):
except Exception: # nosec B110
pass
# TODO(aliberts): create a proper Feature class for this that links with datasets
# ── Pipeline interface ────────────────────────────────────────────────────
def output_pipeline(self) -> RobotProcessorPipeline:
"""
Pipeline applied inside get_observation() to transform raw hardware observations.
Default: identity (no-op). Override via set_output_pipeline() or subclassing.
Example: set a forward-kinematics pipeline to convert joint positions to EE pose.
"""
return self._output_pipeline
def input_pipeline(self) -> RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]:
"""
Pipeline applied inside send_action() to transform incoming actions before hardware write.
Default: identity (no-op). Override via set_input_pipeline() or subclassing.
The pipeline receives a (action, last_raw_obs) tuple so IK solvers can use the
current joint configuration as an initial guess.
Example: set an inverse-kinematics pipeline to convert EE commands to joint positions.
"""
return self._input_pipeline
def set_output_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
"""Set the observation output pipeline (applied in get_observation())."""
self._output_pipeline = pipeline
def set_input_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
"""Set the action input pipeline (applied in send_action())."""
self._input_pipeline = pipeline
# ── Feature properties ────────────────────────────────────────────────────
@property
@abc.abstractmethod
def observation_features(self) -> dict:
"""
A dictionary describing the structure and types of the observations produced by the robot.
Its structure (keys) should match the structure of what is returned by :pymeth:`get_observation`.
Values for the dict should either be:
- The type of the value if it's a simple value, e.g. `float` for single proprioceptive value (a joint's position/velocity)
- A tuple representing the shape if it's an array-type value, e.g. `(height, width, channel)` for images
Pipeline-transformed observation features.
Note: this property should be able to be called regardless of whether the robot is connected or not.
Applies output_pipeline().transform_features() to raw_observation_features so the
returned dict matches what get_observation() actually returns to callers.
Use raw_observation_features to inspect hardware-level feature shapes.
Note: this property should be able to be called regardless of whether the robot
is connected or not.
"""
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
initial = create_initial_features(observation=self.raw_observation_features)
transformed = self.output_pipeline().transform_features(initial)
return transformed.get(PipelineFeatureType.OBSERVATION, {})
@property
@abc.abstractmethod
def raw_observation_features(self) -> dict:
"""
Hardware-level observation features (before any pipeline transformation).
A dictionary describing the structure and types of the observations produced
directly by the robot hardware. Its structure (keys) should match the structure
of what is returned by :pymeth:`_get_observation`. Values should be:
- The type if it's a simple value, e.g. ``float`` for joint position
- A tuple representing the shape for array values, e.g. ``(H, W, C)`` for images
Note: this property should be able to be called regardless of whether the robot
is connected or not.
"""
pass
@property
@abc.abstractmethod
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
"""
A dictionary describing the structure and types of the actions expected by the robot. Its structure
(keys) should match the structure of what is passed to :pymeth:`send_action`. Values for the dict
should be the type of the value if it's a simple value, e.g. `float` for single proprioceptive value
(a joint's goal position/velocity)
Hardware-level action features (before any pipeline transformation).
Note: this property should be able to be called regardless of whether the robot is connected or not.
A dictionary describing the structure and types of the actions accepted directly
by the robot hardware (i.e. what :pymeth:`_send_action` receives). Its structure
(keys) should match the structure of what is expected by :pymeth:`_send_action`.
Values should be the type of the value if it's a simple value, e.g. ``float`` for
single proprioceptive value (a joint's goal position/velocity).
Note: this property should be able to be called regardless of whether the robot
is connected or not.
"""
pass
@property
def action_features(self) -> dict:
"""
Pipeline-transformed action features.
Applies input_pipeline().transform_features() to raw_action_features so the
returned dict reflects what the input pipeline outputs to hardware.
Use raw_action_features to inspect hardware-level action feature shapes.
Note: this property should be able to be called regardless of whether the robot
is connected or not.
"""
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
initial = create_initial_features(action=self.raw_action_features)
transformed = self.input_pipeline().transform_features(initial)
return transformed.get(PipelineFeatureType.ACTION, {})
@property
@abc.abstractmethod
def is_connected(self) -> bool:
"""
Whether the robot is currently connected or not. If `False`, calling :pymeth:`get_observation` or
:pymeth:`send_action` should raise an error.
Whether the robot is currently connected or not. If ``False``, calling
:pymeth:`get_observation` or :pymeth:`send_action` should raise an error.
"""
pass
@@ -135,7 +225,7 @@ class Robot(abc.ABC):
@property
@abc.abstractmethod
def is_calibrated(self) -> bool:
"""Whether the robot is currently calibrated or not. Should be always `True` if not applicable"""
"""Whether the robot is currently calibrated or not. Should be always ``True`` if not applicable"""
pass
@abc.abstractmethod
@@ -153,7 +243,7 @@ class Robot(abc.ABC):
Helper to load calibration data from the specified file.
Args:
fpath (Path | None): Optional path to the calibration file. Defaults to `self.calibration_fpath`.
fpath (Path | None): Optional path to the calibration file. Defaults to ``self.calibration_fpath``.
"""
fpath = self.calibration_fpath if fpath is None else fpath
with open(fpath) as f, draccus.config_type("json"):
@@ -164,7 +254,7 @@ class Robot(abc.ABC):
Helper to save calibration data to the specified file.
Args:
fpath (Path | None): Optional path to save the calibration file. Defaults to `self.calibration_fpath`.
fpath (Path | None): Optional path to save the calibration file. Defaults to ``self.calibration_fpath``.
"""
fpath = self.calibration_fpath if fpath is None else fpath
with open(fpath, "w") as f, draccus.config_type("json"):
@@ -178,30 +268,64 @@ class Robot(abc.ABC):
"""
pass
@abc.abstractmethod
# ── Template methods (concrete, call pipeline internally) ─────────────────
def get_observation(self) -> RobotObservation:
"""
Retrieve the current observation from the robot.
Retrieve the current observation from the robot and apply the output pipeline.
Calls :pymeth:`_get_observation` to get raw hardware data, caches it for use as
IK initial guess in :pymeth:`send_action`, then applies :pymeth:`output_pipeline`.
Returns:
RobotObservation: A flat dictionary representing the robot's current sensory state. Its structure
should match :pymeth:`observation_features`.
RobotObservation: Pipeline-transformed observation. With the default identity
pipeline this equals the raw observation from :pymeth:`_get_observation`.
"""
pass
raw = self._get_observation()
self._last_raw_obs = raw
return self.output_pipeline()(raw)
@abc.abstractmethod
def send_action(self, action: RobotAction) -> RobotAction:
def _get_observation(self) -> RobotObservation:
"""
Send an action command to the robot.
Args:
action (RobotAction): Dictionary representing the desired action. Its structure should match
:pymeth:`action_features`.
Retrieve the raw observation directly from robot hardware.
Returns:
RobotAction: The action actually sent to the motors potentially clipped or modified, e.g. by
safety limits on velocity.
RobotObservation: A flat dictionary representing the robot's current sensory
state. Its structure should match :pymeth:`raw_observation_features`.
"""
pass
def send_action(self, action: RobotAction) -> RobotAction:
"""
Apply the input pipeline and send the resulting action to robot hardware.
The input pipeline receives ``(action, last_raw_obs)`` so IK solvers can use the
cached joint configuration as an initial guess. With the default identity pipeline,
the action is forwarded unchanged.
Args:
action (RobotAction): Dictionary representing the desired action. Its structure
should match :pymeth:`action_features`.
Returns:
RobotAction: The action actually sent to the motors, potentially clipped or
modified by the pipeline or hardware safety limits.
"""
transformed = self.input_pipeline()((action, self._last_raw_obs))
return self._send_action(transformed)
@abc.abstractmethod
def _send_action(self, action: RobotAction) -> RobotAction:
"""
Send an action command directly to robot hardware.
Args:
action (RobotAction): Dictionary of motor-level commands. Its structure should
match what the hardware expects (typically motor positions/velocities).
Returns:
RobotAction: The action actually sent, potentially clipped by safety limits.
"""
pass

View File

@@ -0,0 +1,19 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .ee_space import make_so10x_fk_observation_pipeline, make_so10x_ik_action_pipeline
__all__ = ["make_so10x_fk_observation_pipeline", "make_so10x_ik_action_pipeline"]

View File

@@ -0,0 +1,147 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
End-effector space pipelines for SO-100/101 follower robots.
These factory functions return ready-to-use pipelines that convert between joint space
and Cartesian end-effector space. Attach them to a robot with ``set_output_pipeline`` /
``set_input_pipeline`` to enable EE-space recording and teleoperation.
Example::
from lerobot.robots.so_follower.pipelines import (
make_so10x_fk_observation_pipeline,
make_so10x_ik_action_pipeline,
)
motor_names = list(follower.bus.motors.keys())
follower.set_output_pipeline(make_so10x_fk_observation_pipeline(URDF_PATH, motor_names))
follower.set_input_pipeline(make_so10x_ik_action_pipeline(URDF_PATH, motor_names))
"""
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotObservation, RobotProcessorPipeline
from lerobot.processor.converters import (
observation_to_transition,
robot_action_observation_to_transition,
transition_to_observation,
transition_to_robot_action,
)
from lerobot.robots.so_follower.robot_kinematic_processor import (
EEBoundsAndSafety,
ForwardKinematicsJointsToEE,
InverseKinematicsEEToJoints,
)
_DEFAULT_EE_BOUNDS = {"min": [-1.0, -1.0, -1.0], "max": [1.0, 1.0, 1.0]}
_DEFAULT_GRIPPER_FRAME = "gripper_frame_link"
def make_so10x_fk_observation_pipeline(
urdf_path: str,
motor_names: list[str],
*,
target_frame_name: str = _DEFAULT_GRIPPER_FRAME,
) -> RobotProcessorPipeline[RobotObservation, RobotObservation]:
"""
Create a forward-kinematics observation pipeline for SO-100/101 follower robots.
Converts raw joint positions (observation) into end-effector pose (position + orientation).
Attach this to a follower robot via ``set_output_pipeline`` so that ``get_observation()``
returns EE coordinates instead of raw joint angles.
Args:
urdf_path: Path to the SO-100/101 URDF file used for kinematics.
motor_names: Ordered list of motor names matching the URDF joint names.
target_frame_name: Name of the end-effector frame in the URDF.
Returns:
A RobotProcessorPipeline that maps joint observations to EE observations.
Example::
follower.set_output_pipeline(
make_so10x_fk_observation_pipeline("./so101.urdf", motor_names)
)
obs = follower.get_observation() # now contains ee.x, ee.y, ee.z, ...
"""
kinematics = RobotKinematics(
urdf_path=urdf_path,
target_frame_name=target_frame_name,
joint_names=motor_names,
)
return RobotProcessorPipeline[RobotObservation, RobotObservation](
steps=[ForwardKinematicsJointsToEE(kinematics=kinematics, motor_names=motor_names)],
to_transition=observation_to_transition,
to_output=transition_to_observation,
)
def make_so10x_ik_action_pipeline(
urdf_path: str,
motor_names: list[str],
*,
target_frame_name: str = _DEFAULT_GRIPPER_FRAME,
end_effector_bounds: dict | None = None,
max_ee_step_m: float = 0.10,
) -> RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction]:
"""
Create an inverse-kinematics action pipeline for SO-100/101 follower robots.
Converts incoming end-effector pose commands into joint positions, applying safety
bounds and step-size limits before solving IK. The current joint positions are used
as the IK initial guess (taken from the cached ``_last_raw_obs``).
Attach this to a follower robot via ``set_input_pipeline`` so that ``send_action()``
receives EE commands and translates them to motor positions before the hardware write.
Args:
urdf_path: Path to the SO-100/101 URDF file used for kinematics.
motor_names: Ordered list of motor names matching the URDF joint names.
target_frame_name: Name of the end-effector frame in the URDF.
end_effector_bounds: Dict with ``"min"`` and ``"max"`` lists (3D position bounds in metres).
Defaults to ``{"min": [-1, -1, -1], "max": [1, 1, 1]}``.
max_ee_step_m: Maximum allowed EE position change per step in metres.
Returns:
A RobotProcessorPipeline that maps (EE action, raw obs) to joint action.
Example::
follower.set_input_pipeline(
make_so10x_ik_action_pipeline("./so101.urdf", motor_names)
)
# send_action() now accepts ee.x, ee.y, ee.z, ee.wx, ee.wy, ee.wz, ee.gripper_vel
"""
kinematics = RobotKinematics(
urdf_path=urdf_path,
target_frame_name=target_frame_name,
joint_names=motor_names,
)
bounds = end_effector_bounds or _DEFAULT_EE_BOUNDS
return RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction](
steps=[
EEBoundsAndSafety(end_effector_bounds=bounds, max_ee_step_m=max_ee_step_m),
InverseKinematicsEEToJoints(
kinematics=kinematics,
motor_names=motor_names,
initial_guess_current_joints=True,
),
],
to_transition=robot_action_observation_to_transition,
to_output=transition_to_robot_action,
)

View File

@@ -74,11 +74,11 @@ class SOFollower(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return self._motors_ft
@property
@@ -176,7 +176,7 @@ class SOFollower(Robot):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
# Read arm position
start = time.perf_counter()
obs_dict = self.bus.sync_read("Present_Position")
@@ -194,7 +194,7 @@ class SOFollower(Robot):
return obs_dict
@check_if_not_connected
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
"""Command arm to move to a target joint configuration.
The relative action magnitude may be clipped depending on the configuration parameter

View File

@@ -170,7 +170,7 @@ class UnitreeG1(Robot):
time.sleep(sleep_time)
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{G1_29_JointIndex(motor).name}.q": float for motor in G1_29_JointIndex}
def calibrate(self) -> None: # robot is already calibrated
@@ -273,7 +273,7 @@ class UnitreeG1(Robot):
for cam in self._cameras.values():
cam.disconnect()
def get_observation(self) -> RobotObservation:
def _get_observation(self) -> RobotObservation:
lowstate = self._lowstate
if lowstate is None:
return {}
@@ -351,10 +351,10 @@ class UnitreeG1(Robot):
}
@cached_property
def observation_features(self) -> dict[str, type | tuple]:
def raw_observation_features(self) -> dict[str, type | tuple]:
return {**self._motors_ft, **self._cameras_ft}
def send_action(self, action: RobotAction) -> RobotAction:
def _send_action(self, action: RobotAction) -> RobotAction:
for motor in G1_29_JointIndex:
key = f"{motor.name}.q"
if key in action:
@@ -421,7 +421,7 @@ class UnitreeG1(Robot):
num_steps = int(total_time / control_dt)
# get current state
obs = self.get_observation()
obs = self._get_observation()
# record current positions
init_dof_pos = np.zeros(29, dtype=np.float32)
@@ -439,7 +439,7 @@ class UnitreeG1(Robot):
interp_pos = init_dof_pos[motor.value] * (1 - alpha) + target_pos * alpha
action_dict[f"{motor.name}.q"] = float(interp_pos)
self.send_action(action_dict)
self._send_action(action_dict)
# Maintain constant control rate
elapsed = time.time() - start_time

View File

@@ -56,6 +56,7 @@ from lerobot.teleoperators import ( # noqa: F401
make_teleoperator_from_config,
omx_leader,
openarm_leader,
openarm_mini,
so_leader,
unitree_g1,
)

View File

@@ -61,6 +61,7 @@ from lerobot.teleoperators import ( # noqa: F401
make_teleoperator_from_config,
omx_leader,
openarm_leader,
openarm_mini,
so_leader,
)
from lerobot.utils.robot_utils import precise_sleep

View File

@@ -26,8 +26,10 @@ lerobot-record \
--dataset.repo_id=<my_username>/<my_dataset_name> \
--dataset.num_episodes=2 \
--dataset.single_task="Grab the cube" \
--dataset.streaming_encoding=true \
--dataset.encoder_threads=2 \
--display_data=true
# <- Optional: specify video codec (h264, hevc, libsvtav1). Default is libsvtav1. \
# <- Optional: specify video codec (auto, h264, hevc, libsvtav1). Default is libsvtav1. \
# --dataset.vcodec=h264 \
# <- Teleop optional if you want to teleoperate to record or in between episodes with a policy \
# --teleop.type=so100_leader \
@@ -58,7 +60,10 @@ lerobot-record \
--display_data=true \
--dataset.repo_id=${HF_USER}/bimanual-so-handover-cube \
--dataset.num_episodes=25 \
--dataset.single_task="Grab and handover the red cube to the other arm"
--dataset.single_task="Grab and handover the red cube to the other arm" \
--dataset.streaming_encoding=true \
# --dataset.vcodec=auto \
--dataset.encoder_threads=2
```
"""
@@ -69,6 +74,8 @@ from pathlib import Path
from pprint import pformat
from typing import Any
import torch
from lerobot.cameras import ( # noqa: F401
CameraConfig, # noqa: F401
)
@@ -80,19 +87,16 @@ from lerobot.configs import parser
from lerobot.configs.policies import PreTrainedConfig
from lerobot.datasets.image_writer import safe_stop_image_writer
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features, create_initial_features
from lerobot.datasets.utils import build_dataset_frame, combine_feature_dicts
from lerobot.datasets.utils import build_dataset_frame
from lerobot.datasets.video_utils import VideoEncodingManager
from lerobot.policies.factory import make_policy, make_pre_post_processors
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.policies.rtc import ActionInterpolator
from lerobot.policies.utils import make_robot_action
from lerobot.processor import (
PolicyAction,
PolicyProcessorPipeline,
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_processors,
)
from lerobot.processor.rename_processor import rename_stats
from lerobot.robots import ( # noqa: F401
@@ -120,6 +124,7 @@ from lerobot.teleoperators import ( # noqa: F401
make_teleoperator_from_config,
omx_leader,
openarm_leader,
openarm_mini,
reachy2_teleoperator,
so_leader,
unitree_g1,
@@ -134,6 +139,11 @@ from lerobot.utils.control_utils import (
sanity_check_dataset_robot_compatibility,
)
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.pipeline_utils import (
build_dataset_features,
check_action_space_compatibility,
check_observation_space_compatibility,
)
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.utils import (
get_safe_torch_device,
@@ -179,9 +189,19 @@ class DatasetRecordConfig:
# Number of episodes to record before batch encoding videos
# Set to 1 for immediate encoding (default behavior), or higher for batched encoding
video_encoding_batch_size: int = 1
# Video codec for encoding videos. Options: 'h264', 'hevc', 'libsvtav1'.
# Use 'h264' for faster encoding on systems where AV1 encoding is CPU-heavy.
# Video codec for encoding videos. Options: 'h264', 'hevc', 'libsvtav1', 'auto',
# or hardware-specific: 'h264_videotoolbox', 'h264_nvenc', 'h264_vaapi', 'h264_qsv'.
# Use 'auto' to auto-detect the best available hardware encoder.
vcodec: str = "libsvtav1"
# Enable streaming video encoding: encode frames in real-time during capture instead
# of writing PNG images first. Makes save_episode() near-instant. More info in the documentation: https://huggingface.co/docs/lerobot/streaming_video_encoding
streaming_encoding: bool = False
# Maximum number of frames to buffer per camera when using streaming encoding.
# ~1s buffer at 30fps. Provides backpressure if the encoder can't keep up.
encoder_queue_maxsize: int = 30
# Number of threads per encoder instance. None = auto (codec default).
# Lower values reduce CPU usage, maps to 'lp' (via svtav1-params) for libsvtav1 and 'threads' for h264/hevc..
encoder_threads: int | None = None
# Rename map for the observation to override the image and state keys
rename_map: dict[str, str] = field(default_factory=dict)
@@ -210,6 +230,9 @@ class RecordConfig:
play_sounds: bool = True
# Resume recording on an existing dataset.
resume: bool = False
# Action interpolation multiplier for smoother policy control (1=off, 2=2x, 3=3x)
# Only applies when using a policy (not teleop)
interpolation_multiplier: int = 1
def __post_init__(self):
# HACK: We parse again the cli args here to get the pretrained path if there was one.
@@ -233,28 +256,23 @@ class RecordConfig:
""" --------------- record_loop() data flow --------------------------
[ Robot ]
V
[ robot.get_observation() ] ---> raw_obs
V
[ robot_observation_processor ] ---> processed_obs
[ robot.get_observation() ] → applies output_pipeline internally → obs
V
.-----( ACTION LOGIC )------------------.
V V
[ From Teleoperator ] [ From Policy ]
| |
| [teleop.get_action] -> raw_action | [predict_action]
| | | |
| V | V
| [teleop_action_processor] | |
| | | |
'---> processed_teleop_action '---> processed_policy_action
| teleop.get_action() | predict_action(obs)
| (output_pipeline applied internally) | |
| | | V
'----> action '---> policy_action_dict
| |
'-------------------------.-------------'
V
[ robot_action_processor ] --> robot_action_to_send
[ robot.send_action(action) ]
(input_pipeline applied internally)
V
[ robot.send_action() ] -- (Robot Executes)
V
( Save to Dataset )
( Save action + obs to Dataset )
V
( Rerun Log / Loop Wait )
"""
@@ -265,15 +283,6 @@ def record_loop(
robot: Robot,
events: dict,
fps: int,
teleop_action_processor: RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
], # runs after teleop
robot_action_processor: RobotProcessorPipeline[
tuple[RobotAction, RobotObservation], RobotAction
], # runs before robot
robot_observation_processor: RobotProcessorPipeline[
RobotObservation, RobotObservation
], # runs after robot
dataset: LeRobotDataset | None = None,
teleop: Teleoperator | list[Teleoperator] | None = None,
policy: PreTrainedPolicy | None = None,
@@ -282,8 +291,30 @@ def record_loop(
control_time_s: int | None = None,
single_task: str | None = None,
display_data: bool = False,
interpolator: ActionInterpolator | None = None,
display_compressed_images: bool = False,
):
"""
Core recording loop. Robot and teleoperator pipelines are applied internally —
no explicit processor arguments are needed.
Args:
robot: The robot instance. Its output_pipeline() transforms observations and
its input_pipeline() transforms actions before hardware write.
events: Control events dict (exit_early, stop_recording, rerecord_episode).
fps: Target control loop frequency.
dataset: If provided, frames are written here each step.
teleop: Teleoperator or list of teleoperators. Its output_pipeline() transforms
actions (e.g., joint → EE) before they are sent to the robot.
policy: Optional pre-trained policy for closed-loop control.
preprocessor: Policy input pre-processor.
postprocessor: Policy output post-processor.
control_time_s: Episode duration in seconds.
single_task: Task description string saved with each frame.
display_data: If True, log observations and actions to Rerun.
interpolator: Optional action interpolator for smoother policy control.
display_compressed_images: If True, compress images before Rerun display.
"""
if dataset is not None and dataset.fps != fps:
raise ValueError(f"The dataset fps should be equal to requested fps ({dataset.fps} != {fps}).")
@@ -318,6 +349,17 @@ def record_loop(
preprocessor.reset()
postprocessor.reset()
# Reset interpolator if provided
if interpolator is not None:
interpolator.reset()
# Calculate control interval based on interpolation
use_interpolation = interpolator is not None and interpolator.enabled and policy is not None
control_interval = interpolator.get_control_interval(fps) if interpolator else 1 / fps
# Pre-compute once — action features don't change during a recording episode
action_keys = sorted(robot.action_features) if use_interpolation else []
no_action_count = 0
timestamp = 0
start_episode_t = time.perf_counter()
while timestamp < control_time_s:
@@ -327,65 +369,85 @@ def record_loop(
events["exit_early"] = False
break
# Get robot observation
# Get robot observation (output_pipeline applied internally)
obs = robot.get_observation()
# Applies a pipeline to the raw robot observation, default is IdentityProcessor
obs_processed = robot_observation_processor(obs)
if policy is not None or dataset is not None:
observation_frame = build_dataset_frame(dataset.features, obs_processed, prefix=OBS_STR)
observation_frame = build_dataset_frame(dataset.features, obs, prefix=OBS_STR)
# Get action from either policy or teleop
if policy is not None and preprocessor is not None and postprocessor is not None:
action_values = predict_action(
observation=observation_frame,
policy=policy,
device=get_safe_torch_device(policy.config.device),
preprocessor=preprocessor,
postprocessor=postprocessor,
use_amp=policy.config.use_amp,
task=single_task,
robot_type=robot.robot_type,
)
# With interpolation: only call policy when interpolator needs new action
if use_interpolation:
if interpolator.needs_new_action():
action_values = predict_action(
observation=observation_frame,
policy=policy,
device=get_safe_torch_device(policy.config.device),
preprocessor=preprocessor,
postprocessor=postprocessor,
use_amp=policy.config.use_amp,
task=single_task,
robot_type=robot.robot_type,
)
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
# send_action applies input_pipeline (e.g. IK) internally;
# capture the actually-sent joint action for interpolation
sent_joint_action = robot.send_action(act_processed_policy)
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
# Build interpolation tensor from the motor-level joint action
action_tensor = torch.tensor([sent_joint_action[k] for k in action_keys])
interpolator.add(action_tensor)
# Get interpolated action (in joint/motor space)
interp_action = interpolator.get()
if interp_action is not None:
action_values = {k: interp_action[i].item() for i, k in enumerate(action_keys)}
# Interpolated values are already in joint space; bypass IK pipeline
robot._send_action(action_values)
else:
# No action available yet, skip this iteration
continue
else:
action_values = predict_action(
observation=observation_frame,
policy=policy,
device=get_safe_torch_device(policy.config.device),
preprocessor=preprocessor,
postprocessor=postprocessor,
use_amp=policy.config.use_amp,
task=single_task,
robot_type=robot.robot_type,
)
act_processed_policy: RobotAction = make_robot_action(action_values, dataset.features)
# send_action applies input_pipeline (e.g. IK) internally
robot.send_action(act_processed_policy)
action_values = act_processed_policy
elif policy is None and isinstance(teleop, Teleoperator):
act = teleop.get_action()
# Applies a pipeline to the raw teleop action, default is IdentityProcessor
act_processed_teleop = teleop_action_processor((act, obs))
# get_action applies output_pipeline (e.g. FK) internally
action_values = teleop.get_action()
# send_action applies input_pipeline (e.g. IK) internally
robot.send_action(action_values)
elif policy is None and isinstance(teleop, list):
arm_action = teleop_arm.get_action()
# LeKiwi multi-teleop path
arm_action = teleop_arm.get_action() # output_pipeline applied internally
arm_action = {f"arm_{k}": v for k, v in arm_action.items()}
keyboard_action = teleop_keyboard.get_action()
base_action = robot._from_keyboard_to_base_action(keyboard_action)
act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
act_processed_teleop = teleop_action_processor((act, obs))
action_values = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
robot.send_action(action_values) # input_pipeline applied internally
else:
logging.info(
"No policy or teleoperator provided, skipping action generation."
"This is likely to happen when resetting the environment without a teleop device."
"The robot won't be at its rest position at the start of the next episode."
)
no_action_count += 1
if no_action_count == 1 or no_action_count % 10 == 0:
logging.warning(
"No policy or teleoperator provided, skipping action generation. "
"This is likely to happen when resetting the environment without a teleop device. "
"The robot won't be at its rest position at the start of the next episode."
)
continue
# Applies a pipeline to the action, default is IdentityProcessor
if policy is not None and act_processed_policy is not None:
action_values = act_processed_policy
robot_action_to_send = robot_action_processor((act_processed_policy, obs))
else:
action_values = act_processed_teleop
robot_action_to_send = robot_action_processor((act_processed_teleop, obs))
# Send action to robot
# Action can eventually be clipped using `max_relative_target`,
# so action actually sent is saved in the dataset. action = postprocessor.process(action)
# TODO(steven, pepijn, adil): we should use a pipeline step to clip the action, so the sent action is the action that we input to the robot.
_sent_action = robot.send_action(robot_action_to_send)
# Write to dataset
if dataset is not None:
action_frame = build_dataset_frame(dataset.features, action_values, prefix=ACTION)
@@ -394,12 +456,12 @@ def record_loop(
if display_data:
log_rerun_data(
observation=obs_processed, action=action_values, compress_images=display_compressed_images
observation=obs, action=action_values, compress_images=display_compressed_images
)
dt_s = time.perf_counter() - start_loop_t
sleep_time_s: float = 1 / fps - dt_s
sleep_time_s: float = control_interval - dt_s
if sleep_time_s < 0:
logging.warning(
f"Record loop is running slower ({1 / dt_s:.1f} Hz) than the target FPS ({fps} Hz). Dataset frames might be dropped and robot control might be unstable. Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long 3) CPU starvation"
@@ -425,22 +487,9 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
robot = make_robot_from_config(cfg.robot)
teleop = make_teleoperator_from_config(cfg.teleop) if cfg.teleop is not None else None
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
dataset_features = combine_feature_dicts(
aggregate_pipeline_dataset_features(
pipeline=teleop_action_processor,
initial_features=create_initial_features(
action=robot.action_features
), # TODO(steven, pepijn): in future this should be come from teleop or policy
use_videos=cfg.dataset.video,
),
aggregate_pipeline_dataset_features(
pipeline=robot_observation_processor,
initial_features=create_initial_features(observation=robot.observation_features),
use_videos=cfg.dataset.video,
),
)
# Dataset features derived automatically from robot/teleop pipelines.
# When teleop is None (policy-only recording), only observation features are included.
dataset_features = build_dataset_features(robot, teleop, use_videos=cfg.dataset.video)
dataset = None
listener = None
@@ -452,6 +501,9 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
root=cfg.dataset.root,
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
vcodec=cfg.dataset.vcodec,
streaming_encoding=cfg.dataset.streaming_encoding,
encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize,
encoder_threads=cfg.dataset.encoder_threads,
)
if hasattr(robot, "cameras") and len(robot.cameras) > 0:
@@ -474,12 +526,16 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
image_writer_threads=cfg.dataset.num_image_writer_threads_per_camera * len(robot.cameras),
batch_encoding_size=cfg.dataset.video_encoding_batch_size,
vcodec=cfg.dataset.vcodec,
streaming_encoding=cfg.dataset.streaming_encoding,
encoder_queue_maxsize=cfg.dataset.encoder_queue_maxsize,
encoder_threads=cfg.dataset.encoder_threads,
)
# Load pretrained policy
policy = None if cfg.policy is None else make_policy(cfg.policy, ds_meta=dataset.meta)
preprocessor = None
postprocessor = None
interpolator = None
if cfg.policy is not None:
preprocessor, postprocessor = make_pre_post_processors(
policy_cfg=cfg.policy,
@@ -490,13 +546,26 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
"rename_observations_processor": {"rename_map": cfg.dataset.rename_map},
},
)
# Create interpolator for smoother policy control
if cfg.interpolation_multiplier > 1:
interpolator = ActionInterpolator(multiplier=cfg.interpolation_multiplier)
logging.info(f"Action interpolation enabled: {cfg.interpolation_multiplier}x control rate")
robot.connect()
if teleop is not None:
teleop.connect()
if teleop is not None:
check_action_space_compatibility(teleop, robot)
check_observation_space_compatibility(robot, teleop)
listener, events = init_keyboard_listener()
if not cfg.dataset.streaming_encoding:
logging.info(
"Streaming encoding is disabled. If you have capable hardware, consider enabling it for way faster episode saving. --dataset.streaming_encoding=true --dataset.encoder_threads=2 # --dataset.vcodec=auto. More info in the documentation: https://huggingface.co/docs/lerobot/streaming_video_encoding"
)
with VideoEncodingManager(dataset):
recorded_episodes = 0
while recorded_episodes < cfg.dataset.num_episodes and not events["stop_recording"]:
@@ -505,9 +574,6 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
robot=robot,
events=events,
fps=cfg.dataset.fps,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
teleop=teleop,
policy=policy,
preprocessor=preprocessor,
@@ -516,6 +582,7 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
control_time_s=cfg.dataset.episode_time_s,
single_task=cfg.dataset.single_task,
display_data=cfg.display_data,
interpolator=interpolator,
display_compressed_images=display_compressed_images,
)
@@ -534,9 +601,6 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
robot=robot,
events=events,
fps=cfg.dataset.fps,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
teleop=teleop,
control_time_s=cfg.dataset.reset_time_s,
single_task=cfg.dataset.single_task,

View File

@@ -47,9 +47,6 @@ from pprint import pformat
from lerobot.configs import parser
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.processor import (
make_default_robot_action_processor,
)
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
@@ -80,7 +77,7 @@ class DatasetReplayConfig:
repo_id: str
# Episode to replay.
episode: int
# Root directory where the dataset will be stored (e.g. 'dataset/path').
# Root directory where the dataset will be stored (e.g. 'dataset/path'). If None, defaults to $HF_LEROBOT_HOME/repo_id.
root: str | Path | None = None
# Limit the frames per second. By default, uses the policy fps.
fps: int = 30
@@ -99,8 +96,6 @@ def replay(cfg: ReplayConfig):
init_logging()
logging.info(pformat(asdict(cfg)))
robot_action_processor = make_default_robot_action_processor()
robot = make_robot_from_config(cfg.robot)
dataset = LeRobotDataset(cfg.dataset.repo_id, root=cfg.dataset.root, episodes=[cfg.dataset.episode])
@@ -120,11 +115,10 @@ def replay(cfg: ReplayConfig):
for i, name in enumerate(dataset.features[ACTION]["names"]):
action[name] = action_array[i]
robot_obs = robot.get_observation()
# Update cached observation so the robot's input pipeline can use it (e.g. for IK)
robot.get_observation()
processed_action = robot_action_processor((action, robot_obs))
_ = robot.send_action(processed_action)
_ = robot.send_action(action)
dt_s = time.perf_counter() - start_episode_t
precise_sleep(max(1 / dataset.fps - dt_s, 0.0))

View File

@@ -152,6 +152,7 @@ def test_motor(bus, motor_id: int, timeout: float, use_fd: bool):
)
try:
bus.send(disable_msg)
bus.recv(timeout=0.1) # Clear any pending responses
except Exception:
print(f"Error sending message to motor 0x{motor_id:02X}")

View File

@@ -43,6 +43,7 @@ from lerobot.teleoperators import ( # noqa: F401
koch_leader,
make_teleoperator_from_config,
omx_leader,
openarm_mini,
so_leader,
)
@@ -51,6 +52,7 @@ COMPATIBLE_DEVICES = [
"koch_leader",
"omx_follower",
"omx_leader",
"openarm_mini",
"so100_follower",
"so100_leader",
"so101_follower",

View File

@@ -61,12 +61,6 @@ import rerun as rr
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
from lerobot.configs import parser
from lerobot.processor import (
RobotAction,
RobotObservation,
RobotProcessorPipeline,
make_default_processors,
)
from lerobot.robots import ( # noqa: F401
Robot,
RobotConfig,
@@ -94,11 +88,13 @@ from lerobot.teleoperators import ( # noqa: F401
make_teleoperator_from_config,
omx_leader,
openarm_leader,
openarm_mini,
reachy2_teleoperator,
so_leader,
unitree_g1,
)
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.pipeline_utils import check_action_space_compatibility, check_observation_space_compatibility
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.utils import init_logging, move_cursor_up
from lerobot.utils.visualization_utils import init_rerun, log_rerun_data
@@ -126,28 +122,28 @@ def teleop_loop(
teleop: Teleoperator,
robot: Robot,
fps: int,
teleop_action_processor: RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction],
robot_action_processor: RobotProcessorPipeline[tuple[RobotAction, RobotObservation], RobotAction],
robot_observation_processor: RobotProcessorPipeline[RobotObservation, RobotObservation],
display_data: bool = False,
duration: float | None = None,
display_compressed_images: bool = False,
):
"""
This function continuously reads actions from a teleoperation device, processes them through optional
pipelines, sends them to a robot, and optionally displays the robot's state. The loop runs at a
specified frequency until a set duration is reached or it is manually interrupted.
Continuously reads actions from a teleoperation device, sends them to a robot,
and optionally displays the robot's state. Pipelines are applied internally by
the robot and teleoperator objects.
The loop runs at the specified frequency until a set duration is reached or it
is manually interrupted.
Args:
teleop: The teleoperator device instance providing control actions.
robot: The robot instance being controlled.
fps: The target frequency for the control loop in frames per second.
display_data: If True, fetches robot observations and displays them in the console and Rerun.
display_compressed_images: If True, compresses images before sending them to Rerun for display.
duration: The maximum duration of the teleoperation loop in seconds. If None, the loop runs indefinitely.
teleop_action_processor: An optional pipeline to process raw actions from the teleoperator.
robot_action_processor: An optional pipeline to process actions before they are sent to the robot.
robot_observation_processor: An optional pipeline to process raw observations from the robot.
display_data: If True, fetches robot observations and displays them in the
console and Rerun.
display_compressed_images: If True, compresses images before sending them
to Rerun for display.
duration: The maximum duration of the teleoperation loop in seconds.
If None, the loop runs indefinitely.
"""
display_len = max(len(key) for key in robot.action_features)
@@ -156,40 +152,29 @@ def teleop_loop(
while True:
loop_start = time.perf_counter()
# Get robot observation
# Not really needed for now other than for visualization
# teleop_action_processor can take None as an observation
# given that it is the identity processor as default
obs = robot.get_observation()
# Get teleop action (output_pipeline applied internally)
action = teleop.get_action()
# Get teleop action
raw_action = teleop.get_action()
# Process teleop action through pipeline
teleop_action = teleop_action_processor((raw_action, obs))
# Process action for robot through pipeline
robot_action_to_send = robot_action_processor((teleop_action, obs))
# Send processed action to robot (robot_action_processor.to_output should return RobotAction)
_ = robot.send_action(robot_action_to_send)
# Send action to robot (input_pipeline applied internally)
robot_action_sent = robot.send_action(action)
if display_data:
# Process robot observation through pipeline
obs_transition = robot_observation_processor(obs)
# Get robot observation (output_pipeline applied internally)
obs = robot.get_observation()
teleop.send_feedback(obs)
log_rerun_data(
observation=obs_transition,
action=teleop_action,
observation=obs,
action=action,
compress_images=display_compressed_images,
)
print("\n" + "-" * (display_len + 10))
print(f"{'NAME':<{display_len}} | {'NORM':>7}")
# Display the final robot action that was sent
for motor, value in robot_action_to_send.items():
print(f"{motor:<{display_len}} | {value:>7.2f}")
move_cursor_up(len(robot_action_to_send) + 3)
for motor, value in robot_action_sent.items():
if isinstance(value, float | int):
print(f"{motor:<{display_len}} | {value:>7.2f}")
move_cursor_up(len(robot_action_sent) + 3)
dt_s = time.perf_counter() - loop_start
precise_sleep(max(1 / fps - dt_s, 0.0))
@@ -215,11 +200,13 @@ def teleoperate(cfg: TeleoperateConfig):
teleop = make_teleoperator_from_config(cfg.teleop)
robot = make_robot_from_config(cfg.robot)
teleop_action_processor, robot_action_processor, robot_observation_processor = make_default_processors()
teleop.connect()
robot.connect()
check_action_space_compatibility(teleop, robot)
check_observation_space_compatibility(robot, teleop)
try:
teleop_loop(
teleop=teleop,
@@ -227,9 +214,6 @@ def teleoperate(cfg: TeleoperateConfig):
fps=cfg.fps,
display_data=cfg.display_data,
duration=cfg.teleop_time_s,
teleop_action_processor=teleop_action_processor,
robot_action_processor=robot_action_processor,
robot_observation_processor=robot_observation_processor,
display_compressed_images=display_compressed_images,
)
except KeyboardInterrupt:

View File

@@ -24,6 +24,7 @@ import torch
from accelerate import Accelerator
from termcolor import colored
from torch.optim import Optimizer
from tqdm import tqdm
from lerobot.configs import parser
from lerobot.configs.train import TrainPipelineConfig
@@ -51,6 +52,7 @@ from lerobot.utils.utils import (
format_big_number,
has_method,
init_logging,
inside_slurm,
)
@@ -378,10 +380,10 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None):
"dataloading_s": AverageMeter("data_s", ":.3f"),
}
# Use effective batch size for proper epoch calculation in distributed training
# Keep global batch size for logging; MetricsTracker handles world size internally.
effective_batch_size = cfg.batch_size * accelerator.num_processes
train_tracker = MetricsTracker(
effective_batch_size,
cfg.batch_size,
dataset.num_frames,
dataset.num_episodes,
train_metrics,
@@ -390,6 +392,14 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None):
)
if is_main_process:
progbar = tqdm(
total=cfg.steps - step,
desc="Training",
unit="step",
disable=inside_slurm(),
position=0,
leave=True,
)
logging.info(
f"Start offline training on a fixed dataset, with effective batch size: {effective_batch_size}"
)
@@ -414,6 +424,8 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None):
# Note: eval and checkpoint happens *after* the `step`th training update has completed, so we
# increment `step` here.
step += 1
if is_main_process:
progbar.update(1)
train_tracker.step()
is_log_step = cfg.log_freq > 0 and step % cfg.log_freq == 0 and is_main_process
is_saving_step = step % cfg.save_freq == 0 or step == cfg.steps
@@ -507,6 +519,9 @@ def train(cfg: TrainPipelineConfig, accelerator: Accelerator | None = None):
accelerator.wait_for_everyone()
if is_main_process:
progbar.close()
if eval_env:
close_envs(eval_env)

View File

@@ -72,9 +72,9 @@ class BiOpenArmLeader(Teleoperator):
self.right_arm = OpenArmLeader(right_arm_config)
@cached_property
def action_features(self) -> dict[str, type]:
left_arm_features = self.left_arm.action_features
right_arm_features = self.right_arm.action_features
def raw_action_features(self) -> dict[str, type]:
left_arm_features = self.left_arm.raw_action_features
right_arm_features = self.right_arm.raw_action_features
return {
**{f"left_{k}": v for k, v in left_arm_features.items()},
@@ -82,7 +82,7 @@ class BiOpenArmLeader(Teleoperator):
}
@cached_property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -112,7 +112,7 @@ class BiOpenArmLeader(Teleoperator):
)
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
action_dict = {}
# Add "left_" prefix
@@ -125,7 +125,7 @@ class BiOpenArmLeader(Teleoperator):
return action_dict
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError

View File

@@ -55,9 +55,9 @@ class BiSOLeader(Teleoperator):
self.right_arm = SOLeader(right_arm_config)
@cached_property
def action_features(self) -> dict[str, type]:
left_arm_features = self.left_arm.action_features
right_arm_features = self.right_arm.action_features
def raw_action_features(self) -> dict[str, type]:
left_arm_features = self.left_arm.raw_action_features
right_arm_features = self.right_arm.raw_action_features
return {
**{f"left_{k}": v for k, v in left_arm_features.items()},
@@ -65,7 +65,7 @@ class BiSOLeader(Teleoperator):
}
@cached_property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -94,7 +94,7 @@ class BiSOLeader(Teleoperator):
self.right_arm.setup_motors()
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
action_dict = {}
# Add "left_" prefix
@@ -107,7 +107,7 @@ class BiSOLeader(Teleoperator):
return action_dict
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError

View File

@@ -57,7 +57,7 @@ class GamepadTeleop(Teleoperator):
self.gamepad = None
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
if self.config.use_gripper:
return {
"dtype": "float32",
@@ -72,7 +72,7 @@ class GamepadTeleop(Teleoperator):
}
@property
def feedback_features(self) -> dict:
def raw_feedback_features(self) -> dict:
return {}
def connect(self) -> None:
@@ -87,7 +87,7 @@ class GamepadTeleop(Teleoperator):
self.gamepad.start()
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
# Update the controller to get fresh inputs
self.gamepad.update()
@@ -180,7 +180,7 @@ class GamepadTeleop(Teleoperator):
# No additional configuration needed
pass
def send_feedback(self, feedback: dict) -> None:
def _send_feedback(self, feedback: dict) -> None:
"""Send feedback to the gamepad."""
# Gamepad doesn't support feedback
pass

View File

@@ -81,11 +81,11 @@ class HomunculusArm(Teleoperator):
self.state_lock = threading.Lock()
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
return {f"{joint}.pos": float for joint in self.joints}
@property
def feedback_features(self) -> dict:
def raw_feedback_features(self) -> dict:
return {}
@property
@@ -298,11 +298,11 @@ class HomunculusArm(Teleoperator):
logger.debug(f"Error reading frame in background thread for {self}: {e}")
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
joint_positions = self._read()
return {f"{joint}.pos": pos for joint, pos in joint_positions.items()}
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
@check_if_not_connected

View File

@@ -107,11 +107,11 @@ class HomunculusGlove(Teleoperator):
self.state_lock = threading.Lock()
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
return {f"{joint}.pos": float for joint in self.joints}
@property
def feedback_features(self) -> dict:
def raw_feedback_features(self) -> dict:
return {}
@property
@@ -324,13 +324,13 @@ class HomunculusGlove(Teleoperator):
logger.debug(f"Error reading frame in background thread for {self}: {e}")
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
joint_positions = self._read()
return homunculus_glove_to_hope_jr_hand(
{f"{joint}.pos": pos for joint, pos in joint_positions.items()}
)
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
@check_if_not_connected

View File

@@ -67,7 +67,7 @@ class KeyboardTeleop(Teleoperator):
self.logs = {}
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
return {
"dtype": "float32",
"shape": (len(self.arm),),
@@ -75,7 +75,7 @@ class KeyboardTeleop(Teleoperator):
}
@property
def feedback_features(self) -> dict:
def raw_feedback_features(self) -> dict:
return {}
@property
@@ -122,7 +122,7 @@ class KeyboardTeleop(Teleoperator):
pass
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
before_read_t = time.perf_counter()
self._drain_pressed_keys()
@@ -133,7 +133,7 @@ class KeyboardTeleop(Teleoperator):
return dict.fromkeys(action, None)
def send_feedback(self, feedback: dict[str, Any]) -> None:
def _send_feedback(self, feedback: dict[str, Any]) -> None:
pass
@check_if_not_connected
@@ -157,7 +157,7 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
self.misc_keys_queue = Queue()
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
if self.config.use_gripper:
return {
"dtype": "float32",
@@ -172,7 +172,7 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
}
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
self._drain_pressed_keys()
delta_x = 0.0
delta_y = 0.0
@@ -338,7 +338,7 @@ class KeyboardRoverTeleop(KeyboardTeleop):
self.current_angular_speed = config.angular_speed
@property
def action_features(self) -> dict:
def raw_action_features(self) -> dict:
"""Return action format for rover (linear and angular velocities)."""
return {
"linear.vel": float,
@@ -361,7 +361,7 @@ class KeyboardRoverTeleop(KeyboardTeleop):
self.current_pressed.pop(key_char, None)
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
"""
Get the current action based on pressed keys.

View File

@@ -58,11 +58,11 @@ class KochLeader(Teleoperator):
)
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -160,7 +160,7 @@ class KochLeader(Teleoperator):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
start = time.perf_counter()
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
@@ -168,7 +168,7 @@ class KochLeader(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# TODO(rcadene, aliberts): Implement force feedback
raise NotImplementedError

View File

@@ -57,11 +57,11 @@ class OmxLeader(Teleoperator):
)
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -149,7 +149,7 @@ class OmxLeader(Teleoperator):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
start = time.perf_counter()
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
@@ -157,7 +157,7 @@ class OmxLeader(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# TODO(rcadene, aliberts): Implement force feedback
raise NotImplementedError

View File

@@ -65,7 +65,7 @@ class OpenArmLeader(Teleoperator):
)
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
"""Features produced by this teleoperator."""
features: dict[str, type] = {}
for motor in self.bus.motors:
@@ -75,7 +75,7 @@ class OpenArmLeader(Teleoperator):
return features
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
"""Feedback features (not implemented for OpenArms)."""
return {}
@@ -183,7 +183,7 @@ class OpenArmLeader(Teleoperator):
)
@check_if_not_connected
def get_action(self) -> RobotAction:
def _get_action(self) -> RobotAction:
"""
Get current action from the leader arm.
@@ -209,7 +209,7 @@ class OpenArmLeader(Teleoperator):
return action_dict
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Feedback is not yet implemented for OpenArm leader.")
@check_if_not_connected

View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .config_openarm_mini import OpenArmMiniConfig
from .openarm_mini import OpenArmMini
__all__ = ["OpenArmMini", "OpenArmMiniConfig"]

View File

@@ -0,0 +1,30 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass
from ..config import TeleoperatorConfig
@TeleoperatorConfig.register_subclass("openarm_mini")
@dataclass
class OpenArmMiniConfig(TeleoperatorConfig):
"""Configuration for OpenArm Mini teleoperator with Feetech motors (dual arms)."""
port_right: str = "/dev/ttyUSB0"
port_left: str = "/dev/ttyUSB1"
use_degrees: bool = True

View File

@@ -0,0 +1,372 @@
#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from typing import Any
from lerobot.motors import Motor, MotorCalibration, MotorNormMode
from lerobot.motors.feetech import (
FeetechMotorsBus,
OperatingMode,
)
from lerobot.processor import RobotAction
from lerobot.utils.decorators import check_if_already_connected, check_if_not_connected
from ..teleoperator import Teleoperator
from .config_openarm_mini import OpenArmMiniConfig
logger = logging.getLogger(__name__)
# Motors whose direction is inverted on the leader side.
LEFT_MOTORS_TO_FLIP = {"joint_1", "joint_2", "joint_3", "joint_4", "joint_5", "joint_7"}
RIGHT_MOTORS_TO_FLIP = {"joint_1", "joint_3", "joint_4", "joint_5", "joint_6", "joint_7"}
# Leader(OpenArmMini) -> Follower(OpenArms) joint remap
JOINT_REMAP_TO_OPENARMS = {"joint_6": "joint_7", "joint_7": "joint_6"}
# Follower(OpenArms) -> Leader(OpenArmMini) joint remap
JOINT_REMAP_TO_MINI = {"joint_7": "joint_6", "joint_6": "joint_7"}
OPENARMS_GRIPPER_MIN = -65.0
OPENARMS_GRIPPER_MAX = 0.0
MINI_GRIPPER_MIN = 0.0
MINI_GRIPPER_MAX = 100.0
class OpenArmMini(Teleoperator):
"""
OpenArm Mini Teleoperator with dual Feetech-based arms (8 motors per arm).
Each arm has 7 joints plus a gripper, using Feetech STS3215 servos.
"""
config_class = OpenArmMiniConfig
name = "openarm_mini"
def __init__(self, config: OpenArmMiniConfig):
super().__init__(config)
self.config = config
norm_mode_body = MotorNormMode.DEGREES
motors_right = {
"joint_1": Motor(1, "sts3215", norm_mode_body),
"joint_2": Motor(2, "sts3215", norm_mode_body),
"joint_3": Motor(3, "sts3215", norm_mode_body),
"joint_4": Motor(4, "sts3215", norm_mode_body),
"joint_5": Motor(5, "sts3215", norm_mode_body),
"joint_6": Motor(6, "sts3215", norm_mode_body),
"joint_7": Motor(7, "sts3215", norm_mode_body),
"gripper": Motor(8, "sts3215", MotorNormMode.RANGE_0_100),
}
motors_left = {
"joint_1": Motor(1, "sts3215", norm_mode_body),
"joint_2": Motor(2, "sts3215", norm_mode_body),
"joint_3": Motor(3, "sts3215", norm_mode_body),
"joint_4": Motor(4, "sts3215", norm_mode_body),
"joint_5": Motor(5, "sts3215", norm_mode_body),
"joint_6": Motor(6, "sts3215", norm_mode_body),
"joint_7": Motor(7, "sts3215", norm_mode_body),
"gripper": Motor(8, "sts3215", MotorNormMode.RANGE_0_100),
}
cal_right = {
k.replace("right_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("right_")
}
cal_left = {
k.replace("left_", ""): v for k, v in (self.calibration or {}).items() if k.startswith("left_")
}
self.bus_right = FeetechMotorsBus(
port=self.config.port_right,
motors=motors_right,
calibration=cal_right,
)
self.bus_left = FeetechMotorsBus(
port=self.config.port_left,
motors=motors_left,
calibration=cal_left,
)
@staticmethod
def _mini_gripper_to_openarms(value: float) -> float:
"""Convert OpenArmMini gripper range [0, 100] to OpenArms gripper range [-65, 0]."""
mapped = OPENARMS_GRIPPER_MAX + (
(value - MINI_GRIPPER_MIN)
* (OPENARMS_GRIPPER_MIN - OPENARMS_GRIPPER_MAX)
/ (MINI_GRIPPER_MAX - MINI_GRIPPER_MIN)
)
return max(min(mapped, OPENARMS_GRIPPER_MAX), OPENARMS_GRIPPER_MIN)
@staticmethod
def _openarms_gripper_to_mini(value: float) -> float:
"""Convert OpenArms gripper range [-65, 0] to OpenArmMini gripper range [0, 100]."""
clipped = max(min(value, OPENARMS_GRIPPER_MAX), OPENARMS_GRIPPER_MIN)
return MINI_GRIPPER_MIN + (
(OPENARMS_GRIPPER_MAX - clipped)
* (MINI_GRIPPER_MAX - MINI_GRIPPER_MIN)
/ (OPENARMS_GRIPPER_MAX - OPENARMS_GRIPPER_MIN)
)
@property
def raw_action_features(self) -> dict[str, type]:
features: dict[str, type] = {}
for motor in self.bus_right.motors:
features[f"right_{motor}.pos"] = float
for motor in self.bus_left.motors:
features[f"left_{motor}.pos"] = float
return features
@property
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
def is_connected(self) -> bool:
return self.bus_right.is_connected and self.bus_left.is_connected
@check_if_already_connected
def connect(self, calibrate: bool = True) -> None:
logger.info(f"Connecting right arm on {self.config.port_right}...")
self.bus_right.connect()
logger.info(f"Connecting left arm on {self.config.port_left}...")
self.bus_left.connect()
if calibrate:
self.calibrate()
self.configure()
logger.info(f"{self} connected.")
@property
def is_calibrated(self) -> bool:
return self.bus_right.is_calibrated and self.bus_left.is_calibrated
def calibrate(self) -> None:
"""
Run calibration procedure for OpenArm Mini.
1. Disable torque
2. Ask user to position arms in hanging position with grippers closed
3. Set this as zero position via half-turn homing
4. Interactive gripper calibration (open/close positions)
5. Save calibration
"""
if self.calibration:
user_input = input(
f"Press ENTER to use existing calibration for {self.id}, "
f"or type 'c' and press ENTER to run new calibration: "
)
if user_input.strip().lower() != "c":
logger.info(f"Using existing calibration for {self.id}")
cal_right = {
k.replace("right_", ""): v for k, v in self.calibration.items() if k.startswith("right_")
}
cal_left = {
k.replace("left_", ""): v for k, v in self.calibration.items() if k.startswith("left_")
}
self.bus_right.write_calibration(cal_right)
self.bus_left.write_calibration(cal_left)
return
logger.info(f"\nRunning calibration for {self}")
self._calibrate_arm("right", self.bus_right)
self._calibrate_arm("left", self.bus_left)
self._save_calibration()
print(f"\nCalibration complete and saved to {self.calibration_fpath}")
def _calibrate_arm(self, arm_name: str, bus: FeetechMotorsBus) -> None:
"""Calibrate a single arm with Feetech motors."""
logger.info(f"\n=== Calibrating {arm_name.upper()} arm ===")
bus.disable_torque()
logger.info(f"Setting Phase to 12 for all motors in {arm_name.upper()} arm...")
for motor in bus.motors:
bus.write("Phase", motor, 12)
for motor in bus.motors:
bus.write("Operating_Mode", motor, OperatingMode.POSITION.value)
input(
f"\nCalibration: Zero Position ({arm_name.upper()} arm)\n"
"Position the arm in the following configuration:\n"
" - Arm hanging straight down\n"
" - Gripper closed\n"
"Press ENTER when ready..."
)
homing_offsets = bus.set_half_turn_homings()
logger.info(f"{arm_name.capitalize()} arm zero position set.")
print(f"\nSetting motor ranges for {arm_name.upper()} arm\n")
if self.calibration is None:
self.calibration = {}
motor_resolution = bus.model_resolution_table[list(bus.motors.values())[0].model]
max_res = motor_resolution - 1
for motor_name, motor in bus.motors.items():
prefixed_name = f"{arm_name}_{motor_name}"
if motor_name == "gripper":
input(
f"\nGripper Calibration ({arm_name.upper()} arm)\n"
f"Step 1: CLOSE the gripper fully\n"
f"Press ENTER when gripper is closed..."
)
closed_pos = bus.read("Present_Position", motor_name, normalize=False)
logger.info(f" Gripper closed position recorded: {closed_pos}")
input("\nStep 2: OPEN the gripper fully\nPress ENTER when gripper is fully open...")
open_pos = bus.read("Present_Position", motor_name, normalize=False)
logger.info(f" Gripper open position recorded: {open_pos}")
if closed_pos < open_pos:
range_min = int(closed_pos)
range_max = int(open_pos)
drive_mode = 0
else:
range_min = int(open_pos)
range_max = int(closed_pos)
drive_mode = 1
logger.info(
f" {prefixed_name}: range set to [{range_min}, {range_max}] "
f"(0=closed, 100=open, drive_mode={drive_mode})"
)
else:
range_min = 0
range_max = max_res
drive_mode = 0
logger.info(f" {prefixed_name}: range set to [0, {max_res}] (full motor range)")
self.calibration[prefixed_name] = MotorCalibration(
id=motor.id,
drive_mode=drive_mode,
homing_offset=homing_offsets[motor_name],
range_min=range_min,
range_max=range_max,
)
cal_for_bus = {
k.replace(f"{arm_name}_", ""): v
for k, v in self.calibration.items()
if k.startswith(f"{arm_name}_")
}
bus.write_calibration(cal_for_bus)
def configure(self) -> None:
self.bus_right.disable_torque()
self.bus_right.configure_motors()
for motor in self.bus_right.motors:
self.bus_right.write("Operating_Mode", motor, OperatingMode.POSITION.value)
self.bus_left.disable_torque()
self.bus_left.configure_motors()
for motor in self.bus_left.motors:
self.bus_left.write("Operating_Mode", motor, OperatingMode.POSITION.value)
def setup_motors(self) -> None:
print("\nSetting up RIGHT arm motors...")
for motor in reversed(self.bus_right.motors):
input(f"Connect the controller board to the RIGHT '{motor}' motor only and press enter.")
self.bus_right.setup_motor(motor)
print(f"RIGHT '{motor}' motor id set to {self.bus_right.motors[motor].id}")
print("\nSetting up LEFT arm motors...")
for motor in reversed(self.bus_left.motors):
input(f"Connect the controller board to the LEFT '{motor}' motor only and press enter.")
self.bus_left.setup_motor(motor)
print(f"LEFT '{motor}' motor id set to {self.bus_left.motors[motor].id}")
@check_if_not_connected
def _get_action(self) -> RobotAction:
"""Get current action from both arms (read positions from all motors)."""
start = time.perf_counter()
right_positions = self.bus_right.sync_read("Present_Position")
left_positions = self.bus_left.sync_read("Present_Position")
action: dict[str, Any] = {}
for motor, val in right_positions.items():
target_motor = JOINT_REMAP_TO_OPENARMS.get(motor, motor)
mapped_val = -val if motor in RIGHT_MOTORS_TO_FLIP else val
if target_motor == "gripper":
mapped_val = self._mini_gripper_to_openarms(mapped_val)
action[f"right_{target_motor}.pos"] = mapped_val
for motor, val in left_positions.items():
target_motor = JOINT_REMAP_TO_OPENARMS.get(motor, motor)
mapped_val = -val if motor in LEFT_MOTORS_TO_FLIP else val
if target_motor == "gripper":
mapped_val = self._mini_gripper_to_openarms(mapped_val)
action[f"left_{target_motor}.pos"] = mapped_val
dt_ms = (time.perf_counter() - start) * 1e3
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
@check_if_not_connected
def enable_torque(self) -> None:
"""Enable torque on both arms for active motion commands."""
self.bus_right.enable_torque()
self.bus_left.enable_torque()
@check_if_not_connected
def disable_torque(self) -> None:
"""Disable torque on both arms for manual teleoperation."""
self.bus_right.disable_torque()
self.bus_left.disable_torque()
@check_if_not_connected
def write_goal_positions(self, action: dict[str, float]) -> None:
"""Send normalized bilateral goal positions to the underlying Feetech buses."""
right_goals: dict[str, float] = {}
left_goals: dict[str, float] = {}
for key, value in action.items():
if not key.endswith(".pos"):
continue
if key.startswith("right_"):
openarms_motor = key.removeprefix("right_").removesuffix(".pos")
mini_motor = JOINT_REMAP_TO_MINI.get(openarms_motor, openarms_motor)
mapped_val = self._openarms_gripper_to_mini(value) if openarms_motor == "gripper" else value
right_goals[mini_motor] = -mapped_val if mini_motor in RIGHT_MOTORS_TO_FLIP else mapped_val
elif key.startswith("left_"):
openarms_motor = key.removeprefix("left_").removesuffix(".pos")
mini_motor = JOINT_REMAP_TO_MINI.get(openarms_motor, openarms_motor)
mapped_val = self._openarms_gripper_to_mini(value) if openarms_motor == "gripper" else value
left_goals[mini_motor] = -mapped_val if mini_motor in LEFT_MOTORS_TO_FLIP else mapped_val
if right_goals:
self.bus_right.sync_write("Goal_Position", right_goals)
if left_goals:
self.bus_left.sync_write("Goal_Position", left_goals)
@check_if_not_connected
def _send_feedback(self, feedback: dict[str, float]) -> None:
"""Route feedback position commands through the same OpenArms/OpenArmMini mapping."""
self.write_goal_positions(feedback)
@check_if_not_connected
def disconnect(self) -> None:
self.bus_right.disconnect()
self.bus_left.disconnect()
logger.info(f"{self} disconnected.")

View File

@@ -47,7 +47,7 @@ class BasePhone:
return (self._calib_pos is not None) and (self._calib_rot_inv is not None)
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {
"phone.pos": np.ndarray, # shape (3,)
"phone.rot": Rotation, # scipy.spatial.transform.Rotation
@@ -56,15 +56,15 @@ class BasePhone:
}
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
# No haptic or other feedback implemented yet
pass
return {}
def configure(self) -> None:
# No additional configuration required for phone teleop
pass
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# We could add haptic feedback (vibrations) here, but it's not implemented yet
raise NotImplementedError
@@ -163,7 +163,7 @@ class IOSPhone(BasePhone, Teleoperator):
return True, pos, rot, pose
@check_if_not_connected
def get_action(self) -> dict:
def _get_action(self) -> dict:
has_pose, raw_position, raw_rotation, fb_pose = self._read_current_pose()
if not has_pose or not self.is_calibrated:
return {}
@@ -314,7 +314,7 @@ class AndroidPhone(BasePhone, Teleoperator):
self._latest_message = message
@check_if_not_connected
def get_action(self) -> dict:
def _get_action(self) -> dict:
ok, raw_pos, raw_rot, pose = self._read_current_pose()
if not ok or not self.is_calibrated:
return {}
@@ -395,21 +395,21 @@ class Phone(Teleoperator):
return self._phone_impl.is_calibrated
@property
def action_features(self) -> dict[str, type]:
return self._phone_impl.action_features
def raw_action_features(self) -> dict[str, type]:
return self._phone_impl.raw_action_features
@property
def feedback_features(self) -> dict[str, type]:
return self._phone_impl.feedback_features
def raw_feedback_features(self) -> dict[str, type]:
return self._phone_impl.raw_feedback_features
def configure(self) -> None:
return self._phone_impl.configure()
def get_action(self) -> dict:
return self._phone_impl.get_action()
def _get_action(self) -> dict:
return self._phone_impl._get_action()
def send_feedback(self, feedback: dict[str, float]) -> None:
return self._phone_impl.send_feedback(feedback)
def _send_feedback(self, feedback: dict[str, float]) -> None:
return self._phone_impl._send_feedback(feedback)
def disconnect(self) -> None:
return self._phone_impl.disconnect()

View File

@@ -104,7 +104,7 @@ class Reachy2Teleoperator(Teleoperator):
return joints
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
if self.config.with_mobile_base:
return {
**dict.fromkeys(
@@ -120,7 +120,7 @@ class Reachy2Teleoperator(Teleoperator):
return dict.fromkeys(self.joints_dict.keys(), float)
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -146,7 +146,7 @@ class Reachy2Teleoperator(Teleoperator):
pass
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
start = time.perf_counter()
joint_action: dict[str, float] = {}
@@ -168,7 +168,7 @@ class Reachy2Teleoperator(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return {**joint_action, **vel_action}
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError
def disconnect(self) -> None:

View File

@@ -0,0 +1,19 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .ee_space import make_so10x_leader_fk_pipeline
__all__ = ["make_so10x_leader_fk_pipeline"]

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Forward-kinematics pipeline for SO-100/101 leader (teleoperator) arm.
Converts raw leader joint positions into end-effector pose. Attach this to a leader
via ``set_output_pipeline`` so that ``get_action()`` returns EE coordinates instead of
raw joint angles.
Example::
from lerobot.teleoperators.so_leader.pipelines import make_so10x_leader_fk_pipeline
motor_names = list(leader.bus.motors.keys())
leader.set_output_pipeline(make_so10x_leader_fk_pipeline(URDF_PATH, motor_names))
action = leader.get_action() # now contains ee.x, ee.y, ee.z, ...
"""
from lerobot.model.kinematics import RobotKinematics
from lerobot.processor import RobotAction, RobotProcessorPipeline
from lerobot.processor.converters import (
robot_action_to_transition,
transition_to_robot_action,
)
from lerobot.robots.so_follower.robot_kinematic_processor import ForwardKinematicsJointsToEE
_DEFAULT_GRIPPER_FRAME = "gripper_frame_link"
def make_so10x_leader_fk_pipeline(
urdf_path: str,
motor_names: list[str],
*,
target_frame_name: str = _DEFAULT_GRIPPER_FRAME,
) -> RobotProcessorPipeline[RobotAction, RobotAction]:
"""
Create a forward-kinematics action pipeline for SO-100/101 leader teleoperators.
Converts raw leader joint positions (action) into end-effector pose (position +
orientation + gripper). Attach this to a leader via ``set_output_pipeline`` so that
``get_action()`` returns EE coordinates instead of raw joint angles.
Args:
urdf_path: Path to the SO-100/101 URDF file used for kinematics.
motor_names: Ordered list of motor names matching the URDF joint names.
target_frame_name: Name of the end-effector frame in the URDF.
Returns:
A RobotProcessorPipeline that maps joint actions to EE actions.
Example::
motor_names = list(leader.bus.motors.keys())
leader.set_output_pipeline(
make_so10x_leader_fk_pipeline("./so101.urdf", motor_names)
)
action = leader.get_action() # returns ee.x, ee.y, ee.z, ee.wx, ee.wy, ee.wz, ee.gripper_vel
"""
kinematics = RobotKinematics(
urdf_path=urdf_path,
target_frame_name=target_frame_name,
joint_names=motor_names,
)
return RobotProcessorPipeline[RobotAction, RobotAction](
steps=[ForwardKinematicsJointsToEE(kinematics=kinematics, motor_names=motor_names)],
to_transition=robot_action_to_transition,
to_output=transition_to_robot_action,
)

View File

@@ -55,11 +55,11 @@ class SOLeader(Teleoperator):
)
@property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{motor}.pos": float for motor in self.bus.motors}
@property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -138,7 +138,7 @@ class SOLeader(Teleoperator):
print(f"'{motor}' motor id set to {self.bus.motors[motor].id}")
@check_if_not_connected
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
start = time.perf_counter()
action = self.bus.sync_read("Present_Position")
action = {f"{motor}.pos": val for motor, val in action.items()}
@@ -146,7 +146,7 @@ class SOLeader(Teleoperator):
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
return action
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
# TODO: Implement force feedback
raise NotImplementedError

View File

@@ -12,17 +12,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import abc
import builtins
from pathlib import Path
from typing import Any
from typing import TYPE_CHECKING, Any
import draccus
from lerobot.configs.types import PipelineFeatureType
from lerobot.motors.motors_bus import MotorCalibration
from lerobot.processor import RobotAction
from lerobot.utils.constants import HF_LEROBOT_CALIBRATION, TELEOPERATORS
if TYPE_CHECKING:
from lerobot.processor.core import RobotAction
from lerobot.processor.pipeline import RobotProcessorPipeline
from .config import TeleoperatorConfig
@@ -33,6 +39,10 @@ class Teleoperator(abc.ABC):
This class provides a standardized interface for interacting with physical teleoperators.
Subclasses must implement all abstract methods and properties to be usable.
Pipelines are first-class citizens: every teleoperator carries an optional output pipeline
(applied in get_action()) and an optional input pipeline (applied in send_feedback()).
Both default to identity (no-op), so existing teleoperators work without any changes.
Attributes:
config_class (RobotConfig): The expected configuration class for this teleoperator.
name (str): The unique name used to identify this teleoperator type.
@@ -55,6 +65,14 @@ class Teleoperator(abc.ABC):
if self.calibration_fpath.is_file():
self._load_calibration()
# Pipeline interface — default to identity (no-op), swap via set_output/input_pipeline()
# Lazy import: factory is in lerobot.processor which loads after teleoperators at module init time,
# but __init__ runs at instance-creation time when lerobot.processor is fully loaded.
from lerobot.processor.factory import _make_identity_feedback_pipeline, _make_identity_teleop_action_pipeline
self._output_pipeline: RobotProcessorPipeline = _make_identity_teleop_action_pipeline()
self._input_pipeline: RobotProcessorPipeline = _make_identity_feedback_pipeline()
def __str__(self) -> str:
return f"{self.id} {self.__class__.__name__}"
@@ -84,38 +102,114 @@ class Teleoperator(abc.ABC):
except Exception: # nosec B110
pass
# ── Pipeline interface ────────────────────────────────────────────────────
def output_pipeline(self) -> RobotProcessorPipeline:
"""
Pipeline applied inside get_action() to transform raw hardware actions.
Default: identity (no-op). Override via set_output_pipeline() or subclassing.
Example: set a forward-kinematics pipeline to convert leader joint positions to EE pose.
"""
return self._output_pipeline
def input_pipeline(self) -> RobotProcessorPipeline:
"""
Pipeline applied inside send_feedback() to transform incoming feedback.
Default: identity (no-op). Override via set_input_pipeline() or subclassing.
"""
return self._input_pipeline
def set_output_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
"""Set the action output pipeline (applied in get_action())."""
self._output_pipeline = pipeline
def set_input_pipeline(self, pipeline: RobotProcessorPipeline) -> None:
"""Set the feedback input pipeline (applied in send_feedback())."""
self._input_pipeline = pipeline
# ── Feature properties ────────────────────────────────────────────────────
@property
@abc.abstractmethod
def action_features(self) -> dict:
"""
A dictionary describing the structure and types of the actions produced by the teleoperator. Its
structure (keys) should match the structure of what is returned by :pymeth:`get_action`. Values for
the dict should be the type of the value if it's a simple value, e.g. `float` for single
proprioceptive value (a joint's goal position/velocity)
Pipeline-transformed action features.
Note: this property should be able to be called regardless of whether the robot is connected or not.
Applies output_pipeline().transform_features() to raw_action_features so the
returned dict matches what get_action() actually produces for callers.
Use raw_action_features to inspect hardware-level feature shapes.
Note: this property should be able to be called regardless of whether the
teleoperator is connected or not.
"""
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
initial = create_initial_features(action=self.raw_action_features)
transformed = self.output_pipeline().transform_features(initial)
return transformed.get(PipelineFeatureType.ACTION, {})
@property
@abc.abstractmethod
def raw_action_features(self) -> dict:
"""
Hardware-level action features (before any pipeline transformation).
A dictionary describing the structure and types of the actions produced
directly by the teleoperator hardware. Its structure (keys) should match
the structure of what is returned by :pymeth:`_get_action`. Values should be
the type of the value if it's a simple value, e.g. ``float`` for single
proprioceptive value (a joint's goal position/velocity).
Note: this property should be able to be called regardless of whether the
teleoperator is connected or not.
"""
pass
@property
@abc.abstractmethod
def feedback_features(self) -> dict:
def raw_feedback_features(self) -> dict:
"""
A dictionary describing the structure and types of the feedback actions expected by the robot. Its
structure (keys) should match the structure of what is passed to :pymeth:`send_feedback`. Values for
the dict should be the type of the value if it's a simple value, e.g. `float` for single
proprioceptive value (a joint's goal position/velocity)
Hardware-level feedback features (before any pipeline transformation).
Note: this property should be able to be called regardless of whether the robot is connected or not.
A dictionary describing the structure and types of the feedback accepted directly
by the teleoperator hardware (i.e. what :pymeth:`_send_feedback` receives). Its
structure (keys) should match the structure of what is expected by
:pymeth:`_send_feedback`. Values should be the type of the value if it's a simple
value, e.g. ``float`` for single proprioceptive value.
Return an empty dict if this teleoperator does not support feedback.
Note: this property should be able to be called regardless of whether the
teleoperator is connected or not.
"""
pass
@property
def feedback_features(self) -> dict:
"""
Pipeline-transformed feedback features.
Applies input_pipeline().transform_features() to raw_feedback_features so the
returned dict reflects what the input pipeline outputs to the teleoperator hardware.
Use raw_feedback_features to inspect hardware-level feedback feature shapes.
Note: this property should be able to be called regardless of whether the
teleoperator is connected or not.
"""
from lerobot.datasets.pipeline_features import create_initial_features # lazy import
initial = create_initial_features(observation=self.raw_feedback_features)
transformed = self.input_pipeline().transform_features(initial)
return transformed.get(PipelineFeatureType.OBSERVATION, {})
@property
@abc.abstractmethod
def is_connected(self) -> bool:
"""
Whether the teleoperator is currently connected or not. If `False`, calling :pymeth:`get_action`
or :pymeth:`send_feedback` should raise an error.
Whether the teleoperator is currently connected or not. If ``False``, calling
:pymeth:`get_action` or :pymeth:`send_feedback` should raise an error.
"""
pass
@@ -133,7 +227,7 @@ class Teleoperator(abc.ABC):
@property
@abc.abstractmethod
def is_calibrated(self) -> bool:
"""Whether the teleoperator is currently calibrated or not. Should be always `True` if not applicable"""
"""Whether the teleoperator is currently calibrated or not. Should be always ``True`` if not applicable"""
pass
@abc.abstractmethod
@@ -151,7 +245,7 @@ class Teleoperator(abc.ABC):
Helper to load calibration data from the specified file.
Args:
fpath (Path | None): Optional path to the calibration file. Defaults to `self.calibration_fpath`.
fpath (Path | None): Optional path to the calibration file. Defaults to ``self.calibration_fpath``.
"""
fpath = self.calibration_fpath if fpath is None else fpath
with open(fpath) as f, draccus.config_type("json"):
@@ -162,7 +256,7 @@ class Teleoperator(abc.ABC):
Helper to save calibration data to the specified file.
Args:
fpath (Path | None): Optional path to save the calibration file. Defaults to `self.calibration_fpath`.
fpath (Path | None): Optional path to save the calibration file. Defaults to ``self.calibration_fpath``.
"""
fpath = self.calibration_fpath if fpath is None else fpath
with open(fpath, "w") as f, draccus.config_type("json"):
@@ -176,29 +270,51 @@ class Teleoperator(abc.ABC):
"""
pass
@abc.abstractmethod
# ── Template methods (concrete, call pipeline internally) ─────────────────
def get_action(self) -> RobotAction:
"""
Retrieve the current action from the teleoperator.
Retrieve the current action from the teleoperator and apply the output pipeline.
Calls :pymeth:`_get_action` to get raw hardware data, then applies
:pymeth:`output_pipeline`.
Returns:
RobotAction: A flat dictionary representing the teleoperator's current actions. Its
structure should match :pymeth:`observation_features`.
RobotAction: Pipeline-transformed action. With the default identity pipeline
this equals the raw action from :pymeth:`_get_action`.
"""
raw = self._get_action()
return self.output_pipeline()(raw)
@abc.abstractmethod
def _get_action(self) -> RobotAction:
"""
Retrieve the raw action directly from teleoperator hardware.
Returns:
RobotAction: A flat dictionary representing the teleoperator's current actions.
Its structure should match :pymeth:`raw_action_features`.
"""
pass
@abc.abstractmethod
def send_feedback(self, feedback: dict[str, Any]) -> None:
"""
Send a feedback action command to the teleoperator.
Apply the input pipeline and send the resulting feedback to teleoperator hardware.
Args:
feedback (dict[str, Any]): Dictionary representing the desired feedback. Its structure should match
:pymeth:`feedback_features`.
feedback (dict[str, Any]): Dictionary representing the desired feedback.
Its structure should match :pymeth:`feedback_features`.
"""
transformed = self.input_pipeline()(feedback)
self._send_feedback(transformed)
Returns:
dict[str, Any]: The action actually sent to the motors potentially clipped or modified, e.g. by
safety limits on velocity.
@abc.abstractmethod
def _send_feedback(self, feedback: dict[str, Any]) -> None:
"""
Send feedback directly to teleoperator hardware.
Args:
feedback (dict[str, Any]): Dictionary of hardware-level feedback commands.
"""
pass

View File

@@ -72,11 +72,11 @@ class UnitreeG1Teleoperator(Teleoperator):
self.ik_helper: ExoskeletonIKHelper | None = None
@cached_property
def action_features(self) -> dict[str, type]:
def raw_action_features(self) -> dict[str, type]:
return {f"{name}.q": float for name in self._g1_joint_names}
@cached_property
def feedback_features(self) -> dict[str, type]:
def raw_feedback_features(self) -> dict[str, type]:
return {}
@property
@@ -114,12 +114,12 @@ class UnitreeG1Teleoperator(Teleoperator):
def configure(self) -> None:
pass
def get_action(self) -> dict[str, float]:
def _get_action(self) -> dict[str, float]:
left_angles = self.left_arm.get_angles()
right_angles = self.right_arm.get_angles()
return self.ik_helper.compute_g1_joints_from_exo(left_angles, right_angles)
def send_feedback(self, feedback: dict[str, float]) -> None:
def _send_feedback(self, feedback: dict[str, float]) -> None:
raise NotImplementedError("Exoskeleton arms do not support feedback")
def disconnect(self) -> None:

View File

@@ -95,6 +95,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> "Teleoperator":
from .bi_openarm_leader import BiOpenArmLeader
return BiOpenArmLeader(config)
elif config.type == "openarm_mini":
from .openarm_mini import OpenArmMini
return OpenArmMini(config)
else:
try:
return cast("Teleoperator", make_device_from_device_class(config))

View File

@@ -189,7 +189,7 @@ def sanity_check_dataset_name(repo_id, policy_cfg):
# Check if dataset_name starts with "eval_" but policy is missing
if dataset_name.startswith("eval_") and policy_cfg is None:
raise ValueError(
f"Your dataset name begins with 'eval_' ({dataset_name}), but no policy is provided ({policy_cfg.type})."
f"Your dataset name begins with 'eval_' ({dataset_name}), but no policy is provided."
)
# Check if dataset_name does not start with "eval_" but policy is provided

View File

@@ -104,9 +104,10 @@ class MetricsTracker:
self.metrics = metrics
self.steps = initial_step
world_size = accelerator.num_processes if accelerator else 1
# A sample is an (observation,action) pair, where observation and action
# can be on multiple timestamps. In a batch, we have `batch_size` number of samples.
self.samples = self.steps * self._batch_size
self.samples = self.steps * self._batch_size * world_size
self.episodes = self.samples / self._avg_samples_per_ep
self.epochs = self.samples / self._num_frames
self.accelerator = accelerator
@@ -132,7 +133,8 @@ class MetricsTracker:
Updates metrics that depend on 'step' for one step.
"""
self.steps += 1
self.samples += self._batch_size * (self.accelerator.num_processes if self.accelerator else 1)
world_size = self.accelerator.num_processes if self.accelerator else 1
self.samples += self._batch_size * world_size
self.episodes = self.samples / self._avg_samples_per_ep
self.epochs = self.samples / self._num_frames

View File

@@ -0,0 +1,212 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utilities for building dataset features from robot/teleoperator pipelines and for
checking action/observation space compatibility between teleops and robots.
"""
import logging
import re
from collections.abc import Sequence
from lerobot.datasets.utils import combine_feature_dicts, hw_to_dataset_features
from lerobot.utils.constants import ACTION, OBS_IMAGES, OBS_STATE, OBS_STR
# Prefixes stripped from feature keys to produce clean dataset names.
# Handles both fully-qualified (e.g. "observation.state.ee.x") and short (e.g. "state.ee.x") forms.
_PREFIXES_TO_STRIP = tuple(
f"{token}."
for const in (ACTION, OBS_STATE, OBS_IMAGES)
for token in (const, const.split(".")[-1])
)
_IMAGES_TOKEN = OBS_IMAGES.split(".")[-1]
def _should_keep(key: str, patterns: Sequence[str] | None) -> bool:
if patterns is None:
return True
return any(re.search(pat, key) for pat in patterns)
def _strip_prefix(key: str) -> str:
for prefix in _PREFIXES_TO_STRIP:
if key.startswith(prefix):
return key[len(prefix) :]
return key
def _features_to_dataset_spec(
features: dict,
*,
is_action: bool,
use_videos: bool,
patterns: Sequence[str] | None = None,
) -> dict:
"""
Convert a flat feature dict (as returned by ``robot.observation_features`` or
``teleop.action_features``) into a LeRobot dataset feature specification.
Args:
features: Flat dict mapping feature key → type or shape.
is_action: True when ``features`` describes actions; False for observations.
use_videos: When False, image observation features are excluded entirely.
patterns: Optional regex patterns to filter state/action features.
Image features are not affected by this filter.
Returns:
A dict suitable for passing to ``LeRobotDataset.create(..., features=...)``.
"""
categorized: dict = {}
for key, value in features.items():
is_image = not is_action and (
(isinstance(value, tuple) and len(value) == 3)
or key.startswith(f"{OBS_IMAGES}.")
or key.startswith(f"{_IMAGES_TOKEN}.")
or f".{_IMAGES_TOKEN}." in key
)
if is_image and not use_videos:
continue
if not is_image and not _should_keep(key, patterns):
continue
categorized[_strip_prefix(key)] = value
if not categorized:
return {}
prefix = ACTION if is_action else OBS_STR
return hw_to_dataset_features(categorized, prefix, use_videos)
def build_dataset_features(
robot,
teleop=None,
*,
use_videos: bool = True,
action_features: dict | None = None,
) -> dict:
"""
Derive dataset feature specifications from robot and teleoperator pipelines.
Reads ``robot.observation_features`` (which already reflects the robot's output
pipeline transformation) and, when provided, ``teleop.action_features`` or an
explicit ``action_features`` dict to determine what the dataset will store.
This replaces the old pattern of manually calling ``aggregate_pipeline_dataset_features``
with explicit processor objects.
Args:
robot: The robot instance (must have ``observation_features``).
teleop: The teleoperator instance. When ``None`` and ``action_features`` is also
``None`` (policy-only recording), only observation features are returned.
use_videos: If True, image observations are included as video features.
action_features: Explicit action feature dict, used when no teleop is available
(e.g. evaluate/inference mode) but the dataset must match a specific action
space (e.g. EE coordinates from a previously recorded dataset).
Returns:
A combined feature dict suitable for passing to ``LeRobotDataset.create(..., features=...)``.
Example::
# Teleop recording
features = build_dataset_features(follower, leader, use_videos=True)
# Policy-only recording (no teleop)
features = build_dataset_features(robot, use_videos=True)
# Evaluate with explicit EE action space
features = build_dataset_features(
robot,
use_videos=True,
action_features={
f"ee.{k}": PolicyFeature(type=FeatureType.ACTION, shape=(1,))
for k in ["x", "y", "z", "wx", "wy", "wz", "gripper_pos"]
},
)
"""
obs_ds = _features_to_dataset_spec(robot.observation_features, is_action=False, use_videos=use_videos)
if action_features is not None:
act_ds = _features_to_dataset_spec(action_features, is_action=True, use_videos=False)
elif teleop is not None:
act_ds = _features_to_dataset_spec(teleop.action_features, is_action=True, use_videos=False)
else:
return obs_ds
return combine_feature_dicts(act_ds, obs_ds)
def check_action_space_compatibility(teleop, robot) -> None:
"""
Warn if the teleoperator's pipeline-transformed action features don't match the robot's
declared ``action_features``.
This is a soft check — a mismatch produces a warning but does not raise. It is intended
to catch obvious misconfigurations (e.g., sending EE actions to a robot expecting joints)
before the control loop starts.
Args:
teleop: The teleoperator whose ``action_features`` describe what it sends.
robot: The robot whose ``action_features`` describe what it expects.
"""
teleop_out = set(teleop.action_features.keys())
robot_in = set(robot.action_features.keys())
if teleop_out != robot_in:
import warnings
warnings.warn(
f"Action space mismatch between teleop and robot.\n"
f" Teleop sends: {sorted(teleop_out)}\n"
f" Robot expects: {sorted(robot_in)}\n"
"Ensure pipelines map between these spaces correctly.",
UserWarning,
stacklevel=2,
)
else:
logging.debug("Action space compatibility check passed.")
def check_observation_space_compatibility(robot, teleop) -> None:
"""
Warn if the robot's observation features don't cover what the teleoperator's
``feedback_features`` expects.
A non-empty ``feedback_features`` that is not a subset of the robot's observation keys
will produce a warning.
Args:
robot: The robot whose ``observation_features`` describe what it produces.
teleop: The teleoperator whose ``feedback_features`` describe what it expects as feedback.
"""
robot_obs = set(robot.observation_features.keys())
teleop_feedback = set(teleop.feedback_features.keys())
if teleop_feedback and not teleop_feedback.issubset(robot_obs):
import warnings
warnings.warn(
f"Observation/feedback space mismatch.\n"
f" Robot obs: {sorted(robot_obs)}\n"
f" Teleop feedback expects: {sorted(teleop_feedback)}\n"
"Ensure the robot observation pipeline covers all feedback keys.",
UserWarning,
stacklevel=2,
)
else:
logging.debug("Observation/feedback space compatibility check passed.")

View File

@@ -31,7 +31,6 @@ from lerobot.configs.train import TrainPipelineConfig
from lerobot.datasets.factory import make_dataset
from lerobot.datasets.image_writer import image_array_to_pil_image
from lerobot.datasets.lerobot_dataset import (
VALID_VIDEO_CODECS,
LeRobotDataset,
MultiLeRobotDataset,
_encode_video_worker,
@@ -45,6 +44,7 @@ from lerobot.datasets.utils import (
hf_transform_to_torch,
hw_to_dataset_features,
)
from lerobot.datasets.video_utils import VALID_VIDEO_CODECS
from lerobot.envs.factory import make_env_config
from lerobot.policies.factory import make_policy_config
from lerobot.robots import make_robot_from_config
@@ -393,7 +393,7 @@ def test_tmp_mixed_deletion(tmp_path, empty_lerobot_dataset_factory):
vid_key: {"dtype": "video", "shape": DUMMY_HWC, "names": ["height", "width", "channels"]},
}
ds_mixed = empty_lerobot_dataset_factory(
root=tmp_path / "mixed", features=features_mixed, batch_encoding_size=2
root=tmp_path / "mixed", features=features_mixed, batch_encoding_size=2, streaming_encoding=False
)
ds_mixed.add_frame(
{
@@ -1450,7 +1450,10 @@ def test_valid_video_codecs_constant():
assert "h264" in VALID_VIDEO_CODECS
assert "hevc" in VALID_VIDEO_CODECS
assert "libsvtav1" in VALID_VIDEO_CODECS
assert len(VALID_VIDEO_CODECS) == 3
assert "auto" in VALID_VIDEO_CODECS
assert "h264_videotoolbox" in VALID_VIDEO_CODECS
assert "h264_nvenc" in VALID_VIDEO_CODECS
assert len(VALID_VIDEO_CODECS) == 10
def test_delta_timestamps_with_episodes_filter(tmp_path, empty_lerobot_dataset_factory):

Some files were not shown because too many files have changed in this diff Show More