Files
lerobot-clone/tests/annotations/test_writer.py
Pepijn eba3ab3741 annotate: address review feedback — bug fixes, docs/code drift, naming, cleanup
Bugs
  * validator: don't re-raise on unknown style. The second column_for_style
    lookup (used to route persistent vs event) now sits in try/except so an
    unknown style is recorded by _check_column_routing and skipped instead
    of crashing the whole validation pass.
  * general_vqa._target_cameras: when restrict_to_default_camera is set but
    the configured camera_key isn't one the provider exposes, warn and fall
    back to all cameras instead of returning a phantom key that KeyErrors
    deep in frame decode.
  * interjections: clamp interjection timestamps to frame_timestamps[0]
    rather than a hardcoded 0.0 (datasets can start at non-zero t).

Docs / code drift
  * annotation_pipeline.mdx: drop the phantom 'vocabulary discovery / phase
    0 / --vocabulary.* / canonical_vocabulary.json' section (none of it
    exists); describe the real describe->segment + coverage-stitch flow.
    Soften the src/lerobot/tools/ + TOOL_REGISTRY reference to 'not part of
    this PR' (matches tools.mdx, which already marks the runtime layer as
    not-yet-implemented). Fix the --push_to_hub/--new_repo_id wording. Note
    the default is now a single h200. Add a 'Contributing new modules'
    section inviting module / prompt / quality contributions.
  * executor docstring: six phases, no phantom phase 0.

run_hf_job.py
  * add the Apache 2.0 license header (was flagged repeatedly).
  * default to a single GPU: flavor=h200, parallel_servers=1, num_gpus=1
    (scale to h200x4 noted in the docstring).
  * pin the install to @main instead of the feature branch (won't break
    after merge).

Naming / cleanup
  * rename dest_repo_id -> new_repo_id across config / script / example /
    test to match the LeRobot dataset edit tools.
  * rename prompt templates module_N_*.txt -> descriptive (plan_*,
    interjections_*, vqa.txt) and update every load_prompt() call.
  * remove dead _messages_to_prompt (used only by the removed in-process
    backends).
  * declare _warned_decode_fail (frames) and _warned_no_camera (vqa) as
    real init=False dataclass fields instead of getattr monkey-patches.
  * scope bandit B607 to the two ffmpeg subprocess.run sites via
    '# nosec B607' and drop it from the global skip list.

Tests
  * fix stale canned-VLM markers ('ONE realistic interruption' ->
    'compact interjection', 'Update the memory' -> 'compressed semantic
    memory') and drop the dead 'concise hierarchical PLAN' plan responders
    (plan generation is deterministic now) in run_e2e_smoke,
    test_pipeline_recipe_render, test_modules.
  * run_e2e_smoke now asserts interjection + speech rows are produced so a
    stale marker can't silently pass again.
  * drop remaining 'PR 1' / 'PR 2' references from test comments / names.

Verified: tests/annotations + tests/datasets/test_language +
tests/scripts/test_lerobot_annotate (31 passed); make-style E2E smoke
(interjections=1 speech_atoms=2); pre-commit (ruff, mypy, bandit,
prettier) clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-03 18:30:46 +02:00

358 lines
13 KiB
Python

#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Writer correctness tests."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
# ``pyarrow`` and the ``lerobot.annotations`` -> ``lerobot.datasets`` chain
# (-> the HF ``datasets`` library) only ship under the ``dataset`` extra.
# Skip this module in tiers without it instead of erroring at import.
pytest.importorskip("datasets", reason="datasets is required (install lerobot[dataset])")
pytest.importorskip("pandas", reason="pandas is required (install lerobot[dataset])")
import pyarrow.parquet as pq # noqa: E402
from lerobot.annotations.steerable_pipeline.reader import iter_episodes # noqa: E402
from lerobot.annotations.steerable_pipeline.staging import EpisodeStaging # noqa: E402
from lerobot.annotations.steerable_pipeline.writer import ( # noqa: E402
LanguageColumnsWriter,
speech_atom,
)
def _stage_episode(
staging_dir: Path,
episode_index: int,
*,
plan: list[dict] | None = None,
interjections: list[dict] | None = None,
vqa: list[dict] | None = None,
) -> None:
staging = EpisodeStaging(staging_dir, episode_index)
if plan is not None:
staging.write("plan", plan)
if interjections is not None:
staging.write("interjections", interjections)
if vqa is not None:
staging.write("vqa", vqa)
def test_writer_persistence_identity(fixture_dataset_root: Path, tmp_path: Path) -> None:
"""Every frame in an episode has a byte-identical persistent list."""
staging_dir = tmp_path / "stage"
_stage_episode(
staging_dir,
0,
plan=[
{
"role": "assistant",
"content": "grasp the sponge",
"style": "subtask",
"timestamp": 0.0,
"tool_calls": None,
},
{
"role": "assistant",
"content": "1. wipe\n2. dry",
"style": "plan",
"timestamp": 0.0,
"tool_calls": None,
},
{
"role": "assistant",
"content": "wiped the counter",
"style": "memory",
"timestamp": 0.5,
"tool_calls": None,
},
],
)
records = list(iter_episodes(fixture_dataset_root))
LanguageColumnsWriter().write_all(records, staging_dir, fixture_dataset_root)
table = pq.read_table(fixture_dataset_root / "data" / "chunk-000" / "file-000.parquet")
persistent = table.column("language_persistent").to_pylist()
first = persistent[0]
assert first # non-empty
for row in persistent:
assert row == first, "persistent slice must be byte-identical across all frames"
def test_writer_events_exact_timestamp(fixture_dataset_root: Path, tmp_path: Path) -> None:
staging_dir = tmp_path / "stage"
_stage_episode(
staging_dir,
0,
interjections=[
speech_atom(0.0, "Got it."),
{
"role": "user",
"content": "skip the dishes",
"style": "interjection",
"timestamp": 0.5,
"tool_calls": None,
},
speech_atom(0.5, "Skipping the dishes."),
],
)
records = list(iter_episodes(fixture_dataset_root))
LanguageColumnsWriter().write_all(records, staging_dir, fixture_dataset_root)
table = pq.read_table(fixture_dataset_root / "data" / "chunk-000" / "file-000.parquet")
timestamps = table.column("timestamp").to_pylist()
events = table.column("language_events").to_pylist()
for ts, ev in zip(timestamps, events, strict=True):
if abs(ts - 0.0) < 1e-9:
assert any(r["role"] == "assistant" and r.get("style") is None for r in ev), ev
elif abs(ts - 0.5) < 1e-9:
assert any(r.get("style") == "interjection" for r in ev), ev
assert any(r.get("style") is None for r in ev), ev
else:
assert ev == []
def test_writer_column_routing(fixture_dataset_root: Path, tmp_path: Path) -> None:
staging_dir = tmp_path / "stage"
_stage_episode(
staging_dir,
0,
plan=[
{
"role": "assistant",
"content": "do X",
"style": "subtask",
"timestamp": 0.0,
"tool_calls": None,
},
{
"role": "assistant",
"content": "1. do X",
"style": "plan",
"timestamp": 0.0,
"tool_calls": None,
},
{
"role": "assistant",
"content": "did X",
"style": "memory",
"timestamp": 0.3,
"tool_calls": None,
},
],
interjections=[
speech_atom(0.0, "OK"),
{
"role": "user",
"content": "wait",
"style": "interjection",
"timestamp": 0.2,
"tool_calls": None,
},
speech_atom(0.2, "Waiting"),
],
vqa=[
{
"role": "user",
"content": "where is the cup?",
"style": "vqa",
"timestamp": 0.4,
"camera": "observation.images.front",
"tool_calls": None,
},
{
"role": "assistant",
"content": json.dumps(
{"detections": [{"label": "cup", "bbox_format": "xyxy", "bbox": [1, 2, 3, 4]}]},
sort_keys=True,
),
"style": "vqa",
"timestamp": 0.4,
"camera": "observation.images.front",
"tool_calls": None,
},
],
)
records = list(iter_episodes(fixture_dataset_root))
LanguageColumnsWriter().write_all(records, staging_dir, fixture_dataset_root)
table = pq.read_table(fixture_dataset_root / "data" / "chunk-000" / "file-000.parquet")
persistent = table.column("language_persistent").to_pylist()[0]
persistent_styles = {r["style"] for r in persistent}
assert persistent_styles == {"subtask", "plan", "memory"}
all_events = [r for ev in table.column("language_events").to_pylist() for r in ev]
event_styles = {r.get("style") for r in all_events}
assert event_styles == {None, "interjection", "vqa"}
def test_writer_drops_subtask_index_idempotent(fixture_dataset_root: Path, tmp_path: Path) -> None:
staging_dir = tmp_path / "stage"
_stage_episode(
staging_dir,
0,
plan=[
{
"role": "assistant",
"content": "do X",
"style": "subtask",
"timestamp": 0.0,
"tool_calls": None,
},
],
)
records = list(iter_episodes(fixture_dataset_root))
writer = LanguageColumnsWriter()
writer.write_all(records, staging_dir, fixture_dataset_root)
path = fixture_dataset_root / "data" / "chunk-000" / "file-000.parquet"
table_a = pq.read_table(path)
assert "subtask_index" not in table_a.column_names
assert "language_persistent" in table_a.column_names
assert "language_events" in table_a.column_names
# The writer no longer emits a dataset-level ``tools`` column; the
# ``say`` tool schema lives as a code constant (``SAY_TOOL_SCHEMA``)
# so the parquet stays small and the pipeline doesn't extend the schema.
assert "tools" not in table_a.column_names
# second pass — must produce identical bytes for the language columns
records_again = list(iter_episodes(fixture_dataset_root))
writer.write_all(records_again, staging_dir, fixture_dataset_root)
table_b = pq.read_table(path)
assert (
table_a.column("language_persistent").to_pylist() == table_b.column("language_persistent").to_pylist()
)
assert table_a.column("language_events").to_pylist() == table_b.column("language_events").to_pylist()
def test_writer_normalize_rejects_misrouted_persistent_style() -> None:
"""``_normalize_persistent_row`` must reject any non-persistent style."""
from lerobot.annotations.steerable_pipeline.writer import _normalize_persistent_row
with pytest.raises(ValueError, match="non-persistent style"):
_normalize_persistent_row(
{"role": "assistant", "content": "oops", "style": "vqa", "timestamp": 0.0, "tool_calls": None}
)
def test_writer_normalize_rejects_misrouted_event_style() -> None:
"""``_normalize_event_row`` must reject any persistent style."""
from lerobot.annotations.steerable_pipeline.writer import _normalize_event_row
with pytest.raises(ValueError):
_normalize_event_row({"role": "assistant", "content": "oops", "style": "subtask", "tool_calls": None})
def test_say_tool_schema_constant_is_well_formed() -> None:
"""``SAY_TOOL_SCHEMA`` (and ``DEFAULT_TOOLS``) replace the parquet
``tools`` column — chat-template consumers import them directly.
"""
from lerobot.annotations.steerable_pipeline.writer import (
DEFAULT_TOOLS,
SAY_TOOL_SCHEMA,
)
assert DEFAULT_TOOLS == [SAY_TOOL_SCHEMA]
assert SAY_TOOL_SCHEMA["function"]["name"] == "say"
params = SAY_TOOL_SCHEMA["function"]["parameters"]
assert params["properties"]["text"]["type"] == "string"
assert params["required"] == ["text"]
def test_writer_does_not_add_tools_column(fixture_dataset_root: Path, tmp_path: Path) -> None:
"""Re-running on a parquet that already has a legacy ``tools`` column
must drop it cleanly so reruns converge to the v3.1 schema.
"""
staging_dir = tmp_path / "stage"
_stage_episode(
staging_dir,
0,
plan=[
{"role": "assistant", "content": "x", "style": "subtask", "timestamp": 0.0, "tool_calls": None}
],
)
records = list(iter_episodes(fixture_dataset_root))
LanguageColumnsWriter().write_all(records, staging_dir, fixture_dataset_root)
table = pq.read_table(fixture_dataset_root / "data" / "chunk-000" / "file-000.parquet")
assert "tools" not in table.column_names
def test_annotation_metadata_sync_allows_non_streaming_load(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""Annotated parquet columns must be declared in ``meta/info.json``.
``LeRobotDataset`` loads non-streaming datasets by casting parquet
against metadata-derived HF features. If the annotation writer adds
language columns but metadata stays stale, that cast fails with a column
mismatch.
"""
from lerobot.annotations.steerable_pipeline.executor import Executor
from lerobot.datasets.feature_utils import get_hf_features_from_features
from lerobot.datasets.io_utils import load_info, load_nested_dataset
from lerobot.datasets.language import LANGUAGE_EVENTS, LANGUAGE_PERSISTENT, language_feature_info
info_path = fixture_dataset_root / "meta" / "info.json"
info = json.loads(info_path.read_text())
info["features"] = {
"episode_index": {"dtype": "int64", "shape": (1,), "names": None},
"frame_index": {"dtype": "int64", "shape": (1,), "names": None},
"timestamp": {"dtype": "float32", "shape": (1,), "names": None},
"task_index": {"dtype": "int64", "shape": (1,), "names": None},
}
info_path.write_text(json.dumps(info, indent=2))
staging_dir = tmp_path / "stage"
_stage_episode(
staging_dir,
0,
plan=[
{"role": "assistant", "content": "do X", "style": "subtask", "timestamp": 0.0, "tool_calls": None}
],
)
records = list(iter_episodes(fixture_dataset_root))
LanguageColumnsWriter().write_all(records, staging_dir, fixture_dataset_root)
Executor._ensure_annotation_metadata_in_info(fixture_dataset_root)
synced = load_info(fixture_dataset_root)
for key, feature in language_feature_info().items():
assert synced["features"][key] == feature
hf_features = get_hf_features_from_features(synced["features"])
dataset = load_nested_dataset(fixture_dataset_root / "data", features=hf_features)
assert LANGUAGE_PERSISTENT in dataset.column_names
assert LANGUAGE_EVENTS in dataset.column_names
assert len(dataset) == 24
def test_speech_atom_shape_matches_plan_spec() -> None:
atom = speech_atom(2.5, "I'm cleaning up!")
assert atom["role"] == "assistant"
assert atom["style"] is None
assert atom["content"] is None
assert atom["timestamp"] == 2.5
assert isinstance(atom["tool_calls"], list)
call = atom["tool_calls"][0]
assert call["type"] == "function"
assert call["function"]["name"] == "say"
assert call["function"]["arguments"]["text"] == "I'm cleaning up!"