diff --git a/docs/source/annotation_pipeline.mdx b/docs/source/annotation_pipeline.mdx index 7fd27b1f2..98ef79fb9 100644 --- a/docs/source/annotation_pipeline.mdx +++ b/docs/source/annotation_pipeline.mdx @@ -7,8 +7,7 @@ ## What the pipeline produces -A vocabulary-discovery phase derives a small canonical wording, then three -modules write into a per-episode staging tree, then a single writer +Three modules write into a per-episode staging tree, then a single writer rewrites the data shards in place: | Style / atom | Column | Module | @@ -21,20 +20,15 @@ rewrites the data shards in place: | speech tool-call atom (`style=null`, `say`) | `language_events` | `interjections` | | `vqa` (user / assistant pair) | `language_events` | `vqa` | -The `plan` module is constrained to a **canonical vocabulary** discovered -once per dataset by the `vocabulary` module (phase 0). It watches a few -sample episode videos (`--vocabulary.sample_episodes`, default `3`) and -asks the VLM to derive a small set of imperative subtask labels and -first-person memory milestones that recur across the demos. The VLM -picks the right number of entries itself based on what it sees in the -clips — short pick-and-place demos get ~6 subtask labels, longer -multi-step recipes get more. The result lands at -`meta/canonical_vocabulary.json` (human-readable / hand-editable) and -is reused on every subsequent run. The `plan` module then constrains -both subtask + memory generation to those exact strings — the -downstream low-level policy sees a small, repeatable target -distribution instead of thousands of LLM paraphrases. Disable with -`--vocabulary.enabled=False` to fall back to free-form generation. +The `plan` module generates subtasks per episode with a **describe → segment** +grounding flow: a first pass narrates only what is visible in the chosen +camera, and its description is fed into a second pass that segments the +episode into consecutive atomic subtasks. 
The resulting spans are then +deterministically stitched into a contiguous full-episode cover so every +frame has exactly one active subtask. See +[`run_hf_job.py`](https://github.com/huggingface/lerobot/blob/main/examples/annotations/run_hf_job.py) +for the production flag set (single camera, embedded frames, windowed +subtask generation). The writer does **not** add a `tools` column to the parquet — the tool catalog lives at `meta/info.json["tools"]` instead (see @@ -44,9 +38,11 @@ user pre-declared. If you want to declare additional tools for a dataset before annotation runs, edit `meta/info.json["tools"]` directly — the pipeline preserves -anything already there. Implementations of those tools live under -`src/lerobot/tools/`; one file per tool, registered via -`TOOL_REGISTRY`. See the [Tools](./tools) doc for the authoring guide. +anything already there. That makes the tool visible to the chat template +so the model can learn to _generate_ the call. The runtime layer that +_executes_ a generated call (the `Tool` protocol / `TOOL_REGISTRY` under +`src/lerobot/tools/`) is not part of this PR — see the +[Tools](./tools) doc, which marks those pieces as not-yet-implemented. ## Running on Hugging Face Jobs @@ -59,19 +55,33 @@ HF_TOKEN=hf_... uv run python examples/annotations/run_hf_job.py ``` [`examples/annotations/run_hf_job.py`](https://github.com/huggingface/lerobot/blob/main/examples/annotations/run_hf_job.py) -spawns a multi-GPU `h200` job that: +spawns a single-GPU `h200` job (scale up to `h200x4` for larger datasets) that: 1. installs the branch under test plus the annotation extras, 2. boots one vLLM server per GPU (in the `vllm/vllm-openai` image) for the chosen model, which the pipeline drives over the OpenAI-compatible API, 3. runs the `plan` / `interjections` / `vqa` modules across the dataset via `lerobot-annotate`, -4. uploads the annotated dataset to `--push_to_hub`. +4. 
with `--push_to_hub=true`, uploads the annotated dataset to + `--new_repo_id` (or back to `--repo_id` in place when that is unset). To target a different dataset, model, or hub repo, edit the `CMD` block inside the script — every flag in there maps directly onto a CLI flag of `lerobot-annotate` (see `lerobot-annotate --help` for the full list). +## Contributing new modules + +The pipeline is built to be extended, and **contributions are very +welcome** — whether that's a brand-new annotation module (e.g. a +trajectory-trace or affordance module), a new prompt template, a better +grounding flow, or quality improvements to the existing `plan` / +`interjections` / `vqa` modules. Each module lives under +`src/lerobot/annotations/steerable_pipeline/modules/`, shares the VLM +client and keyframe cache, writes its raw output to the per-episode +staging tree, and is wired into the executor as an independent phase. +If you have an idea for a module or an improvement, open an issue or PR +on [the repo](https://github.com/huggingface/lerobot). + ## Style-to-recipe consumer mapping The pipeline's outputs are designed to be consumed by recipes (see diff --git a/examples/annotations/run_hf_job.py b/examples/annotations/run_hf_job.py index 6af40a268..c335379f4 100644 --- a/examples/annotations/run_hf_job.py +++ b/examples/annotations/run_hf_job.py @@ -1,21 +1,37 @@ #!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. """Launch ``lerobot-annotate`` on a Hugging Face job (vllm + Qwen3.6-27B VLM). -Spawns one ``h200x4`` job that: +Spawns one single-GPU ``h200`` job that: - 1. installs this branch of ``lerobot`` plus the annotation extras, - 2. boots four vllm servers (one per GPU) with Qwen3.6-27B (dense VLM), + 1. installs ``lerobot`` plus the annotation extras, + 2. boots one vllm server with Qwen3.6-27B (dense VLM), 3. runs the plan / interjections / vqa modules across the dataset in free-form mode (each episode generates its own subtasks + memory), - 4. uploads the annotated dataset to ``--dest_repo_id`` (when set) + 4. uploads the annotated dataset to ``--new_repo_id`` (when set) or back to ``--repo_id``. Usage: HF_TOKEN=hf_... uv run python examples/annotations/run_hf_job.py -Adjust ``CMD`` below to point at your own dataset / target hub repo. +Adjust ``CMD`` (dataset, model, hub repo) and ``flavor`` below for your +run. For larger datasets, scale to ``h200x4`` and raise +``--vlm.parallel_servers`` / ``--vlm.num_gpus`` to match. 
""" import os @@ -29,7 +45,7 @@ if not token: CMD = ( "apt-get update -qq && apt-get install -y -qq git ffmpeg && " "pip install --no-deps " - "'lerobot @ git+https://github.com/huggingface/lerobot.git@feat/language-annotation-pipeline' && " + "'lerobot @ git+https://github.com/huggingface/lerobot.git@main' && " "pip install --upgrade-strategy only-if-needed " "datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include toml typing-inspect " "openai && " @@ -37,12 +53,12 @@ CMD = ( "export VLLM_VIDEO_BACKEND=pyav && " "lerobot-annotate " "--repo_id=pepijn223/robocasa_pretrain_human300_v4 " - "--dest_repo_id=pepijn223/robocasa_pretrain_human300_v4_annotated5 " + "--new_repo_id=pepijn223/robocasa_pretrain_human300_v4_annotated5 " "--push_to_hub=true " "--vlm.backend=openai " "--vlm.model_id=Qwen/Qwen3.6-27B " - "--vlm.parallel_servers=4 " - "--vlm.num_gpus=4 " + "--vlm.parallel_servers=1 " + "--vlm.num_gpus=1 " '--vlm.serve_command="vllm serve Qwen/Qwen3.6-27B ' "--tensor-parallel-size 1 --max-model-len 32768 " '--gpu-memory-utilization 0.8 --uvicorn-log-level warning --port {port}" ' @@ -111,7 +127,7 @@ CMD = ( job = run_job( image="vllm/vllm-openai:latest", command=["bash", "-c", CMD], - flavor="h200x4", + flavor="h200", secrets={"HF_TOKEN": token}, timeout="2h", ) diff --git a/pyproject.toml b/pyproject.toml index 86599aa31..dce61758c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -417,7 +417,7 @@ exclude_dirs = [ "benchmarks", "src/lerobot/datasets/push_dataset_to_hub", ] -skips = ["B101", "B311", "B404", "B603", "B607", "B615"] +skips = ["B101", "B311", "B404", "B603", "B615"] [tool.typos] default.extend-ignore-re = [ diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index 470dccfc1..10484fd3a 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -370,13 +370,14 @@ class 
AnnotationPipelineConfig: # Hub dataset id. Used as the download source when ``root`` is unset, # and as the destination repo when ``push_to_hub`` is enabled and - # ``dest_repo_id`` is unset. + # ``new_repo_id`` is unset. repo_id: str | None = None - # Optional separate Hub dataset id to push the annotated result to. When - # unset, ``push_to_hub`` uploads back to ``repo_id`` (annotate in place); - # when set, the source ``repo_id`` is left untouched. - dest_repo_id: str | None = None + # Optional separate Hub dataset id to push the annotated result to (named + # ``new_repo_id`` to match the LeRobot dataset edit tools). When unset, + # ``push_to_hub`` uploads back to ``repo_id`` (annotate in place); when + # set, the source ``repo_id`` is left untouched. + new_repo_id: str | None = None root: Path | None = None @@ -404,7 +405,7 @@ class AnnotationPipelineConfig: video_backend: str | None = None # When True, upload the annotated dataset to the Hugging Face Hub: - # to ``dest_repo_id`` if set, otherwise back to ``repo_id``. One of + # to ``new_repo_id`` if set, otherwise back to ``repo_id``. One of # the two must be set for this to take effect. push_to_hub: bool = False push_private: bool = False diff --git a/src/lerobot/annotations/steerable_pipeline/executor.py b/src/lerobot/annotations/steerable_pipeline/executor.py index 4b7eb687d..69d10bc89 100644 --- a/src/lerobot/annotations/steerable_pipeline/executor.py +++ b/src/lerobot/annotations/steerable_pipeline/executor.py @@ -15,14 +15,8 @@ # limitations under the License. """In-process executor that runs the annotation phases. 
-The executor plans **seven phases** in the dependency order from the plan: +The executor runs **six phases** in dependency order: - phase 0: vocabulary discovery — derive a small canonical vocabulary - from the first few sample-episode videos (subtask labels + - memory milestones) and persist it next to the dataset; the - ``plan`` module then constrains every per-episode generation - to those strings, so the downstream policy sees a small, - repeatable conditioning distribution phase 1: ``plan`` module (plan + subtasks + memory) phase 2: ``interjections`` module (interjections + speech) phase 3: ``plan`` plan-update pass — re-runs plan emission at every diff --git a/src/lerobot/annotations/steerable_pipeline/frames.py b/src/lerobot/annotations/steerable_pipeline/frames.py index 804dae109..a26245964 100644 --- a/src/lerobot/annotations/steerable_pipeline/frames.py +++ b/src/lerobot/annotations/steerable_pipeline/frames.py @@ -146,6 +146,7 @@ class VideoFrameProvider: # ``ExecutorConfig.episode_parallelism``); guard the dict cache and the # one-shot warn flag against concurrent updates from worker threads. _lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False) + _warned_decode_fail: bool = field(default=False, init=False, repr=False) def __post_init__(self) -> None: from lerobot.datasets.dataset_metadata import LeRobotDatasetMetadata # noqa: PLC0415 @@ -285,7 +286,9 @@ class VideoFrameProvider: str(out_path), ] try: - subprocess.run(cmd, check=True, timeout=300) + # ffmpeg is invoked by name via PATH lookup (the standard way to + # call the CLI); the arg list is fully controlled here, not shell. 
+ subprocess.run(cmd, check=True, timeout=300) # nosec B607 except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError): return None return out_path if out_path.exists() and out_path.stat().st_size > 0 else None @@ -335,7 +338,7 @@ class VideoFrameProvider: # []) is debuggable from the job log instead of post-hoc parquet # inspection. Subsequent failures stay quiet. with self._lock: - already_warned = getattr(self, "_warned_decode_fail", False) + already_warned = self._warned_decode_fail if not already_warned: self._warned_decode_fail = True if not already_warned: @@ -382,7 +385,8 @@ def _decode_frames_ffmpeg(video_path: Path, timestamps: list[float]) -> list[Any frames: list[Any] = [] for ts in timestamps: - proc = subprocess.run( + # ffmpeg invoked by name via PATH lookup; fully-controlled arg list, no shell. + proc = subprocess.run( # nosec B607 [ "ffmpeg", "-nostdin", diff --git a/src/lerobot/annotations/steerable_pipeline/modules/general_vqa.py b/src/lerobot/annotations/steerable_pipeline/modules/general_vqa.py index 579007912..cdc87b579 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/general_vqa.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/general_vqa.py @@ -95,6 +95,7 @@ class GeneralVqaModule: config: VqaConfig seed: int = 1729 frame_provider: FrameProvider = field(default_factory=null_provider) + _warned_no_camera: bool = field(default=False, init=False, repr=False) @property def enabled(self) -> bool: @@ -113,7 +114,7 @@ class GeneralVqaModule: # No camera available — emit nothing rather than producing # untagged rows that would fail validation. Surface a loud one- # time warning so this is never silently a no-op. - if not getattr(self, "_warned_no_camera", False): + if not self._warned_no_camera: logging.getLogger(__name__).warning( "vqa module found no cameras on the frame provider — " "every episode will emit zero VQA rows. 
Check that the " @@ -191,8 +192,17 @@ class GeneralVqaModule: default = getattr(self.frame_provider, "camera_key", None) if default and default in all_cameras: return [default] + # ``restrict_to_default_camera`` is set but the configured default + # isn't one the provider exposes. Returning it anyway would make + # ``_decode`` raise a KeyError deep in frame extraction, so warn and + # fall through to every available camera instead. if default: - return [default] + logging.getLogger(__name__).warning( + "restrict_to_default_camera is set but camera_key=%r is not in the " + "provider's cameras %s; grounding VQA on all available cameras instead.", + default, + all_cameras, + ) return all_cameras def _build_messages( @@ -202,7 +212,7 @@ class GeneralVqaModule: frame_timestamp: float, camera_key: str, ) -> list[dict[str, Any]]: - prompt = load_prompt("module_3_vqa").format( + prompt = load_prompt("vqa").format( episode_task=record.episode_task, question_type=question_type, ) diff --git a/src/lerobot/annotations/steerable_pipeline/modules/interjections_and_speech.py b/src/lerobot/annotations/steerable_pipeline/modules/interjections_and_speech.py index f03e3df0d..616f9ce1b 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/interjections_and_speech.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/interjections_and_speech.py @@ -85,7 +85,7 @@ class InterjectionsAndSpeechModule: return current def _initial_speech(self, record: EpisodeRecord) -> str | None: - prompt = load_prompt("module_2_initial_speech").format( + prompt = load_prompt("interjections_initial_speech").format( episode_task=record.episode_task, ) messages = [{"role": "user", "content": [{"type": "text", "text": prompt}]}] @@ -147,7 +147,7 @@ class InterjectionsAndSpeechModule: # previous subtask and the start of the next one — same # conditioning the policy will see at training time. 
window_ts = self._window_timestamps(t_snap, record.frame_timestamps) - prompt = load_prompt("module_2_interjection").format( + prompt = load_prompt("interjections_interjection").format( episode_task=record.episode_task, prev_subtask=prev_subtask or "(starting from initial state)", next_subtask=next_subtask, @@ -198,11 +198,12 @@ class InterjectionsAndSpeechModule: # Center the window on the anchor so half lands before, half after. start_offset = -window / 2.0 targets = [t_anchor + start_offset + step * i for i in range(n)] + first_ts = float(frame_timestamps[0]) last_ts = float(frame_timestamps[-1]) snapped: list[float] = [] seen: set[float] = set() for tgt in targets: - clamped = min(last_ts, max(0.0, tgt)) + clamped = min(last_ts, max(first_ts, tgt)) t = snap_to_frame(clamped, frame_timestamps) if t not in seen: seen.add(t) diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index fecd42d3a..ac5c76453 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -285,14 +285,14 @@ class PlanSubtasksMemoryModule: def _derive_task_from_video(self, record: EpisodeRecord) -> str | None: """Ask the VLM "what is this video about" with no task hint at all.""" - text = self._vlm_field(self._video_message(record, load_prompt("module_1_video_task")), "task") + text = self._vlm_field(self._video_message(record, load_prompt("plan_video_task")), "task") return text.strip() if isinstance(text, str) and text.strip() else None def _generate_task_rephrasings(self, base_task: str, *, n: int) -> list[str]: """Generate ``n`` text-only paraphrases of ``base_task``.""" if n <= 0 or not base_task: return [] - prompt = load_prompt("module_1_task_rephrasings").format(base_task=base_task, n=n) + prompt = load_prompt("plan_task_rephrasings").format(base_task=base_task, 
n=n) raw = self._vlm_field(self._text_message(prompt), "rephrasings") if not isinstance(raw, list): return [] @@ -343,7 +343,7 @@ class PlanSubtasksMemoryModule: ) return None - prompt = load_prompt("module_1_action_record").format( + prompt = load_prompt("plan_action_record").format( episode_task=episode_task, subtask_text=span.get("text", ""), start_time=start_t, @@ -416,7 +416,7 @@ class PlanSubtasksMemoryModule: """ if not base_task: return [] - prompt = load_prompt("module_1_task_aug_axes").format( + prompt = load_prompt("plan_task_aug_axes").format( base_task=base_task, n_synonym=axes_cfg.synonym_paraphrase, n_omit_arm=axes_cfg.omit_arm, @@ -596,7 +596,7 @@ class PlanSubtasksMemoryModule: ) # ---- Pass 2: segmentation ------------------------------------ - prompt = load_prompt("module_1_subtasks").format( + prompt = load_prompt("plan_subtasks").format( episode_task=effective_task, min_subtask_seconds=self.config.min_subtask_seconds, max_steps=self.config.plan_max_steps, @@ -679,7 +679,7 @@ class PlanSubtasksMemoryModule: "action that is not in your description above.\n\n" ) - prompt = load_prompt("module_1_subtasks").format( + prompt = load_prompt("plan_subtasks").format( episode_task=task, min_subtask_seconds=self.config.min_subtask_seconds, max_steps=self.config.plan_max_steps, @@ -778,7 +778,7 @@ class PlanSubtasksMemoryModule: self, record: EpisodeRecord, task: str, window: tuple[float, float] | None = None ) -> str: """Grounding pass: free-form chronological description of the (windowed) video.""" - prompt = load_prompt("module_1_subtask_describe").format(episode_task=task) + prompt = load_prompt("plan_subtask_describe").format(episode_task=task) text = self._vlm_field(self._video_message(record, prompt, window=window), "description") return text.strip() if isinstance(text, str) and text.strip() else "" @@ -882,7 +882,7 @@ class PlanSubtasksMemoryModule: *, task: str | None = None, ) -> str: - prompt = load_prompt("module_1_memory").format( + prompt = 
load_prompt("plan_memory").format( episode_task=(task if task is not None else record.episode_task), prior_memory=prior_memory or "(none)", completed_subtask=completed, diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_2_initial_speech.txt b/src/lerobot/annotations/steerable_pipeline/prompts/interjections_initial_speech.txt similarity index 100% rename from src/lerobot/annotations/steerable_pipeline/prompts/module_2_initial_speech.txt rename to src/lerobot/annotations/steerable_pipeline/prompts/interjections_initial_speech.txt diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_2_interjection.txt b/src/lerobot/annotations/steerable_pipeline/prompts/interjections_interjection.txt similarity index 100% rename from src/lerobot/annotations/steerable_pipeline/prompts/module_2_interjection.txt rename to src/lerobot/annotations/steerable_pipeline/prompts/interjections_interjection.txt diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_action_record.txt b/src/lerobot/annotations/steerable_pipeline/prompts/plan_action_record.txt similarity index 100% rename from src/lerobot/annotations/steerable_pipeline/prompts/module_1_action_record.txt rename to src/lerobot/annotations/steerable_pipeline/prompts/plan_action_record.txt diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt b/src/lerobot/annotations/steerable_pipeline/prompts/plan_memory.txt similarity index 100% rename from src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt rename to src/lerobot/annotations/steerable_pipeline/prompts/plan_memory.txt diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtask_describe.txt b/src/lerobot/annotations/steerable_pipeline/prompts/plan_subtask_describe.txt similarity index 100% rename from src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtask_describe.txt rename to 
src/lerobot/annotations/steerable_pipeline/prompts/plan_subtask_describe.txt diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt b/src/lerobot/annotations/steerable_pipeline/prompts/plan_subtasks.txt similarity index 100% rename from src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt rename to src/lerobot/annotations/steerable_pipeline/prompts/plan_subtasks.txt diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_aug_axes.txt b/src/lerobot/annotations/steerable_pipeline/prompts/plan_task_aug_axes.txt similarity index 100% rename from src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_aug_axes.txt rename to src/lerobot/annotations/steerable_pipeline/prompts/plan_task_aug_axes.txt diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_rephrasings.txt b/src/lerobot/annotations/steerable_pipeline/prompts/plan_task_rephrasings.txt similarity index 100% rename from src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_rephrasings.txt rename to src/lerobot/annotations/steerable_pipeline/prompts/plan_task_rephrasings.txt diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_video_task.txt b/src/lerobot/annotations/steerable_pipeline/prompts/plan_video_task.txt similarity index 100% rename from src/lerobot/annotations/steerable_pipeline/prompts/module_1_video_task.txt rename to src/lerobot/annotations/steerable_pipeline/prompts/plan_video_task.txt diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_3_vqa.txt b/src/lerobot/annotations/steerable_pipeline/prompts/vqa.txt similarity index 100% rename from src/lerobot/annotations/steerable_pipeline/prompts/module_3_vqa.txt rename to src/lerobot/annotations/steerable_pipeline/prompts/vqa.txt diff --git a/src/lerobot/annotations/steerable_pipeline/validator.py b/src/lerobot/annotations/steerable_pipeline/validator.py index 203e3f157..f08074c9a 100644 --- 
a/src/lerobot/annotations/steerable_pipeline/validator.py +++ b/src/lerobot/annotations/steerable_pipeline/validator.py @@ -138,7 +138,13 @@ class StagingValidator: for row in all_rows: self._check_column_routing(row, report, record.episode_index) self._check_camera_field(row, report, record.episode_index, self.dataset_camera_keys) - if column_for_style(row.get("style")) == LANGUAGE_PERSISTENT: + # ``_check_column_routing`` already recorded any unknown-style error; + # don't let the same ``column_for_style`` lookup raise here uncaught. + try: + column = column_for_style(row.get("style")) + except ValueError: + continue + if column == LANGUAGE_PERSISTENT: persistent.append(row) else: events.append(row) diff --git a/src/lerobot/annotations/steerable_pipeline/vlm_client.py b/src/lerobot/annotations/steerable_pipeline/vlm_client.py index d0d9e56a9..d0ee10ad8 100644 --- a/src/lerobot/annotations/steerable_pipeline/vlm_client.py +++ b/src/lerobot/annotations/steerable_pipeline/vlm_client.py @@ -598,13 +598,3 @@ def _pil_to_data_url(image: Any) -> str: image.save(buf, format="PNG") b64 = base64.b64encode(buf.getvalue()).decode("ascii") return f"data:image/png;base64,{b64}" - - -def _messages_to_prompt(messages: Sequence[dict[str, Any]]) -> Any: - """Pass-through hook used by the vllm backend. - - vllm exposes its own multimodal entry points that vary by version; for the - base flow we simply forward the raw message list and let the caller's - custom backend handle templating. Real deployments override this. 
- """ - return list(messages) diff --git a/src/lerobot/scripts/lerobot_annotate.py b/src/lerobot/scripts/lerobot_annotate.py index 4c18b7937..dc5e9013a 100644 --- a/src/lerobot/scripts/lerobot_annotate.py +++ b/src/lerobot/scripts/lerobot_annotate.py @@ -113,9 +113,9 @@ def annotate(cfg: AnnotationPipelineConfig) -> None: logger.warning(w) if cfg.push_to_hub: - if cfg.repo_id is None and cfg.dest_repo_id is None: + if cfg.repo_id is None and cfg.new_repo_id is None: raise ValueError( - "--push_to_hub requires --repo_id or --dest_repo_id (the dataset repo to push to)." + "--push_to_hub requires --repo_id or --new_repo_id (the dataset repo to push to)." ) _push_to_hub(root, cfg) @@ -123,11 +123,11 @@ def annotate(cfg: AnnotationPipelineConfig) -> None: def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None: """Upload the annotated dataset directory to the Hub. - Pushes to ``cfg.dest_repo_id`` when set, otherwise back to ``cfg.repo_id``. + Pushes to ``cfg.new_repo_id`` when set, otherwise back to ``cfg.repo_id``. """ from huggingface_hub import HfApi # noqa: PLC0415 - repo_id = cfg.dest_repo_id or cfg.repo_id + repo_id = cfg.new_repo_id or cfg.repo_id commit_message = cfg.push_commit_message or "Add steerable annotations (lerobot-annotate)" api = HfApi() print(f"[lerobot-annotate] creating/locating dataset repo {repo_id}...", flush=True) diff --git a/tests/annotations/run_e2e_smoke.py b/tests/annotations/run_e2e_smoke.py index 7974a14bd..723f49a5e 100644 --- a/tests/annotations/run_e2e_smoke.py +++ b/tests/annotations/run_e2e_smoke.py @@ -60,13 +60,11 @@ def _stub_responder(messages): {"text": "place the bottle down", "start": 2.0, "end": 3.0}, ] } - if "concise hierarchical PLAN" in text: - return {"plan": "1. grasp\n2. pour\n3. 
place"} - if "Update the memory" in text: + if "compressed semantic memory" in text: return {"memory": "poured once"} if "acknowledgement the robot" in text: return {"text": "Sure."} - if "ONE realistic interruption" in text: + if "compact interjection" in text: return {"interjection": "use less water", "speech": "Using less water."} if "frame-grounded visual question" in text: return {"question": "How many cups?", "answer": {"label": "cup", "count": 1}} @@ -94,6 +92,23 @@ def main() -> int: print(f"phases={[(p.name, p.episodes_processed) for p in summary.phases]}") print(f"validation: {summary.validation_report.summary()}") print(f"shards rewritten: {len(summary.written_paths)}") + + # Assert the interjection code path actually fired — otherwise a stale + # canned-VLM marker would silently produce zero interjections and this + # smoke run would still "pass" by only printing. + import pyarrow.parquet as pq # noqa: PLC0415 + + events = [ + r + for shard in summary.written_paths + for ev in pq.read_table(shard).column("language_events").to_pylist() + for r in ev + ] + n_interjections = sum(1 for r in events if r.get("style") == "interjection") + n_speech = sum(1 for r in events if r.get("style") is None and r.get("role") == "assistant") + print(f"interjections={n_interjections} speech_atoms={n_speech}") + assert n_interjections > 0, "no interjection rows produced — check the interjection prompt marker" + assert n_speech > 0, "no speech tool-call atoms produced — check the speech prompt marker" return 0 diff --git a/tests/annotations/test_modules.py b/tests/annotations/test_modules.py index 021cd207f..125c09aa0 100644 --- a/tests/annotations/test_modules.py +++ b/tests/annotations/test_modules.py @@ -151,7 +151,7 @@ def test_module2_mid_episode_emits_paired_interjection_and_speech( { "acknowledgement the robot": {"text": "OK."}, # Marker matches the distinctive line of - # ``module_2_interjection.txt`` ("Write ONE compact + # ``interjections_interjection.txt`` ("Write 
ONE compact # interjection ..."). Keep this in sync with that prompt's # wording — the canned responder matches on substring. "Write ONE compact interjection": { @@ -245,7 +245,6 @@ def test_module1_attaches_video_block_to_subtask_prompt(fixture_dataset_root: Pa {"text": "wipe the counter", "start": 0.5, "end": 1.1}, ] } - plan_payload = {"plan": "1. grasp\n2. wipe"} memory_payload = {"memory": "wiped once"} def responder(messages): @@ -255,9 +254,7 @@ def test_module1_attaches_video_block_to_subtask_prompt(fixture_dataset_root: Pa for block in m.get("content", []): if isinstance(block, dict) and block.get("type") == "text": text = block.get("text", "") - if "concise hierarchical PLAN" in text: - return plan_payload - if "Update the memory" in text: + if "compressed semantic memory" in text: return memory_payload return payload diff --git a/tests/annotations/test_pipeline_recipe_render.py b/tests/annotations/test_pipeline_recipe_render.py index 43a616934..614c2e45e 100644 --- a/tests/annotations/test_pipeline_recipe_render.py +++ b/tests/annotations/test_pipeline_recipe_render.py @@ -13,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""End-to-end smoke: pipeline output → PR 1 canonical recipe rendering.""" +"""End-to-end smoke: pipeline output → canonical recipe rendering.""" from __future__ import annotations @@ -49,14 +49,15 @@ from lerobot.datasets.language_render import render_sample # noqa: E402 from ._helpers import make_canned_responder # noqa: E402 -def _build_pr1_style_blend_recipe() -> TrainingRecipe: +def _build_style_blend_recipe() -> TrainingRecipe: """Inline blend recipe that consumes every style this pipeline produces. - PR 1 used to ship ``src/lerobot/configs/recipes/pi05_hirobot.yaml`` as - a canonical example, but that file was dropped during PR 1 review. 
The - cross-PR contract this test guards is "the recipe DSL can render - non-empty messages from pipeline output", which doesn't require a - specific YAML — so we build the equivalent blend in code. + The language schema/DSL work used to ship + ``src/lerobot/configs/recipes/pi05_hirobot.yaml`` as a canonical + example, but that file was dropped during review. The contract this + test guards is "the recipe DSL can render non-empty messages from + pipeline output", which doesn't require a specific YAML — so we build + the equivalent blend in code. """ return TrainingRecipe( blend={ @@ -109,10 +110,9 @@ def _build_executor() -> Executor: {"text": "place the bottle down", "start": 1.0, "end": 1.5}, ] }, - "concise hierarchical PLAN": {"plan": "1. grasp\n2. pour\n3. place"}, - "Update the memory": {"memory": "poured once"}, + "compressed semantic memory": {"memory": "poured once"}, "acknowledgement the robot": {"text": "Sure."}, - "ONE realistic interruption": { + "compact interjection": { "interjection": "use less water", "speech": "Using less water.", }, @@ -137,7 +137,7 @@ def _build_executor() -> Executor: ) -def test_pr1_canonical_recipe_renders_nonempty_from_pipeline_output( +def test_canonical_recipe_renders_nonempty_from_pipeline_output( single_episode_root: Path, ) -> None: executor = _build_executor() @@ -150,7 +150,7 @@ def test_pr1_canonical_recipe_renders_nonempty_from_pipeline_output( events_lists = table.column("language_events").to_pylist() timestamps = table.column("timestamp").to_pylist() - recipe = _build_pr1_style_blend_recipe() + recipe = _build_style_blend_recipe() rendered_any = False for ts, persistent, events in zip(timestamps, persistent_lists, events_lists, strict=True): @@ -168,7 +168,7 @@ def test_pr1_canonical_recipe_renders_nonempty_from_pipeline_output( rendered_any = True assert result["target_message_indices"] break - assert rendered_any, "PR 1 recipe rendered no messages from pipeline output" + assert rendered_any, "recipe rendered no 
messages from pipeline output" # Sanity: speech atom appears in events column intact flat_events = [r for ev in events_lists for r in ev] @@ -177,7 +177,7 @@ def test_pr1_canonical_recipe_renders_nonempty_from_pipeline_output( say = speech_rows[0]["tool_calls"][0] assert say["function"]["name"] == "say" assert isinstance(say["function"]["arguments"]["text"], str) - # PR 2 no longer writes a ``tools`` column — the say schema lives as a - # constant (``SAY_TOOL_SCHEMA``) so PR 1's row struct is the single - # source of truth for the v3.1 schema. + # The pipeline does not write a ``tools`` column — the say schema lives + # as a constant (``SAY_TOOL_SCHEMA``) so the language row struct is the + # single source of truth for the v3.1 schema. assert "tools" not in table.column_names diff --git a/tests/annotations/test_writer.py b/tests/annotations/test_writer.py index 22dfbcb29..0ea550327 100644 --- a/tests/annotations/test_writer.py +++ b/tests/annotations/test_writer.py @@ -229,7 +229,7 @@ def test_writer_drops_subtask_index_idempotent(fixture_dataset_root: Path, tmp_p assert "language_events" in table_a.column_names # The writer no longer emits a dataset-level ``tools`` column; the # ``say`` tool schema lives as a code constant (``SAY_TOOL_SCHEMA``) - # so the parquet stays small and PR 2 doesn't extend PR 1's schema. + # so the parquet stays small and the pipeline doesn't extend the schema. 
assert "tools" not in table_a.column_names # second pass — must produce identical bytes for the language columns diff --git a/tests/scripts/test_lerobot_annotate.py b/tests/scripts/test_lerobot_annotate.py index 9f80d2e8c..a32ac0660 100644 --- a/tests/scripts/test_lerobot_annotate.py +++ b/tests/scripts/test_lerobot_annotate.py @@ -35,7 +35,7 @@ def test_push_to_hub_tags_uploaded_dataset_revision(tmp_path, monkeypatch): cfg = SimpleNamespace( repo_id="source/dataset", - dest_repo_id="annotated/dataset", + new_repo_id="annotated/dataset", push_private=True, push_commit_message=None, )
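Note on the `dest_repo_id` → `new_repo_id` rename: the push-target rule this diff touches in `annotate()` and `_push_to_hub()` can be read in isolation as the sketch below. `resolve_push_target` is a hypothetical helper introduced only for illustration (the PR keeps the two checks inline), and the `SimpleNamespace` configs stand in for `AnnotationPipelineConfig` the same way the updated test does.

```python
from types import SimpleNamespace


def resolve_push_target(cfg) -> str:
    """Hypothetical helper (not part of the PR): pick the Hub repo to push to."""
    # Same guard as annotate(): at least one of the two ids must be set.
    if cfg.repo_id is None and cfg.new_repo_id is None:
        raise ValueError("--push_to_hub requires --repo_id or --new_repo_id")
    # Same expression as _push_to_hub(): new_repo_id wins when set,
    # otherwise the dataset is pushed back to repo_id (annotate in place).
    return cfg.new_repo_id or cfg.repo_id


# Stand-in configs; only the two fields the rule reads are populated.
in_place = SimpleNamespace(repo_id="source/dataset", new_repo_id=None)
separate = SimpleNamespace(repo_id="source/dataset", new_repo_id="annotated/dataset")

print(resolve_push_target(in_place))   # source/dataset
print(resolve_push_target(separate))   # annotated/dataset
```

With `new_repo_id` set, the source `repo_id` is left untouched, which matches the comment added in `config.py`.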