diff --git a/examples/annotations/run_hf_job.py b/examples/annotations/run_hf_job.py index c8219d9e4..dcc9435ce 100644 --- a/examples/annotations/run_hf_job.py +++ b/examples/annotations/run_hf_job.py @@ -6,15 +6,11 @@ Spawns one ``h200x2`` job that: 1. installs this branch of ``lerobot`` plus the annotation extras, 2. boots two vllm servers (one per GPU) with Qwen3.6-35B-A3B-FP8, 3. runs the plan / interjections / vqa modules across the dataset - in free-form mode (phase 0 canonical-vocabulary discovery is - disabled — each episode generates its own subtasks + memory), + in free-form mode (each episode generates its own subtasks + + memory), 4. uploads the annotated dataset to ``--dest_repo_id`` (when set) or back to ``--repo_id``. -Re-enable phase 0 with ``--vocabulary.enabled=true`` (optionally -``--vocabulary.sample_episodes=N``) when the dataset is homogeneous -enough to share one subtask + memory vocabulary across all episodes. - Usage: HF_TOKEN=hf_... uv run python examples/annotations/run_hf_job.py @@ -57,14 +53,6 @@ CMD = ( "--executor.episode_parallelism=16 " "--vlm.chat_template_kwargs='{\"enable_thinking\": false}' " "--vlm.camera_key=observation.images.wrist " - # Phase 0 — canonical vocabulary discovery DISABLED by default. - # Heterogeneous datasets (different tasks/scenes across episodes) - # don't share a single small subtask + memory vocabulary, so each - # episode generates its subtasks + memory free-form. Flip to - # ``--vocabulary.enabled=true`` (optionally ``--vocabulary.sample_episodes=N``) - # for homogeneous datasets where a shared canonical vocabulary - # helps the downstream policy. - "--vocabulary.enabled=false " # Phase 1 — plan module (subtasks + plan + memory + task_aug). "--plan.frames_per_second=1.0 " "--plan.use_video_url=true " diff --git a/src/lerobot/annotations/steerable_pipeline/__init__.py b/src/lerobot/annotations/steerable_pipeline/__init__.py index 02d819604..a8da5e05e 100644 --- a/src/lerobot/annotations/steerable_pipeline/__init__.py +++ b/src/lerobot/annotations/steerable_pipeline/__init__.py @@ -26,25 +26,11 @@ outputs are staged per-episode before a final parquet rewrite: from .config import AnnotationPipelineConfig from .validator import StagingValidator, ValidationReport -from .vocabulary import ( - VOCABULARY_FILENAME, - Vocabulary, - VocabularyDiscoveryModule, - load_vocabulary, - save_vocabulary, - vocabulary_path, -) from .writer import LanguageColumnsWriter __all__ = [ - "VOCABULARY_FILENAME", "AnnotationPipelineConfig", "LanguageColumnsWriter", "StagingValidator", "ValidationReport", - "Vocabulary", - "VocabularyDiscoveryModule", - "load_vocabulary", - "save_vocabulary", - "vocabulary_path", ] diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index cc6402f08..c60e58fee 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -21,41 +21,6 @@ from pathlib import Path from typing import Any -@dataclass -class VocabularyConfig: - """Phase 0 — dataset-level canonical vocabulary discovery. - - Watches the first ``sample_episodes`` episode videos and asks the VLM - to derive a small canonical vocabulary (subtask labels + memory - milestones) that every episode in the dataset will reuse. The VLM - decides the count itself from what it sees in the clips — short - pick-and-place demos get ~6 labels, longer multi-step recipes more. - The output lands at ``meta/canonical_vocabulary.json`` and feeds - phase 1's subtask + memory generation as both a prompt-side - constraint and a post-VLM validation gate. - - Why this exists: free-form LLM rephrasing per episode produces near- - unique subtask strings, which makes the downstream low-level policy's - conditioning effectively noise — at inference the policy generates a - *new* paraphrase the action expert has never seen and produces tiny - cautious actions. Forcing every episode onto the same small set of - canonical strings gives the action expert dense supervision per - string and a small target distribution to learn against. - - Set ``enabled=False`` to fall back to free-form generation (original - behaviour). ``reuse_existing=True`` keeps a hand-edited vocabulary - file from being clobbered on re-runs. - """ - - enabled: bool = True - sample_episodes: int = 3 - max_video_frames_per_episode: int = 32 - # When True (default), an existing meta/canonical_vocabulary.json is - # loaded as-is and no VLM call is made — lets operators hand-edit the - # file. Set False to always rediscover from the sample episodes. - reuse_existing: bool = True - - @dataclass class PlanConfig: """``plan`` module: plan + subtasks + memory + task augmentation. @@ -351,7 +316,6 @@ class AnnotationPipelineConfig: seed: int = 1729 - vocabulary: VocabularyConfig = field(default_factory=VocabularyConfig) plan: PlanConfig = field(default_factory=PlanConfig) interjections: InterjectionsConfig = field(default_factory=InterjectionsConfig) vqa: VqaConfig = field(default_factory=VqaConfig) diff --git a/src/lerobot/annotations/steerable_pipeline/executor.py b/src/lerobot/annotations/steerable_pipeline/executor.py index 5c725fa65..355e25460 100644 --- a/src/lerobot/annotations/steerable_pipeline/executor.py +++ b/src/lerobot/annotations/steerable_pipeline/executor.py @@ -94,7 +94,6 @@ class Executor: vqa: Any # GeneralVqaModule writer: LanguageColumnsWriter validator: StagingValidator - vocabulary: Any = None # VocabularyDiscoveryModule | None def run(self, root: Path) -> PipelineRunSummary: records = list(iter_episodes(root, only_episodes=self.config.only_episodes)) @@ -109,10 +108,6 @@ class Executor: phases: list[PhaseResult] = [] - # Phase 0: vocabulary discovery. Mutates ``self.plan.vocabulary`` - # so subsequent per-episode plan calls see the canonical labels. - phases.append(self._run_vocabulary_phase(records, root)) - # Phase 1: ``plan`` module (plan + subtasks + memory) phases.append(self._run_module_phase("plan", records, staging_dir, self.plan)) # Phase 2: ``interjections`` module (interjections + speech). It @@ -183,62 +178,6 @@ class Executor: flush=True, ) - def _run_vocabulary_phase( - self, records: list[EpisodeRecord], root: Path - ) -> PhaseResult: - """Discover (or load) the canonical vocabulary, wire it into ``self.plan``. - - Returns a ``PhaseResult`` whose ``episodes_processed`` is the number - of sample episodes consulted (0 when disabled or no VLM call was - needed); ``episodes_skipped`` is always ``0`` because vocabulary is - a once-per-dataset artifact, not a per-episode product. - """ - from .vocabulary import load_vocabulary, save_vocabulary # noqa: PLC0415 - - if self.vocabulary is None or not getattr(self.vocabulary, "enabled", False): - print( - "[annotate] phase=vocabulary skipped (module disabled or unset)", - flush=True, - ) - return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) - - existing = load_vocabulary(root) - if existing is not None and self.config.vocabulary.reuse_existing: - print( - f"[annotate] phase=vocabulary reusing {root / 'meta' / 'canonical_vocabulary.json'} " - f"({len(existing.subtasks)} subtask labels, " - f"{len(existing.memory_milestones)} memory milestones)", - flush=True, - ) - self.plan.vocabulary = existing - return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) - - sample_n = max(1, min(int(self.config.vocabulary.sample_episodes), len(records))) - print( - f"[annotate] phase=vocabulary discovering from {sample_n} sample episode(s)...", - flush=True, - ) - t0 = time.time() - vocab = self.vocabulary.discover(records[:sample_n], existing=existing) - if vocab is None: - print( - "[annotate] phase=vocabulary returned no vocabulary — " - "plan module will fall back to free-form generation", - flush=True, - ) - return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0) - - save_path = save_vocabulary(root, vocab) - print( - f"[annotate] phase=vocabulary wrote {save_path} " - f"({len(vocab.subtasks)} subtask labels, " - f"{len(vocab.memory_milestones)} memory milestones) in " - f"{time.time() - t0:.1f}s", - flush=True, - ) - self.plan.vocabulary = vocab - return PhaseResult(name="vocabulary", episodes_processed=sample_n, episodes_skipped=0) - def _run_module_phase( self, name: str, diff --git a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py index 46d678fd6..f58ec2c91 100644 --- a/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py +++ b/src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py @@ -37,7 +37,6 @@ from ..prompts import load as load_prompt from ..reader import EpisodeRecord, reconstruct_subtask_spans, snap_to_frame from ..staging import EpisodeStaging from ..vlm_client import VlmClient -from ..vocabulary import Vocabulary logger = logging.getLogger(__name__) @@ -60,11 +59,6 @@ class PlanSubtasksMemoryModule: vlm: VlmClient config: PlanConfig frame_provider: FrameProvider = field(default_factory=null_provider) - vocabulary: Vocabulary | None = None - """When set, the module constrains subtask + memory generation to the - canonical strings in ``vocabulary``. Phase 0 (vocabulary discovery) - populates this once per dataset; ``None`` falls back to free-form - generation (original behaviour).""" @property def enabled(self) -> bool: @@ -575,28 +569,9 @@ class PlanSubtasksMemoryModule: min_subtask_seconds=self.config.min_subtask_seconds, max_steps=self.config.plan_max_steps, episode_duration=f"{episode_duration:.3f}", - vocabulary_block=self._subtask_vocabulary_block(), ) messages = self._video_message(record, prompt) spans = self._vlm_field(messages, "subtasks") - # When a vocabulary is in force, do a single targeted retry if - # any returned subtask is off-vocab — strict exact-match only, - # no fuzzy snapping. The retry includes the offending strings - # and the full canonical list so the VLM can correct itself. - if self.vocabulary is not None and self.vocabulary.subtasks and spans: - invalid = self._invalid_subtasks(spans) - if invalid: - logger.info( - "episode %d: VLM emitted %d off-vocab subtask(s) (%s); retrying once", - record.episode_index, - len(invalid), - invalid, - ) - retry_msg = self._build_subtask_retry_message(messages, invalid) - retried = self._vlm_field(retry_msg, "subtasks") - if retried: - spans = retried - if not spans: return [] # clamp to [t0, t_last] and sort @@ -614,21 +589,11 @@ class PlanSubtasksMemoryModule: end = max(t0, min(end, t_last)) if end < start: start, end = end, start - if not text: - continue - text = self._canonicalize_subtask(text) if not text: continue cleaned.append({"text": text, "start": start, "end": end}) cleaned.sort(key=lambda s: s["start"]) cleaned = self._dedupe_starts_to_distinct_frames(cleaned, record) - if self.vocabulary is not None and self.vocabulary.subtasks and not cleaned: - logger.warning( - "episode %d: every VLM subtask was off-vocab even after retry — " - "episode left empty (extend meta/canonical_vocabulary.json to " - "cover the missing phase)", - record.episode_index, - ) return cleaned @staticmethod @@ -679,132 +644,6 @@ class PlanSubtasksMemoryModule: out.append(new_span) return out - # ------------------------------------------------------------------ - # Canonical-vocabulary helpers - # ------------------------------------------------------------------ - - def _subtask_vocabulary_block(self) -> str: - """Bullet-list of canonical subtasks the VLM must pick from. - - Returns an empty string when no vocabulary is configured — - ``module_1_subtasks.txt`` then falls back to its free-form - rules (original behaviour). - """ - if self.vocabulary is None or not self.vocabulary.subtasks: - return "" - bullets = "\n".join(f"- {s}" for s in self.vocabulary.subtasks) - return ( - "You MUST choose each subtask label verbatim from this canonical " - "vocabulary — pick the closest match for each phase of the demo, " - "and reuse the SAME string every time that phase recurs. The " - "low-level policy is conditioned on these exact strings; any " - "novel paraphrase you invent will make its conditioning OOD.\n" - "Canonical subtask labels:\n" - f"{bullets}\n\n" - ) - - def _memory_vocabulary_block(self) -> str: - """Bullet-list of canonical memory milestones the VLM must pick from.""" - if self.vocabulary is None or not self.vocabulary.memory_milestones: - return "" - bullets = "\n".join(f"- {m}" for m in self.vocabulary.memory_milestones) - return ( - "Compose the memory by picking ONLY from this canonical milestone " - "list — append a milestone (or rewrite the running memory to " - "compress past ones) using these exact phrases. Do not invent new " - "wording: every paraphrase weakens the downstream conditioning.\n" - "Canonical memory milestones:\n" - f"{bullets}\n\n" - ) - - _NORMALIZE_STRIP_TOKENS: frozenset[str] = frozenset({"the", "a", "an"}) - - def _canonicalize_subtask(self, text: str) -> str: - """Validate ``text`` against the canonical vocabulary; no fuzzy snap. - - Without a vocabulary, the original text passes through. With a - vocabulary, accept the span only if its normalised form (lower- - cased, articles stripped, whitespace collapsed) matches a - canonical entry exactly — the canonical wording is returned so - the supervised string is byte-identical across episodes. - - Off-vocab spans are dropped (empty string). Upstream - ``_generate_subtasks`` triggers a targeted retry before reaching - the drop path; this function never snaps or warps a span into - a different label. - """ - if self.vocabulary is None or not self.vocabulary.subtasks: - return text.strip() - normalised = self._normalize(text) - if not normalised: - return "" - for candidate in self.vocabulary.subtasks: - if self._normalize(candidate) == normalised: - return candidate - return "" - - @classmethod - def _normalize(cls, text: str) -> str: - """Lowercase, strip articles, collapse whitespace, drop punctuation.""" - words = [ - w.strip(".,:;\"'!?()") - for w in text.lower().replace(",", " ").split() - ] - return " ".join(w for w in words if w and w not in cls._NORMALIZE_STRIP_TOKENS) - - def _invalid_subtasks(self, spans: list[dict[str, Any]]) -> list[str]: - """Return the unique off-vocab subtask strings the VLM produced.""" - seen: list[str] = [] - for span in spans: - text = str((span or {}).get("text") or "").strip() - if not text: - continue - if self._canonicalize_subtask(text): - continue - if text not in seen: - seen.append(text) - return seen - - def _build_subtask_retry_message( - self, original_messages: list[dict[str, Any]], invalid: list[str] - ) -> list[dict[str, Any]]: - """Compose a one-shot correction prompt naming the off-vocab strings.""" - assert self.vocabulary is not None - canonical = "\n".join(f"- {s}" for s in self.vocabulary.subtasks) - invalid_list = "\n".join(f"- {s!r}" for s in invalid) - correction = ( - "Your previous response included subtask labels that are NOT in " - "the canonical vocabulary:\n" - f"{invalid_list}\n\n" - "Re-emit the same segmentation (same number of spans, same start/end " - "timestamps where they were valid) but replace every off-vocab " - "label with the EXACT canonical string for that phase, copied " - "verbatim from this list:\n" - f"{canonical}\n\n" - "Strict rules:\n" - "- Output strings must be byte-for-byte identical to entries above.\n" - "- No articles, no adverbs, no extra words.\n" - "- If a phase truly has no canonical match, omit that span entirely.\n" - "Return the same JSON shape as before." - ) - # Append the correction as an additional user turn; the model - # sees the original prompt + its prior output is implied by the - # conversation context (the VLM client is stateless, so we - # re-send the original content plus this correction). - retry_messages = [ - { - "role": m.get("role", "user"), - "content": ( - m.get("content") - if isinstance(m.get("content"), str) - else list(m.get("content") or []) - ), - } - for m in original_messages - ] - retry_messages.append({"role": "user", "content": correction}) - return retry_messages - def _generate_plan( self, record: EpisodeRecord, # noqa: ARG002 (kept for signature stability) @@ -866,7 +705,6 @@ class PlanSubtasksMemoryModule: prior_memory=prior_memory or "(none)", completed_subtask=completed, remaining_subtasks=", ".join(remaining) if remaining else "(none)", - vocabulary_block=self._memory_vocabulary_block(), ) memory = self._vlm_field(self._text_message(prompt), "memory") return memory.strip() if isinstance(memory, str) else "" diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt deleted file mode 100644 index 00c29be4e..000000000 --- a/src/lerobot/annotations/steerable_pipeline/prompts/module_0_vocabulary.txt +++ /dev/null @@ -1,53 +0,0 @@ -You are inspecting {n_episodes} sample episode video(s) from a teleoperated -robot dataset. Every episode in the dataset performs the SAME task; the -user originally asked: "{episode_task}". - -Watch all the clips and produce a SHORT canonical vocabulary that every -episode in this dataset will reuse. The downstream low-level policy is -conditioned on these strings — duplicate phrasings (e.g. "grasp blue -cube" vs "pick up the blue cube") would destroy the conditioning, so -pick one wording per concept and reuse it everywhere. - -Decide how many entries each list needs YOURSELF based on what you see — -the smallest set that still covers every recurring phase in the demos. -A simple two-object pick-and-place might need ~6 subtask labels and 2 -memory milestones; a long multi-step recipe needs more. Err on the side -of FEWER — extra entries that don't recur across episodes weaken the -conditioning. - -You output two lists: - -1. `subtasks`: imperative, telegraphic commands the robot can execute. - - Verb-first. Drop articles, adverbs, qualifiers. - - Consistent object nouns (if the task says "cube", every subtask says - "cube" — never "block" / "object"). - - Atomic — one skill per subtask (gripper-open events, contact, regrasps, - transitions all become cut points). - - Each label must recur across the demos. If you see a motion only - once across all sample clips, it probably isn't a canonical phase. - - Good: "move to blue cube", "grasp blue cube", "lift blue cube", - "place blue cube in box", "release blue cube", "retract arm". - - Bad: "the robot arm moves towards the blue cube" (third person, - too long), "carefully pick up the cube" (adverb, article), - "carrying the yellow cube over the green basket" (gerund — should - be imperative "transport yellow cube to green basket"). - -2. `memory_milestones`: first-person past-tense sentences the running - memory composes from. Each subtask phase that produces a lasting - change should have a milestone; transient motions (move, retract) - should NOT. - - First person, past tense. Start with "I". - - One sentence. Functional outcome only — no grasp / motion detail. - - Good: "I picked up the blue cube.", "I placed the blue cube in - the green box.", "I wiped the counter." - - Bad: "The robot arm grasped the blue cube." (third person), - "I carefully grasped the blue cube with the parallel gripper." - (irrelevant detail), "I moved towards the blue cube." (transient - motion — should be omitted, not memorialised). - -Output strictly valid JSON of shape: - - {{ - "subtasks": ["", ...], - "memory_milestones": ["I .", ...] - }} diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt index d066b9f73..b5278368b 100644 --- a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_memory.txt @@ -13,7 +13,7 @@ Previous memory: {prior_memory} Just-completed subtask: "{completed_subtask}" Remaining subtasks (for relevance judgement only): {remaining_subtasks} -{vocabulary_block}Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the +Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the robot has accomplished so far — the running story it would tell itself. Authoring rules: diff --git a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt index 9314282be..a49096682 100644 --- a/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt +++ b/src/lerobot/annotations/steerable_pipeline/prompts/module_1_subtasks.txt @@ -6,7 +6,7 @@ You are shown the entire demonstration as a single video. Watch the whole clip, then segment it into a list of consecutive atomic subtasks the robot performs. -{vocabulary_block}Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts: +Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts: - Each subtask = one COMPOSITE atomic skill the low-level policy can execute end-to-end. A "skill" bundles its own approach motion with diff --git a/src/lerobot/annotations/steerable_pipeline/vocabulary.py b/src/lerobot/annotations/steerable_pipeline/vocabulary.py deleted file mode 100644 index 121cef849..000000000 --- a/src/lerobot/annotations/steerable_pipeline/vocabulary.py +++ /dev/null @@ -1,222 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2026 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Dataset-level canonical vocabulary discovery (Phase 0). - -The downstream consumer of these annotations is a low-level action expert -conditioned on the ``subtask`` string. Free-form per-episode LLM rephrasing -gives near-unique strings per occurrence, which collapses the action -expert's conditioning to noise and makes runtime subtask-paraphrase drift -catastrophic. The Hi-Robot / π0.6-MEM recipe ships a small canonical -vocabulary per environment (~10 strings) that every episode reuses; this -module derives that vocabulary automatically from the first few episode -videos and persists it next to the dataset. - -Pipeline-level flow: - - Phase 0 (here): watch N sample episodes → produce vocabulary.json - Phase 1 (plan module): reuse vocabulary on every episode, both as - prompt-side constraint *and* post-VLM validation - -The vocabulary is JSON, lives at ``/meta/canonical_vocabulary.json``, -and is human-inspectable / hand-editable — if the discovered set is wrong, -operators edit the file and re-run the pipeline without phase 0. -""" - -from __future__ import annotations - -import json -import logging -from collections.abc import Sequence -from dataclasses import dataclass, field -from pathlib import Path -from typing import Any - -from .config import VocabularyConfig -from .frames import FrameProvider, null_provider, to_video_block -from .prompts import load as load_prompt -from .reader import EpisodeRecord -from .vlm_client import VlmClient - -logger = logging.getLogger(__name__) - -VOCABULARY_FILENAME = "canonical_vocabulary.json" - - -@dataclass -class Vocabulary: - """Canonical phrasings shared across every episode of one dataset. - - Both lists are strict: per-episode subtask + memory generation pick - from these strings only; the downstream policy then has a small, - repeatable target distribution to learn instead of thousands of - LLM paraphrases. - """ - - subtasks: tuple[str, ...] - """Imperative subtask labels — what the low-level policy is conditioned - on. Verb-first, telegraphic, consistent object nouns. Example: - ``("move to blue cube", "grasp blue cube", "lift blue cube", - "place blue cube in box", "retract arm")``. - """ - - memory_milestones: tuple[str, ...] - """First-person past-tense milestone sentences — building blocks for - the running memory string. Example: ``("I picked up the blue cube.", - "I placed the blue cube in the green box.")``. Each milestone maps - 1:1 onto a completed subtask phase; ``memory_at_step_k`` is the - concatenation of milestones for completed phases. - """ - - def to_json(self) -> dict[str, list[str]]: - return { - "subtasks": list(self.subtasks), - "memory_milestones": list(self.memory_milestones), - } - - @classmethod - def from_json(cls, payload: dict[str, Any]) -> Vocabulary: - subtasks = tuple( - str(s).strip() for s in (payload.get("subtasks") or []) if str(s).strip() - ) - memory_milestones = tuple( - str(s).strip() for s in (payload.get("memory_milestones") or []) if str(s).strip() - ) - return cls(subtasks=subtasks, memory_milestones=memory_milestones) - - def is_empty(self) -> bool: - return not self.subtasks and not self.memory_milestones - - -def vocabulary_path(root: Path) -> Path: - """Return the canonical on-disk location for the vocabulary file.""" - return root / "meta" / VOCABULARY_FILENAME - - -def load_vocabulary(root: Path) -> Vocabulary | None: - """Read ``/meta/canonical_vocabulary.json`` if present. - - Returns ``None`` when the file does not exist — callers fall back to - free-form (unconstrained) subtask + memory generation, preserving the - pipeline's behaviour on datasets that never ran phase 0. - """ - path = vocabulary_path(root) - if not path.exists(): - return None - try: - payload = json.loads(path.read_text(encoding="utf-8")) - except (OSError, json.JSONDecodeError) as exc: - logger.warning("could not read %s: %s — proceeding without vocabulary", path, exc) - return None - if not isinstance(payload, dict): - logger.warning("%s is not a JSON object — ignoring", path) - return None - vocab = Vocabulary.from_json(payload) - if vocab.is_empty(): - return None - return vocab - - -def save_vocabulary(root: Path, vocab: Vocabulary) -> Path: - """Atomically persist ``vocab`` to ``/meta/canonical_vocabulary.json``.""" - path = vocabulary_path(root) - path.parent.mkdir(parents=True, exist_ok=True) - tmp = path.with_suffix(path.suffix + ".tmp") - tmp.write_text( - json.dumps(vocab.to_json(), indent=2, ensure_ascii=False) + "\n", - encoding="utf-8", - ) - tmp.replace(path) - return path - - -@dataclass -class VocabularyDiscoveryModule: - """Derive a dataset-level canonical vocabulary from sample episodes. - - Phase 0 of the executor: pulls ``config.sample_episodes`` episode - videos, packs them into one Qwen-VL multi-video prompt, and asks the - model to enumerate the small set of canonical subtask labels + - memory milestones that recur across them. The output is persisted - to ``meta/canonical_vocabulary.json`` and consumed by phase 1. - """ - - vlm: VlmClient - config: VocabularyConfig - frame_provider: FrameProvider = field(default_factory=null_provider) - - @property - def enabled(self) -> bool: - return self.config.enabled - - def discover( - self, - records: Sequence[EpisodeRecord], - *, - existing: Vocabulary | None = None, - ) -> Vocabulary | None: - """Run vocabulary discovery against the first N sample episodes. - - ``existing`` short-circuits the VLM call when ``config.reuse_existing`` - is True and an on-disk vocabulary is already present — keeps re-runs - cheap and lets operators hand-edit the file without it getting - overwritten. - """ - if existing is not None and self.config.reuse_existing: - logger.info( - "vocabulary: reusing existing (%d subtasks, %d memory milestones)", - len(existing.subtasks), - len(existing.memory_milestones), - ) - return existing - - sample = list(records[: max(1, int(self.config.sample_episodes))]) - if not sample: - return None - - task_hint = next((r.episode_task for r in sample if r.episode_task), "") - prompt = load_prompt("module_0_vocabulary").format( - episode_task=task_hint or "(unspecified)", - n_episodes=len(sample), - ) - # Pack one video block per sample episode so the VLM sees the - # variation across episodes (different starting poses, different - # object placements) rather than overfitting to one trajectory. - content: list[dict[str, Any]] = [] - for record in sample: - video_frames = self.frame_provider.video_for_episode( - record, int(self.config.max_video_frames_per_episode) - ) - if video_frames: - content.extend(to_video_block(video_frames)) - content.append({"type": "text", "text": prompt}) - messages = [{"role": "user", "content": content}] - - result = self.vlm.generate_json([messages])[0] - if not isinstance(result, dict): - logger.warning("vocabulary: VLM did not return a JSON object — skipping") - return None - - vocab = Vocabulary.from_json(result) - if vocab.is_empty(): - logger.warning("vocabulary: VLM returned an empty vocabulary — skipping") - return None - logger.info( - "vocabulary: discovered %d subtask labels + %d memory milestones from %d episodes", - len(vocab.subtasks), - len(vocab.memory_milestones), - len(sample), - ) - return vocab diff --git a/src/lerobot/scripts/lerobot_annotate.py b/src/lerobot/scripts/lerobot_annotate.py index 52309b827..7fee1f052 100644 --- a/src/lerobot/scripts/lerobot_annotate.py +++ b/src/lerobot/scripts/lerobot_annotate.py @@ -40,7 +40,6 @@ from lerobot.annotations.steerable_pipeline.modules import ( ) from lerobot.annotations.steerable_pipeline.validator import StagingValidator from lerobot.annotations.steerable_pipeline.vlm_client import make_vlm_client -from lerobot.annotations.steerable_pipeline.vocabulary import VocabularyDiscoveryModule from lerobot.annotations.steerable_pipeline.writer import LanguageColumnsWriter from lerobot.configs import parser @@ -89,9 +88,6 @@ def annotate(cfg: AnnotationPipelineConfig) -> None: vlm=vlm, config=cfg.interjections, seed=cfg.seed, frame_provider=frame_provider ) vqa = GeneralVqaModule(vlm=vlm, config=cfg.vqa, seed=cfg.seed, frame_provider=frame_provider) - vocabulary = VocabularyDiscoveryModule( - vlm=vlm, config=cfg.vocabulary, frame_provider=frame_provider - ) writer = LanguageColumnsWriter() validator = StagingValidator( dataset_camera_keys=tuple(getattr(frame_provider, "camera_keys", []) or []) or None, @@ -102,7 +98,6 @@ def annotate(cfg: AnnotationPipelineConfig) -> None: plan=plan, interjections=interjections, vqa=vqa, - vocabulary=vocabulary, writer=writer, validator=validator, ) diff --git a/tests/annotations/test_vocabulary.py b/tests/annotations/test_vocabulary.py deleted file mode 100644 index 7b820834d..000000000 --- a/tests/annotations/test_vocabulary.py +++ /dev/null @@ -1,412 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2026 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Vocabulary-discovery phase (phase 0) tests.""" - -from __future__ import annotations - -import json -from pathlib import Path - -from lerobot.annotations.steerable_pipeline.config import ( - PlanConfig, - VocabularyConfig, -) -from lerobot.annotations.steerable_pipeline.modules import PlanSubtasksMemoryModule -from lerobot.annotations.steerable_pipeline.reader import iter_episodes -from lerobot.annotations.steerable_pipeline.staging import EpisodeStaging -from lerobot.annotations.steerable_pipeline.vocabulary import ( - Vocabulary, - VocabularyDiscoveryModule, - load_vocabulary, - save_vocabulary, - vocabulary_path, -) - -from ._helpers import make_canned_responder - - -_CANONICAL_SUBTASKS = ( - "grasp blue cube", - "place blue cube in box", - "retract arm", -) -_CANONICAL_MEMORY = ( - "I picked up the blue cube.", - "I placed the blue cube in the box.", -) - - -# --------------------------------------------------------------------------- -# Vocabulary dataclass + on-disk round-trip -# --------------------------------------------------------------------------- - - -def test_vocabulary_roundtrip(tmp_path: Path) -> None: - vocab = Vocabulary( - subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY - ) - save_path = save_vocabulary(tmp_path, vocab) - assert save_path == vocabulary_path(tmp_path) - assert save_path.exists() - - loaded = load_vocabulary(tmp_path) - assert loaded is not None - assert loaded.subtasks == _CANONICAL_SUBTASKS - assert loaded.memory_milestones == _CANONICAL_MEMORY - - -def test_vocabulary_load_missing_returns_none(tmp_path: Path) -> None: - assert load_vocabulary(tmp_path) is None - - -def test_vocabulary_load_malformed_returns_none(tmp_path: Path) -> None: - path = vocabulary_path(tmp_path) - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text("{ not valid json", encoding="utf-8") - assert load_vocabulary(tmp_path) is None - - -def test_vocabulary_load_empty_payload_returns_none(tmp_path: Path) -> None: - path = vocabulary_path(tmp_path) - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(json.dumps({"subtasks": [], "memory_milestones": []}), encoding="utf-8") - assert load_vocabulary(tmp_path) is None - - -# --------------------------------------------------------------------------- -# Discovery module -# --------------------------------------------------------------------------- - - -def test_vocabulary_discovery_calls_vlm_and_returns_vocab( - fixture_dataset_root: Path, -) -> None: - vlm = make_canned_responder( - { - "canonical vocabulary": { - "subtasks": list(_CANONICAL_SUBTASKS), - "memory_milestones": list(_CANONICAL_MEMORY), - } - } - ) - module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig(sample_episodes=2)) - records = list(iter_episodes(fixture_dataset_root)) - vocab = module.discover(records) - assert vocab is not None - assert vocab.subtasks == _CANONICAL_SUBTASKS - assert vocab.memory_milestones == _CANONICAL_MEMORY - - -def test_vocabulary_discovery_reuses_existing(fixture_dataset_root: Path) -> None: - """``reuse_existing=True`` short-circuits the VLM call entirely.""" - - def _explode(_messages): # pragma: no cover - must not be called - raise AssertionError("VLM should not be invoked when reusing existing vocabulary") - - from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient - - vlm = StubVlmClient(responder=_explode) - module = VocabularyDiscoveryModule( - vlm=vlm, config=VocabularyConfig(reuse_existing=True) - ) - records = list(iter_episodes(fixture_dataset_root)) - existing = Vocabulary(subtasks=("a", "b"), memory_milestones=("I a.",)) - vocab = module.discover(records, existing=existing) - assert vocab is existing - - -def test_vocabulary_discovery_empty_payload_returns_none( - fixture_dataset_root: Path, -) -> None: - vlm = make_canned_responder({"canonical vocabulary": {"subtasks": [], "memory_milestones": []}}) - module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig()) - records = list(iter_episodes(fixture_dataset_root)) - assert module.discover(records) is None - - -# --------------------------------------------------------------------------- -# PlanSubtasksMemoryModule consumes the vocabulary -# --------------------------------------------------------------------------- - - -def test_plan_module_inlines_vocab_into_subtask_prompt( - fixture_dataset_root: Path, tmp_path: Path -) -> None: - captured: list[str] = [] - - def responder(messages): - # Find the last user text block and stash it for inspection. - for message in messages: - content = message.get("content") - if isinstance(content, list): - for block in content: - if isinstance(block, dict) and block.get("type") == "text": - captured.append(block.get("text", "")) - # Return canned subtasks; pick the first two canonical strings so - # the validator accepts them. - return { - "subtasks": [ - {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, - {"text": "place blue cube in box", "start": 0.4, "end": 0.9}, - ] - } - - from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient - - vlm = StubVlmClient(responder=responder) - vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) - module = PlanSubtasksMemoryModule( - vlm=vlm, - config=PlanConfig(n_task_rephrasings=0), - vocabulary=vocab, - ) - record = next(iter_episodes(fixture_dataset_root)) - staging = EpisodeStaging(tmp_path / "stage", record.episode_index) - module.run_episode(record, staging) - # The subtask prompt (and the memory prompt) carries the canonical - # bullet list so the VLM can't paraphrase them away. - assert any("Canonical subtask labels:" in t for t in captured) - assert any("grasp blue cube" in t for t in captured) - - -def test_plan_module_accepts_article_only_difference( - fixture_dataset_root: Path, tmp_path: Path -) -> None: - """Articles like 'the'/'a'/'an' are stripped during validation.""" - from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient - - def responder(_messages): - return { - "subtasks": [ - # Same canonical phrase modulo "the" — should be accepted. - {"text": "grasp the blue cube", "start": 0.0, "end": 0.4}, - ] - } - - vlm = StubVlmClient(responder=responder) - vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) - module = PlanSubtasksMemoryModule( - vlm=vlm, - config=PlanConfig(n_task_rephrasings=0), - vocabulary=vocab, - ) - record = next(iter_episodes(fixture_dataset_root)) - staging = EpisodeStaging(tmp_path / "stage", record.episode_index) - module.run_episode(record, staging) - rows = staging.read("plan") - subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] - assert subtask_texts == ["grasp blue cube"] - - -def test_plan_module_retries_when_subtask_off_vocab( - fixture_dataset_root: Path, tmp_path: Path -) -> None: - """One-shot retry replaces an off-vocab paraphrase with the canonical form.""" - from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient - - call_count = {"n": 0} - - def responder(messages): - call_count["n"] += 1 - # First call: returns an off-vocab paraphrase. - if call_count["n"] == 1: - return { - "subtasks": [ - # paraphrase, not in vocab - {"text": "pick up blue cube", "start": 0.0, "end": 0.4}, - ] - } - # Second call (the retry): should contain the correction prompt; - # respond with the canonical phrase exactly. - last_user_text = "" - for message in messages: - content = message.get("content") - if isinstance(content, str): - last_user_text = content - elif isinstance(content, list): - for block in content: - if isinstance(block, dict) and block.get("type") == "text": - last_user_text = block.get("text", "") - assert "NOT in the canonical vocabulary" in last_user_text - return { - "subtasks": [ - {"text": "grasp blue cube", "start": 0.0, "end": 0.4}, - ] - } - - vlm = StubVlmClient(responder=responder) - vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) - module = PlanSubtasksMemoryModule( - vlm=vlm, - config=PlanConfig(n_task_rephrasings=0), - vocabulary=vocab, - ) - record = next(iter_episodes(fixture_dataset_root)) - staging = EpisodeStaging(tmp_path / "stage", record.episode_index) - module.run_episode(record, staging) - rows = staging.read("plan") - subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] - assert subtask_texts == ["grasp blue cube"] - # The retry must have fired exactly once. - assert call_count["n"] == 2 - - -def test_plan_module_drops_off_vocab_subtask_after_retry( - fixture_dataset_root: Path, tmp_path: Path -) -> None: - """If the VLM stays off-vocab even after the retry, the bad span is dropped.""" - from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient - - call_count = {"n": 0} - - def responder(_messages): - call_count["n"] += 1 - # Both calls return the same off-vocab span — the model can't - # be corrected. The second call also returns one in-vocab span - # so the episode isn't empty; this lets us check that the - # off-vocab span is dropped without affecting the in-vocab one. - if call_count["n"] == 1: - return { - "subtasks": [ - {"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4}, - {"text": "grasp blue cube", "start": 0.4, "end": 0.9}, - ] - } - return { - "subtasks": [ - {"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4}, - {"text": "grasp blue cube", "start": 0.4, "end": 0.9}, - ] - } - - vlm = StubVlmClient(responder=responder) - vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) - module = PlanSubtasksMemoryModule( - vlm=vlm, - config=PlanConfig(n_task_rephrasings=0), - vocabulary=vocab, - ) - record = next(iter_episodes(fixture_dataset_root)) - staging = EpisodeStaging(tmp_path / "stage", record.episode_index) - module.run_episode(record, staging) - rows = staging.read("plan") - subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] - # Retry fired exactly once; bad span dropped, good span kept. - assert call_count["n"] == 2 - assert subtask_texts == ["grasp blue cube"] - - -def test_plan_module_bumps_collocated_subtasks_to_distinct_frames( - fixture_dataset_root: Path, tmp_path: Path -) -> None: - """Two subtasks whose starts snap to the same frame get split onto two frames. - - Without this guard, both spans would emit ``style=subtask`` rows at the - identical persistent timestamp; the training-time renderer's - ``active_at(t, style=subtask)`` then raises an ambiguity error. - """ - from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient - - def responder(_messages): - # Two canonical labels with starts within one frame of each other — - # both snap to the same source frame, so the dedupe pass must bump - # the later one to the next frame. - return { - "subtasks": [ - {"text": "grasp blue cube", "start": 0.40, "end": 0.42}, - {"text": "place blue cube in box", "start": 0.41, "end": 0.50}, - ] - } - - vlm = StubVlmClient(responder=responder) - vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) - module = PlanSubtasksMemoryModule( - vlm=vlm, - config=PlanConfig(n_task_rephrasings=0), - vocabulary=vocab, - ) - record = next(iter_episodes(fixture_dataset_root)) - staging = EpisodeStaging(tmp_path / "stage", record.episode_index) - module.run_episode(record, staging) - rows = staging.read("plan") - subtask_rows = [r for r in rows if r["style"] == "subtask"] - # Both subtasks present, both on distinct timestamps. - assert len(subtask_rows) == 2 - timestamps = [r["timestamp"] for r in subtask_rows] - assert len(set(timestamps)) == 2, f"subtask timestamps collide: {timestamps}" - # Order preserved: the chronologically earlier span keeps the earlier - # frame, the later one was bumped onto the next available frame. - assert subtask_rows[0]["content"] == "grasp blue cube" - assert subtask_rows[1]["content"] == "place blue cube in box" - assert subtask_rows[1]["timestamp"] > subtask_rows[0]["timestamp"] - - -def test_plan_module_empty_when_all_off_vocab_after_retry( - fixture_dataset_root: Path, tmp_path: Path -) -> None: - """All-off-vocab spans → episode comes out empty (no silent fuzzy snap).""" - from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient - - def responder(_messages): - # Returns the same off-vocab spans on both attempts. - return { - "subtasks": [ - {"text": "make a smoothie", "start": 0.0, "end": 0.4}, - {"text": "consult the wizard", "start": 0.4, "end": 0.9}, - ] - } - - vlm = StubVlmClient(responder=responder) - vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY) - module = PlanSubtasksMemoryModule( - vlm=vlm, - config=PlanConfig(n_task_rephrasings=0), - vocabulary=vocab, - ) - record = next(iter_episodes(fixture_dataset_root)) - staging = EpisodeStaging(tmp_path / "stage", record.episode_index) - module.run_episode(record, staging) - rows = staging.read("plan") - subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] - # No subtask gets fabricated — better to leave the episode empty - # so the operator notices the vocabulary gap than to silently - # warp the labels. - assert subtask_texts == [] - - -def test_plan_module_without_vocab_passes_through( - fixture_dataset_root: Path, tmp_path: Path -) -> None: - """No vocabulary configured → original free-form behavior is preserved.""" - from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient - - def responder(_messages): - return { - "subtasks": [ - {"text": "any free-form text the VLM wants", "start": 0.0, "end": 1.0}, - ] - } - - vlm = StubVlmClient(responder=responder) - module = PlanSubtasksMemoryModule( - vlm=vlm, config=PlanConfig(n_task_rephrasings=0) - ) - record = next(iter_episodes(fixture_dataset_root)) - staging = EpisodeStaging(tmp_path / "stage", record.episode_index) - module.run_episode(record, staging) - rows = staging.read("plan") - subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"] - assert subtask_texts == ["any free-form text the VLM wants"]