Compare commits

...

11 Commits

Author SHA1 Message Date
pepijn
471b2b1b1d fix(annotate): bump same-frame subtasks onto distinct frames
If two consecutive VLM-emitted subtask spans have ``start`` timestamps
that round to the same source frame after ``snap_to_frame`` (e.g. on
short episodes the VLM sometimes nominates two ~adjacent action
boundaries within one 30 Hz step), the writer emits two
``style=subtask`` rows at the identical persistent timestamp. The
training-time renderer's default binding
``subtask: active_at(t, style=subtask)`` then raises:

    ValueError: Ambiguous resolver for style='subtask';
                add role=..., tool_name=..., or camera=... to disambiguate.

… and the whole training run dies on the first batch.

Observed concretely on ``pepijn223/super_poulain_vocab2`` (job
22159979): episodes 3 and 30 each had two subtask rows at the same
timestamp (``release yellow cube`` + ``retract arm`` snapping to the
same frame).

Add ``_dedupe_starts_to_distinct_frames`` to walk the cleaned span list
and, whenever a snapped start collides with one already used, push the
later span onto the next free frame timestamp. Both subtasks survive
on distinct timestamps; the renderer can now disambiguate. If the
episode genuinely has no later free frame (extremely unlikely — would
require a same-timestamp collision on the very last frame of the
episode), the later span is dropped with a warning rather than left
to poison the render.

New test ``test_plan_module_bumps_collocated_subtasks_to_distinct_frames``
locks in the contract; full vocabulary suite is 14/14 green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 19:31:44 +00:00
pepijn
a15e16c072 fix(annotate): replace fuzzy subtask snapping with strict match + one-shot retry
The Jaccard-overlap snap was warping VLM output into wrong canonical
labels — e.g. an off-vocab "consult the wizard" span would silently
become "grasp blue cube" if that scored highest. Even with a higher
floor the operator can't tell which subtasks were paraphrases vs
genuine mislabels in the resulting dataset.

Replace with strict exact-match validation + a single targeted retry:

  1. Generate subtasks as before.
  2. If any returned subtask's normalised form (lowercased, articles
     stripped, whitespace collapsed) isn't in the canonical vocab,
     fire one retry call naming the offending strings and re-sending
     the full canonical list. The retry prompt requires byte-identical
     output from the vocab.
  3. After the retry, validate again. Spans still off-vocab are
     dropped — no fuzzy snapping ever produces a different canonical
     label than the VLM actually emitted.
  4. If every span ends up off-vocab even after the retry, warn loudly
     so the operator extends ``meta/canonical_vocabulary.json`` to
     cover the missing phase. The episode is left with empty subtasks
     rather than silently fabricated ones — visibility > sweep-under-
     the-rug.

Promote ``_NORMALIZE_STRIP_TOKENS`` to a class constant and split the
normalisation helper out so the retry-validation and the final
canonicalisation share one source of truth.

Tests:
  - test_plan_module_accepts_article_only_difference: "grasp the blue
    cube" still maps to canonical "grasp blue cube" (article-tolerant).
  - test_plan_module_retries_when_subtask_off_vocab: paraphrase
    triggers the retry which the VLM corrects in pass 2.
  - test_plan_module_drops_off_vocab_subtask_after_retry: VLM that
    refuses to correct → bad span dropped, in-vocab span kept.
  - test_plan_module_empty_when_all_off_vocab_after_retry: every
    span off-vocab → episode left empty (no warping).
All 13 vocabulary tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-23 09:57:27 +00:00
pepijn
336af85c09 fix(annotate): never leave an episode with zero canonical subtasks
When the canonical vocabulary is enabled and the VLM produces spans
that don't overlap any canonical label, the previous Jaccard-floor
(0.5) dropped them and the episode came out with no subtasks at all
— invisible to the downstream policy. Observed on
``pepijn223/super_poulain_vocab``: some episodes had empty subtask
columns because every VLM-emitted phrase scored below 0.5 against
the discovered vocabulary.

Two-pass canonicalisation:

  - First pass keeps the Jaccard floor (lowered from 0.5 → 0.25, to
    let mild paraphrases through) and drops everything below.
  - If that first pass leaves the episode with **zero** subtasks,
    fall back to a second pass that always snaps each VLM span to
    its nearest canonical label by Jaccard (no floor). The episode
    ends up with subtasks even when the vocabulary missed a phase
    — a slightly-wrong canonical label is still closer to the right
    motion than nothing at all.
  - Log loudly when the fallback fires so the operator can spot
    coverage gaps in ``meta/canonical_vocabulary.json``.
  - Log a per-episode count at INFO when some (but not all) spans
    were dropped so it's visible without spamming the run output.

Promote the Jaccard floor + ignore-tokens to class constants so
they're a single edit point. Add ``force=True`` parameter to
``_canonicalize_subtask`` for the no-floor fallback path.

New test ``test_plan_module_snaps_when_all_off_vocab`` covers the
fallback; existing ``test_plan_module_drops_off_vocab_subtask`` is
adjusted to keep at least one in-vocab span so the floor path can
still fire and is exercised. All 12 vocabulary tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-22 12:44:03 +00:00
pepijn
54221ceea2 feat(annotate): let the VLM decide vocabulary size
Hardcoding ``n_subtask_target=10`` and ``n_memory_target=6`` baked task
complexity into the config — a simple pick-and-place needs ~6, a
multi-step recipe needs ~20. The VLM already sees the clips, so let it
pick the count itself from what's recurring across episodes.

Drop both knobs from ``VocabularyConfig`` and the ``module_0_vocabulary``
prompt template. The prompt now says "decide the count yourself based
on what you see — the smallest set that still covers every recurring
phase" and adds an "each label must recur across the demos" rule so
the VLM filters out one-off motions.

Update the launcher script + docs to remove the old knobs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-22 11:46:31 +00:00
pepijn
369ab17110 fix(annotate): update run_hf_job CLI args for renamed namespaces + phase 0
Three stale things in the launcher script:

  - ``--module_1/2/3.*`` no longer exist; review commit fd18beb renamed
    the CLI namespaces to ``--plan/interjections/vqa``. Forwarded all
    eight existing args to their new names.
  - ``--push_to_hub`` is now a bool; the destination repo lives at
    ``--dest_repo_id``. Split the single positional into both args.
  - ``openai`` was missing from the pip install list, which the prior
    review review (claude bot, 2026-05-08) flagged — the default vlm
    backend is ``openai`` so the job would have ImportError'd. Added.

Also expose the new phase 0 (canonical vocabulary discovery) knobs
explicitly: ``--vocabulary.sample_episodes``, ``--n_subtask_target``,
``--n_memory_target``. Defaults are sane (3 / 10 / 6) but worth
flagging in the example so the operator knows what they're running.

Update the docstring + section comments to match the current phase
layout (vocabulary → plan → interjections → vqa → writer).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-22 11:43:06 +00:00
pepijn
86a7edc590 feat(annotate): phase 0 — derive canonical vocabulary from sample episodes
The pipeline previously emitted near-unique subtask + memory phrasings
per episode (free-form LLM rephrasing). On the downstream low-level
policy that collapses the action expert's conditioning to noise: every
episode pairs a different paraphrase with similar motions, so the
expert learns a flat scene-prior that ignores the subtask string —
then at inference the high-level head invents *yet another* paraphrase
and the expert produces tiny "uncertain hover" chunks.

Add a vocabulary-discovery phase (phase 0) that runs once per dataset:

  - watches the first ``vocabulary.sample_episodes`` (default 3)
    episode videos as one Qwen-VL prompt,
  - asks the VLM to derive ~``n_subtask_target`` canonical imperative
    subtask labels and ~``n_memory_target`` first-person past-tense
    memory milestones that recur across the demos,
  - persists them to ``meta/canonical_vocabulary.json`` (human-
    inspectable, hand-editable), and
  - wires the resulting ``Vocabulary`` into the ``plan`` module so
    every per-episode subtask + memory call is constrained to those
    exact strings (both as prompt-side instructions *and* post-VLM
    validation: paraphrases snap to the closest canonical entry via
    token-set overlap; below a 0.5 Jaccard floor the subtask is
    dropped rather than warped into something semantically wrong).

Operator workflow:

  - first run discovers the vocabulary, writes the JSON, and runs
    the ``plan`` module against it,
  - subsequent runs reuse the on-disk file (``reuse_existing=True``
    default) so hand-edits stick,
  - set ``--vocabulary.enabled=False`` to fall back to free-form
    generation (the original behaviour).

The discovery prompt forbids gerunds / third-person / adverbs and
caps the lists to the requested counts, matching the Hi-Robot /
π0.6-MEM convention of small per-environment vocabularies. The
``plan`` module's subtask + memory prompts grow a conditional
``{vocabulary_block}`` slot rendered only when a vocabulary is
present; without one the templates collapse to their previous
free-form form.

Tests: 11 new unit tests under tests/annotations/test_vocabulary.py
cover the on-disk round-trip, discovery against the fixture dataset,
``reuse_existing`` short-circuit, paraphrase canonicalisation, off-
vocab subtask dropping, and the no-vocabulary pass-through path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-22 11:40:05 +00:00
Pepijn
a0233f53f4 feat(annotate): default VLM to Qwen3.6-35B-A3B-FP8
Match the production target used in examples/annotations/run_hf_job.py.
Per Scale Labs' dense-captioning ablations, model capacity dominates
prompt-engineering gains; defaulting to the larger model avoids
shipping a worst-tier configuration out of the box.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 11:46:59 +02:00
pepijn
2ea0da2d9f fix(annotate): tag uploaded dataset revision
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-19 12:44:35 +00:00
Pepijn
134a707c7a feat(annotate): first-person memory narrative + shorter speech prompts
- module_1_memory: rewrite as an explicit first-person, past-tense
  narrative ("I picked up...", "I opened...") matching the MEM
  (Torne 2026) running-memory style, instead of "one or two short
  sentences" with no person/tense guidance.
- module_1_task_rephrasings: bias rephrasings toward short imperative.
- module_2_initial_speech: prefer very short robot acknowledgements.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 14:17:30 +02:00
Pepijn
ce47075d6b feat(annotate): deterministic plan, single-frame VQA, dataset tagging
Port the steerable-pipeline refinements developed on feat/smolvla-on-
steerable back into the annotation pipeline itself:

- module_1_subtasks: imperative verb-first telegraphic labels with a
  consistent-object-noun rule and good/bad examples (no hard word cap).
- _generate_plan: drop the VLM round-trip; the plan is now a
  deterministic numbered list of still-todo subtasks, re-emitted at
  every subtask boundary so it shrinks as work progresses. Removes
  module_1_plan.txt.
- VqaConfig.K 3 -> 1: a VQA pair anchors exactly its emission frame, no
  stale-label temporal smear.
- lerobot-annotate: tag the pushed dataset with its codebase_version so
  LeRobotDataset can resolve a revision and load it.
- module_2_interjection: shorter, more natural mid-task cues.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 14:06:15 +02:00
Pepijn
26013da699 feat(annotations): enforce imperative verb-first subtask phrasing
Rewrite module_1_subtasks prompt to produce short imperative commands
("pick up the orange") instead of third-person narration ("the robot
arm moves to the orange"). Drops the verbose "how, not what" rule and
adds a good/bad few-shot table.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 13:53:20 +02:00
18 changed files with 1298 additions and 139 deletions

View File

@@ -7,7 +7,8 @@
## What the pipeline produces
Three modules write into a per-episode staging tree, then a single writer
A vocabulary-discovery phase derives a small canonical wording, then three
modules write into a per-episode staging tree, then a single writer
rewrites the data shards in place:
| Style / atom | Column | Module |
@@ -20,6 +21,21 @@ rewrites the data shards in place:
| speech tool-call atom (`style=null`, `say`) | `language_events` | `interjections`|
| `vqa` (user / assistant pair) | `language_events` | `vqa` |
The `plan` module is constrained to a **canonical vocabulary** discovered
once per dataset by the `vocabulary` module (phase 0). It watches a few
sample episode videos (`--vocabulary.sample_episodes`, default `3`) and
asks the VLM to derive a small set of imperative subtask labels and
first-person memory milestones that recur across the demos. The VLM
picks the right number of entries itself based on what it sees in the
clips — short pick-and-place demos get ~6 subtask labels, longer
multi-step recipes get more. The result lands at
`meta/canonical_vocabulary.json` (human-readable / hand-editable) and
is reused on every subsequent run. The `plan` module then constrains
both subtask + memory generation to those exact strings — the
downstream low-level policy sees a small, repeatable target
distribution instead of thousands of LLM paraphrases. Disable with
`--vocabulary.enabled=False` to fall back to free-form generation.
The writer does **not** add a `tools` column to the parquet — the tool
catalog lives at `meta/info.json["tools"]` instead (see
[Tools](./tools)). After every annotation run the pipeline ensures the

View File

@@ -1,38 +1,22 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Launch ``lerobot-annotate`` on a Hugging Face job (vllm + Qwen3.6 MoE).
Spawns one ``h200x2`` job that:
1. installs this branch of ``lerobot`` plus the annotation extras,
2. boots two vllm servers (one per GPU) with Qwen3.6-35B-A3B-FP8,
3. runs the plan / interjections / vqa modules across the dataset,
4. uploads the annotated dataset back to ``--repo_id`` (or to
``--dest_repo_id`` when set).
``--repo_id`` is the download source and, with ``--push_to_hub=true``, also
the default upload destination — the job annotates the dataset in place.
Pass ``--dest_repo_id`` to push the result to a separate repo instead and
leave the source untouched.
3. discovers the dataset's canonical subtask + memory vocabulary
from the first 3 sample episodes (phase 0),
4. runs the plan / interjections / vqa modules across the dataset
(subtasks + memory are constrained to the canonical vocabulary),
5. uploads the annotated dataset to ``--dest_repo_id`` (when set)
or back to ``--repo_id``.
Usage:
HF_TOKEN=hf_... uv run python examples/annotations/run_hf_job.py
Adjust ``CMD`` below to point at your own dataset.
Adjust ``CMD`` below to point at your own dataset / target hub repo.
"""
import os
@@ -48,19 +32,14 @@ CMD = (
"pip install --no-deps "
"'lerobot @ git+https://github.com/huggingface/lerobot.git@feat/language-annotation-pipeline' && "
"pip install --upgrade-strategy only-if-needed "
# Mirror lerobot's [annotations] runtime deps. ``openai`` is required
# because ``VlmConfig.backend`` defaults to ``"openai"`` (which talks
# to a vllm/transformers/ktransformers OpenAI-compatible server).
"datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include "
"toml typing-inspect openai && "
"datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include toml typing-inspect "
"openai && "
"export VLLM_MEMORY_PROFILER_ESTIMATE_CUDAGRAPHS=0 && "
"export VLLM_VIDEO_BACKEND=pyav && "
"lerobot-annotate "
# The dataset to annotate. By default it is also the push destination
# (annotate in place); pass --dest_repo_id to push to a separate repo.
"--repo_id=<your-org>/<your-dataset> "
"--repo_id=imstevenpmwork/super_poulain_draft "
"--dest_repo_id=pepijn223/super_poulain_vocab "
"--push_to_hub=true "
# "--dest_repo_id=<your-org>/<your-annotated-dataset> "
"--vlm.backend=openai "
"--vlm.model_id=Qwen/Qwen3.6-35B-A3B-FP8 "
"--vlm.parallel_servers=2 "
@@ -69,15 +48,29 @@ CMD = (
"--tensor-parallel-size 1 --max-model-len 32768 "
'--gpu-memory-utilization 0.8 --uvicorn-log-level warning --port {port}" '
"--vlm.serve_ready_timeout_s=1800 "
"--vlm.client_concurrency=256 "
"--vlm.client_concurrency=128 "
"--vlm.max_new_tokens=512 "
"--executor.episode_parallelism=32 "
"--vlm.chat_template_kwargs='{enable_thinking: false}' "
"--vlm.temperature=0.7 "
"--executor.episode_parallelism=16 "
"--vlm.chat_template_kwargs='{\"enable_thinking\": false}' "
"--vlm.camera_key=observation.images.wrist "
# Phase 0 — canonical vocabulary discovery from the first N sample
# episodes. The VLM picks the right number of subtask + memory
# entries itself from what it sees; the resulting
# meta/canonical_vocabulary.json constrains every subtask + memory
# string to a small repeatable target distribution.
"--vocabulary.sample_episodes=3 "
# Phase 1 — plan module (subtasks + plan + memory + task_aug).
"--plan.frames_per_second=1.0 "
"--plan.use_video_url=true "
"--plan.use_video_url_fps=1.0 "
"--vqa.K=1 --vqa.vqa_emission_hz=0.2"
"--plan.derive_task_from_video=always "
"--plan.n_task_rephrasings=30 "
# Phase 2 — interjections + speech.
"--interjections.max_interjections_per_episode=6 "
# Phase 4 — general VQA.
"--vqa.K=3 "
"--vqa.vqa_emission_hz=1.0"
)
job = run_job(

View File

@@ -26,11 +26,25 @@ outputs are staged per-episode before a final parquet rewrite:
from .config import AnnotationPipelineConfig
from .validator import StagingValidator, ValidationReport
from .vocabulary import (
VOCABULARY_FILENAME,
Vocabulary,
VocabularyDiscoveryModule,
load_vocabulary,
save_vocabulary,
vocabulary_path,
)
from .writer import LanguageColumnsWriter
__all__ = [
"VOCABULARY_FILENAME",
"AnnotationPipelineConfig",
"LanguageColumnsWriter",
"StagingValidator",
"ValidationReport",
"Vocabulary",
"VocabularyDiscoveryModule",
"load_vocabulary",
"save_vocabulary",
"vocabulary_path",
]

View File

@@ -21,6 +21,41 @@ from pathlib import Path
from typing import Any
@dataclass
class VocabularyConfig:
"""Phase 0 — dataset-level canonical vocabulary discovery.
Watches the first ``sample_episodes`` episode videos and asks the VLM
to derive a small canonical vocabulary (subtask labels + memory
milestones) that every episode in the dataset will reuse. The VLM
decides the count itself from what it sees in the clips — short
pick-and-place demos get ~6 labels, longer multi-step recipes more.
The output lands at ``meta/canonical_vocabulary.json`` and feeds
phase 1's subtask + memory generation as both a prompt-side
constraint and a post-VLM validation gate.
Why this exists: free-form LLM rephrasing per episode produces near-
unique subtask strings, which makes the downstream low-level policy's
conditioning effectively noise — at inference the policy generates a
*new* paraphrase the action expert has never seen and produces tiny
cautious actions. Forcing every episode onto the same small set of
canonical strings gives the action expert dense supervision per
string and a small target distribution to learn against.
Set ``enabled=False`` to fall back to free-form generation (original
behaviour). ``reuse_existing=True`` keeps a hand-edited vocabulary
file from being clobbered on re-runs.
"""
enabled: bool = True
sample_episodes: int = 3
max_video_frames_per_episode: int = 32
# When True (default), an existing meta/canonical_vocabulary.json is
# loaded as-is and no VLM call is made — lets operators hand-edit the
# file. Set False to always rediscover from the sample episodes.
reuse_existing: bool = True
@dataclass
class PlanConfig:
"""``plan`` module: plan + subtasks + memory + task augmentation.
@@ -83,7 +118,14 @@ class VqaConfig:
enabled: bool = True
vqa_emission_hz: float = 1.0
K: int = 3
K: int = 1
"""How many *consecutive* frames each emission tick anchors a VQA pair
to. The VLM grounds its answer (bbox / keypoint coordinates, count, …)
against the *first* anchored frame's image, so anchoring K>1 frames
copies that same answer onto later frames where the scene has already
moved — stale labels. Default ``1``: a VQA pair lands on exactly its
emission frame, no temporal smear. Raise it only to trade label
precision for more (noisier) VQA frames."""
question_types: tuple[str, ...] = ("bbox", "keypoint", "count", "attribute", "spatial")
@@ -95,7 +137,7 @@ class VlmConfig:
# ``openai`` talks to a local OpenAI-compatible server; the CLI
# auto-spawns one when ``auto_serve=True``.
backend: str = "openai"
model_id: str = "Qwen/Qwen2.5-VL-7B-Instruct"
model_id: str = "Qwen/Qwen3.6-35B-A3B-FP8"
# OpenAI-compatible server endpoint; ``EMPTY`` works for local servers.
api_base: str = "http://localhost:8000/v1"
@@ -179,6 +221,7 @@ class AnnotationPipelineConfig:
seed: int = 1729
vocabulary: VocabularyConfig = field(default_factory=VocabularyConfig)
plan: PlanConfig = field(default_factory=PlanConfig)
interjections: InterjectionsConfig = field(default_factory=InterjectionsConfig)
vqa: VqaConfig = field(default_factory=VqaConfig)

View File

@@ -15,8 +15,14 @@
# limitations under the License.
"""In-process executor that runs the annotation phases.
The executor plans **six phases** in the dependency order from the plan:
The executor plans **seven phases** in the dependency order from the plan:
phase 0: vocabulary discovery — derive a small canonical vocabulary
from the first few sample-episode videos (subtask labels +
memory milestones) and persist it next to the dataset; the
``plan`` module then constrains every per-episode generation
to those strings, so the downstream policy sees a small,
repeatable conditioning distribution
phase 1: ``plan`` module (plan + subtasks + memory)
phase 2: ``interjections`` module (interjections + speech)
phase 3: ``plan`` plan-update pass — re-runs plan emission at every
@@ -88,6 +94,7 @@ class Executor:
vqa: Any # GeneralVqaModule
writer: LanguageColumnsWriter
validator: StagingValidator
vocabulary: Any = None # VocabularyDiscoveryModule | None
def run(self, root: Path) -> PipelineRunSummary:
records = list(iter_episodes(root, only_episodes=self.config.only_episodes))
@@ -102,6 +109,10 @@ class Executor:
phases: list[PhaseResult] = []
# Phase 0: vocabulary discovery. Mutates ``self.plan.vocabulary``
# so subsequent per-episode plan calls see the canonical labels.
phases.append(self._run_vocabulary_phase(records, root))
# Phase 1: ``plan`` module (plan + subtasks + memory)
phases.append(self._run_module_phase("plan", records, staging_dir, self.plan))
# Phase 2: ``interjections`` module (interjections + speech). It
@@ -172,6 +183,62 @@ class Executor:
flush=True,
)
def _run_vocabulary_phase(
self, records: list[EpisodeRecord], root: Path
) -> PhaseResult:
"""Discover (or load) the canonical vocabulary, wire it into ``self.plan``.
Returns a ``PhaseResult`` whose ``episodes_processed`` is the number
of sample episodes consulted (0 when disabled or no VLM call was
needed); ``episodes_skipped`` is always ``0`` because vocabulary is
a once-per-dataset artifact, not a per-episode product.
"""
from .vocabulary import load_vocabulary, save_vocabulary # noqa: PLC0415
if self.vocabulary is None or not getattr(self.vocabulary, "enabled", False):
print(
"[annotate] phase=vocabulary skipped (module disabled or unset)",
flush=True,
)
return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0)
existing = load_vocabulary(root)
if existing is not None and self.config.vocabulary.reuse_existing:
print(
f"[annotate] phase=vocabulary reusing {root / 'meta' / 'canonical_vocabulary.json'} "
f"({len(existing.subtasks)} subtask labels, "
f"{len(existing.memory_milestones)} memory milestones)",
flush=True,
)
self.plan.vocabulary = existing
return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0)
sample_n = max(1, min(int(self.config.vocabulary.sample_episodes), len(records)))
print(
f"[annotate] phase=vocabulary discovering from {sample_n} sample episode(s)...",
flush=True,
)
t0 = time.time()
vocab = self.vocabulary.discover(records[:sample_n], existing=existing)
if vocab is None:
print(
"[annotate] phase=vocabulary returned no vocabulary — "
"plan module will fall back to free-form generation",
flush=True,
)
return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0)
save_path = save_vocabulary(root, vocab)
print(
f"[annotate] phase=vocabulary wrote {save_path} "
f"({len(vocab.subtasks)} subtask labels, "
f"{len(vocab.memory_milestones)} memory milestones) in "
f"{time.time() - t0:.1f}s",
flush=True,
)
self.plan.vocabulary = vocab
return PhaseResult(name="vocabulary", episodes_processed=sample_n, episodes_skipped=0)
def _run_module_phase(
self,
name: str,

View File

@@ -17,6 +17,7 @@
from __future__ import annotations
import logging
from collections.abc import Sequence
from dataclasses import dataclass, field
from pathlib import Path
@@ -34,6 +35,9 @@ from ..prompts import load as load_prompt
from ..reader import EpisodeRecord, reconstruct_subtask_spans, snap_to_frame
from ..staging import EpisodeStaging
from ..vlm_client import VlmClient
from ..vocabulary import Vocabulary
logger = logging.getLogger(__name__)
@dataclass
@@ -54,6 +58,11 @@ class PlanSubtasksMemoryModule:
vlm: VlmClient
config: PlanConfig
frame_provider: FrameProvider = field(default_factory=null_provider)
vocabulary: Vocabulary | None = None
"""When set, the module constrains subtask + memory generation to the
canonical strings in ``vocabulary``. Phase 0 (vocabulary discovery)
populates this once per dataset; ``None`` falls back to free-form
generation (original behaviour)."""
@property
def enabled(self) -> bool:
@@ -104,18 +113,29 @@ class PlanSubtasksMemoryModule:
"tool_calls": None,
}
)
# plan row at t=0
plan_text = self._generate_plan(record, subtask_spans, task=effective_task)
if plan_text is not None:
rows.append(
{
"role": "assistant",
"content": plan_text,
"style": "plan",
"timestamp": float(t0),
"tool_calls": None,
}
# Plan rows at every subtask boundary — including t=0 (start of
# the first subtask). Because the plan is just a numbered list
# of *still-todo* subtasks, re-emitting at each boundary makes
# the active plan shrink as work progresses: at frame t the
# rendered ``${plan}`` is the most recent emission, which
# contains exactly the subtasks that started at or after the
# current span. Saves the runtime from having to derive
# "what's still left" at inference time.
for span in subtask_spans:
boundary_t = snap_to_frame(span["start"], record.frame_timestamps)
plan_text = self._generate_plan(
record, subtask_spans, refresh_t=boundary_t, task=effective_task
)
if plan_text is not None:
rows.append(
{
"role": "assistant",
"content": plan_text,
"style": "plan",
"timestamp": float(boundary_t),
"tool_calls": None,
}
)
# memory rows at every subtask boundary except the very first start
prior_memory = ""
for i, span in enumerate(subtask_spans[1:], start=1):
@@ -300,8 +320,28 @@ class PlanSubtasksMemoryModule:
min_subtask_seconds=self.config.min_subtask_seconds,
max_steps=self.config.plan_max_steps,
episode_duration=f"{episode_duration:.3f}",
vocabulary_block=self._subtask_vocabulary_block(),
)
spans = self._vlm_field(self._video_message(record, prompt), "subtasks")
messages = self._video_message(record, prompt)
spans = self._vlm_field(messages, "subtasks")
# When a vocabulary is in force, do a single targeted retry if
# any returned subtask is off-vocab — strict exact-match only,
# no fuzzy snapping. The retry includes the offending strings
# and the full canonical list so the VLM can correct itself.
if self.vocabulary is not None and self.vocabulary.subtasks and spans:
invalid = self._invalid_subtasks(spans)
if invalid:
logger.info(
"episode %d: VLM emitted %d off-vocab subtask(s) (%s); retrying once",
record.episode_index,
len(invalid),
invalid,
)
retry_msg = self._build_subtask_retry_message(messages, invalid)
retried = self._vlm_field(retry_msg, "subtasks")
if retried:
spans = retried
if not spans:
return []
# clamp to [t0, t_last] and sort
@@ -319,56 +359,243 @@ class PlanSubtasksMemoryModule:
end = max(t0, min(end, t_last))
if end < start:
start, end = end, start
if not text:
continue
text = self._canonicalize_subtask(text)
if not text:
continue
cleaned.append({"text": text, "start": start, "end": end})
cleaned.sort(key=lambda s: s["start"])
cleaned = self._dedupe_starts_to_distinct_frames(cleaned, record)
if self.vocabulary is not None and self.vocabulary.subtasks and not cleaned:
logger.warning(
"episode %d: every VLM subtask was off-vocab even after retry — "
"episode left empty (extend meta/canonical_vocabulary.json to "
"cover the missing phase)",
record.episode_index,
)
return cleaned
@staticmethod
def _dedupe_starts_to_distinct_frames(
spans: list[dict[str, Any]], record: EpisodeRecord
) -> list[dict[str, Any]]:
"""Bump same-frame subtask starts onto distinct frames.
Two consecutive VLM spans whose ``start`` rounds to the same
source frame (after :func:`snap_to_frame`) would otherwise emit
two ``style=subtask`` rows at the identical persistent
timestamp. The training-time renderer's ``active_at(t,
style=subtask)`` resolver can't disambiguate that and raises
``Ambiguous resolver for style='subtask'``.
Walk the (sorted-by-start) spans, snap each to its frame, and
if the snapped frame is already taken push the span onto the
next unused frame so both subtasks survive on distinct
timestamps. If the episode ends before a free frame is found,
the trailing span is dropped with a warning — better than
poisoning the render.
"""
if not spans:
return spans
frames = record.frame_timestamps
if not frames:
return spans
used: set[float] = set()
out: list[dict[str, Any]] = []
for span in spans:
ts = snap_to_frame(span["start"], frames)
if ts in used:
next_ts = next((f for f in frames if f > ts and f not in used), None)
if next_ts is None:
logger.warning(
"episode %d: subtask %r snapped to occupied frame "
"%.3f and no free later frame exists — dropping",
record.episode_index,
span.get("text"),
ts,
)
continue
ts = next_ts
used.add(ts)
new_span = {**span, "start": ts}
if float(new_span.get("end", ts)) < ts:
new_span["end"] = ts
out.append(new_span)
return out
# ------------------------------------------------------------------
# Canonical-vocabulary helpers
# ------------------------------------------------------------------
def _subtask_vocabulary_block(self) -> str:
"""Bullet-list of canonical subtasks the VLM must pick from.
Returns an empty string when no vocabulary is configured —
``module_1_subtasks.txt`` then falls back to its free-form
rules (original behaviour).
"""
if self.vocabulary is None or not self.vocabulary.subtasks:
return ""
bullets = "\n".join(f"- {s}" for s in self.vocabulary.subtasks)
return (
"You MUST choose each subtask label verbatim from this canonical "
"vocabulary — pick the closest match for each phase of the demo, "
"and reuse the SAME string every time that phase recurs. The "
"low-level policy is conditioned on these exact strings; any "
"novel paraphrase you invent will make its conditioning OOD.\n"
"Canonical subtask labels:\n"
f"{bullets}\n\n"
)
def _memory_vocabulary_block(self) -> str:
"""Bullet-list of canonical memory milestones the VLM must pick from."""
if self.vocabulary is None or not self.vocabulary.memory_milestones:
return ""
bullets = "\n".join(f"- {m}" for m in self.vocabulary.memory_milestones)
return (
"Compose the memory by picking ONLY from this canonical milestone "
"list — append a milestone (or rewrite the running memory to "
"compress past ones) using these exact phrases. Do not invent new "
"wording: every paraphrase weakens the downstream conditioning.\n"
"Canonical memory milestones:\n"
f"{bullets}\n\n"
)
_NORMALIZE_STRIP_TOKENS: frozenset[str] = frozenset({"the", "a", "an"})
def _canonicalize_subtask(self, text: str) -> str:
"""Validate ``text`` against the canonical vocabulary; no fuzzy snap.
Without a vocabulary, the original text passes through. With a
vocabulary, accept the span only if its normalised form (lower-
cased, articles stripped, whitespace collapsed) matches a
canonical entry exactly — the canonical wording is returned so
the supervised string is byte-identical across episodes.
Off-vocab spans are dropped (empty string). Upstream
``_generate_subtasks`` triggers a targeted retry before reaching
the drop path; this function never snaps or warps a span into
a different label.
"""
if self.vocabulary is None or not self.vocabulary.subtasks:
return text.strip()
normalised = self._normalize(text)
if not normalised:
return ""
for candidate in self.vocabulary.subtasks:
if self._normalize(candidate) == normalised:
return candidate
return ""
@classmethod
def _normalize(cls, text: str) -> str:
"""Lowercase, strip articles, collapse whitespace, drop punctuation."""
words = [
w.strip(".,:;\"'!?()")
for w in text.lower().replace(",", " ").split()
]
return " ".join(w for w in words if w and w not in cls._NORMALIZE_STRIP_TOKENS)
def _invalid_subtasks(self, spans: list[dict[str, Any]]) -> list[str]:
"""Return the unique off-vocab subtask strings the VLM produced."""
seen: list[str] = []
for span in spans:
text = str((span or {}).get("text") or "").strip()
if not text:
continue
if self._canonicalize_subtask(text):
continue
if text not in seen:
seen.append(text)
return seen
def _build_subtask_retry_message(
self, original_messages: list[dict[str, Any]], invalid: list[str]
) -> list[dict[str, Any]]:
"""Compose a one-shot correction prompt naming the off-vocab strings."""
assert self.vocabulary is not None
canonical = "\n".join(f"- {s}" for s in self.vocabulary.subtasks)
invalid_list = "\n".join(f"- {s!r}" for s in invalid)
correction = (
"Your previous response included subtask labels that are NOT in "
"the canonical vocabulary:\n"
f"{invalid_list}\n\n"
"Re-emit the same segmentation (same number of spans, same start/end "
"timestamps where they were valid) but replace every off-vocab "
"label with the EXACT canonical string for that phase, copied "
"verbatim from this list:\n"
f"{canonical}\n\n"
"Strict rules:\n"
"- Output strings must be byte-for-byte identical to entries above.\n"
"- No articles, no adverbs, no extra words.\n"
"- If a phase truly has no canonical match, omit that span entirely.\n"
"Return the same JSON shape as before."
)
# Append the correction as an additional user turn; the model
# sees the original prompt + its prior output is implied by the
# conversation context (the VLM client is stateless, so we
# re-send the original content plus this correction).
retry_messages = [
{
"role": m.get("role", "user"),
"content": (
m.get("content")
if isinstance(m.get("content"), str)
else list(m.get("content") or [])
),
}
for m in original_messages
]
retry_messages.append({"role": "user", "content": correction})
return retry_messages
def _generate_plan(
self,
record: EpisodeRecord,
record: EpisodeRecord, # noqa: ARG002 (kept for signature stability)
subtask_spans: Sequence[dict[str, Any]],
*,
refresh_t: float | None = None,
interjection: str | None = None,
task: str | None = None,
interjection: str | None = None, # noqa: ARG002
task: str | None = None, # noqa: ARG002
) -> str | None:
"""Deterministic plan = numbered list of *still-todo* subtasks.
Previously this called the VLM with a prompt that asked it to
compress the subtasks into a "compact hierarchical plan". That
produced longer-than-necessary plans, cost an extra VLM round-trip
per episode (plus one per interjection on refresh), and could
diverge from the actual subtask sequence the model is going to
execute. Replacing it with a plain summarisation keeps the plan
tightly aligned with the upcoming subtasks and removes the VLM
call entirely.
Layout — short imperative fragments prefixed by "N. ":
1. <subtask 1>
2. <subtask 2>
...
On a refresh at ``refresh_t`` (called from ``run_plan_updates``
on interjection events, and from ``run_episode`` at every subtask
boundary), only subtasks whose start is at or after ``refresh_t``
are included — the plan shrinks as work progresses, so it always
describes what's left.
"""
if not subtask_spans:
return None
subtasks_text = "\n".join(f"- {s['text']}" for s in subtask_spans)
prompt = load_prompt("module_1_plan").format(
episode_task=(task if task is not None else record.episode_task),
subtasks_text=subtasks_text,
plan_max_steps=self.config.plan_max_steps,
remaining = [
s
for s in subtask_spans
if refresh_t is None or float(s.get("start", 0.0)) >= float(refresh_t)
]
if not remaining:
# Past the last subtask boundary on a late refresh — nothing
# left to plan; emit None so the caller skips the row.
return None
return "\n".join(
f"{i}. {span.get('text', '').strip()}" for i, span in enumerate(remaining, start=1)
)
if refresh_t is not None:
# ``current_subtask`` is the span the refresh time falls into,
# so the model knows where in the demonstration the planner is
# standing when it re-emits.
current_subtask = ""
for span in subtask_spans:
if float(span["start"]) <= refresh_t and (
"end" not in span or float(span["end"]) > refresh_t
):
current_subtask = span.get("text", "")
break
if interjection:
prompt += (
f"\n\n(Plan refresh at t={refresh_t:.2f}s after a user "
f"interjection: {interjection!r}. Current subtask just "
f"before the interjection: {current_subtask!r}. Update "
f"the plan so it reflects the interjection — drop or "
f"reorder steps as needed; do not just restate.)\n"
)
else:
# Refresh without an interjection text: still tell the model
# where in the episode the plan stands so the re-emission
# is grounded. Should be rare — plan refreshes are
# interjection-driven by design.
prompt += f"\n\n(Plan refresh at t={refresh_t:.2f}s. Current subtask: {current_subtask!r}.)\n"
plan = self._vlm_field(self._text_message(prompt), "plan")
return plan.strip() if isinstance(plan, str) else None
def _generate_memory(
self,
@@ -384,6 +611,7 @@ class PlanSubtasksMemoryModule:
prior_memory=prior_memory or "(none)",
completed_subtask=completed,
remaining_subtasks=", ".join(remaining) if remaining else "(none)",
vocabulary_block=self._memory_vocabulary_block(),
)
memory = self._vlm_field(self._text_message(prompt), "memory")
return memory.strip() if isinstance(memory, str) else ""

View File

@@ -0,0 +1,53 @@
You are inspecting {n_episodes} sample episode video(s) from a teleoperated
robot dataset. Every episode in the dataset performs the SAME task; the
user originally asked: "{episode_task}".
Watch all the clips and produce a SHORT canonical vocabulary that every
episode in this dataset will reuse. The downstream low-level policy is
conditioned on these strings — duplicate phrasings (e.g. "grasp blue
cube" vs "pick up the blue cube") would destroy the conditioning, so
pick one wording per concept and reuse it everywhere.
Decide how many entries each list needs YOURSELF based on what you see —
the smallest set that still covers every recurring phase in the demos.
A simple two-object pick-and-place might need ~6 subtask labels and 2
memory milestones; a long multi-step recipe needs more. Err on the side
of FEWER — extra entries that don't recur across episodes weaken the
conditioning.
You output two lists:
1. `subtasks`: imperative, telegraphic commands the robot can execute.
- Verb-first. Drop articles, adverbs, qualifiers.
- Consistent object nouns (if the task says "cube", every subtask says
"cube" — never "block" / "object").
- Atomic — one skill per subtask (gripper-open events, contact, regrasps,
transitions all become cut points).
- Each label must recur across the demos. If you see a motion only
once across all sample clips, it probably isn't a canonical phase.
- Good: "move to blue cube", "grasp blue cube", "lift blue cube",
"place blue cube in box", "release blue cube", "retract arm".
- Bad: "the robot arm moves towards the blue cube" (third person,
too long), "carefully pick up the cube" (adverb, article),
"carrying the yellow cube over the green basket" (gerund — should
be imperative "transport yellow cube to green basket").
2. `memory_milestones`: first-person past-tense sentences the running
memory composes from. Each subtask phase that produces a lasting
change should have a milestone; transient motions (move, retract)
should NOT.
- First person, past tense. Start with "I".
- One sentence. Functional outcome only — no grasp / motion detail.
- Good: "I picked up the blue cube.", "I placed the blue cube in
the green box.", "I wiped the counter."
- Bad: "The robot arm grasped the blue cube." (third person),
"I carefully grasped the blue cube with the parallel gripper."
(irrelevant detail), "I moved towards the blue cube." (transient
motion — should be omitted, not memorialised).
Output strictly valid JSON of shape:
{{
"subtasks": ["<verb phrase>", ...],
"memory_milestones": ["I <past-tense sentence>.", ...]
}}

View File

@@ -8,18 +8,29 @@ task execution. Specific object attributes (colors, precise quantities of
each item) get discarded when their details won't affect subsequent
actions. Functional outcomes (where items went, how many) are preserved."
Concrete example from MEM:
Before: "I put a light green bowl, a dark blue bowl and a bright yellow
bowl into the top right cabinet"
After: "I placed three bowls in the top right cabinet"
Episode task: "{episode_task}"
Previous memory: {prior_memory}
Just-completed subtask: "{completed_subtask}"
Remaining subtasks (for relevance judgement only): {remaining_subtasks}
Update the memory. Drop irrelevant detail. Compress completed steps.
Keep WHAT happened, drop HOW. Shorter is better.
{vocabulary_block}Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the
robot has accomplished so far — the running story it would tell itself.
Authoring rules:
- First person, past tense. Every sentence starts with "I": "I picked
up...", "I opened...", "I moved to...".
- One or two short sentences. Extend the previous memory with the
just-completed subtask; do not rewrite it from scratch.
- Keep WHAT happened (functional outcomes — where items went, how many),
drop HOW (grasp details, motions).
- Compress completed steps and drop object attributes (colors, exact
counts) once they no longer affect the remaining subtasks.
Example (MEM, Torne 2026):
Before: "I prepared the pot and got the potatoes, milk, and butter. I
moved to the drawer."
After: "I prepared the pot and got the ingredients. I opened the
drawer with the masher."
Output strictly valid JSON:
{{ "memory": "<one or two short sentences>" }}
{{ "memory": "<one or two short first-person past-tense sentences>" }}

View File

@@ -1,18 +0,0 @@
You are the high-level planner for a robot demonstrating: "{episode_task}".
Given the subtask decomposition below, write a concise hierarchical PLAN
the robot should follow. Format the plan as a numbered list, one line per
high-level step. The plan describes the full task; subtasks are the atomic
skills used to execute it.
Subtasks for context:
{subtasks_text}
Authoring rules:
- 3 to {plan_max_steps} steps.
- Each step describes one logical chunk of the task, not one motion.
- Steps must be in execution order.
- Plain prose, no JSON, no markdown headers.
Output strictly valid JSON:
{{ "plan": "1. ...\n2. ...\n3. ..." }}

View File

@@ -6,15 +6,28 @@ You are shown the entire demonstration as a single video. Watch the
whole clip, then segment it into a list of consecutive atomic subtasks
the robot performs.
Authoring rules — based on Hi Robot (Shi 2025) atom granularity and
Pi0.7 (Physical Intelligence 2025) "how, not what" detail:
{vocabulary_block}Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts:
- Each subtask is one atomic skill the low-level policy can execute,
e.g. "pick up one piece of lettuce", "place the bowl into the box",
"move the right arm to the left".
- Capture HOW the subtask is performed, not only WHAT — e.g. prefer
"grasp the handle of the sponge with the left hand" to "pick up the
sponge".
- Each subtask = one atomic skill the low-level policy can execute.
- Write each subtask as an IMPERATIVE COMMAND, starting with a verb:
move, reach, pick up, grasp, place, put, push, pull, open, close,
turn, press, lift, insert, pour...
- Keep it SHORT — a verb phrase, not a sentence. Drop articles
("the", "a") and adverbs ("carefully", "slowly"). Add a "how"
detail (which hand, which grasp point) ONLY when it is needed to
disambiguate.
- NEVER use third person. Never write "the robot", "the arm", "the
gripper moves", "it picks up" — the robot is implied. Command it,
do not describe it.
- Use the exact object nouns from the task above. If the task says
"cube", every subtask says "cube" — never switch to "block". If it
says "box", never switch to "bin"/"container". Keep vocabulary
consistent across the whole episode.
- Good: "move to blue cube", "grasp blue cube", "lift blue cube",
"place blue cube in box", "open drawer", "release yellow cube".
- Bad: "the robot arm moves towards the blue cube" (third person,
too long), "carefully pick up the cube" (adverb, article),
"release the yellow block" ("block" when the task said "cube").
- Subtasks are non-overlapping and cover the full episode in order.
Choose the cut points yourself based on what you see in the video
(gripper open/close events, contact, regrasps, transitions).
@@ -27,7 +40,7 @@ Output strictly valid JSON of shape:
{{
"subtasks": [
{{"text": "<how-not-what>", "start": <float>, "end": <float>}},
{{"text": "<short imperative verb phrase>", "start": <float>, "end": <float>}},
...
]
}}

View File

@@ -9,7 +9,7 @@ Original task:
Generate exactly {n} alternative phrasings of the same task. Vary:
- formality (casual / polite / curt)
- verbosity (short imperative vs longer polite request)
- verbosity (mostly short imperative; occasional polite request)
- word choice (synonyms, different verbs)
- sentence structure (imperative / question / suggestion)
@@ -17,7 +17,7 @@ Hard rules:
- Each phrasing MUST preserve the exact meaning of the original task.
Do not change which object is involved, the destination, or the
action. Do not add extra steps. Do not invent new objects.
- Each phrasing must be a single short sentence, plain prose, no
- Each phrasing must be a short phrase or sentence, plain prose, no
markdown, no quotes, no list numbers.
- Phrasings must be distinct — no near-duplicates.
- Output exactly {n} entries.

View File

@@ -1,10 +1,12 @@
The user just asked the robot: "{episode_task}".
Generate a short verbal acknowledgement the robot would speak back before
beginning the task. Style: confident, friendly, single short sentence.
beginning the task. Style: compact, confident, friendly.
Examples (Hi Robot, Shi 2025): "Sure, I won't put cheese on it.",
"OK, starting with the sponge.", "Got it.".
Prefer very short replies: "Got it.", "On it.", "OK."
Output strictly valid JSON:
{{ "text": "<the spoken acknowledgement>" }}

View File

@@ -14,12 +14,10 @@ subtask boundary in the demonstration:
- Subtask the robot is about to start: "{next_subtask}"
- Time into episode: {timestamp:.2f}s
Write ONE interjection the user would naturally say at this moment to
prompt / confirm / encourage the robot to do "{next_subtask}". Phrase it
like a real human mid-task remark — conversational, varied, sometimes
just a nudge, sometimes a clarification, sometimes a small constraint
that the upcoming motion happens to satisfy. Plus the robot's verbal
acknowledgement.
Write ONE compact interjection the user would naturally say at this
moment to prompt / confirm / encourage the robot to do "{next_subtask}".
Keep it like a mid-task coaching cue, not a full instruction paragraph.
Also write the robot's compact verbal acknowledgement.
Hard rules:
@@ -29,7 +27,9 @@ Hard rules:
instead", DO NOT — those would contradict the demonstration.
- The interjection must reference an object, location, or action that
is plausible given the visible scene and the next subtask text.
- One sentence each. Conversational, not robotic.
- One short phrase or sentence each. Conversational, not robotic.
- Prefer direct cues: "{next_subtask}, please."; "Now {next_subtask}."
- Keep robot speech very short: "OK.", "On it.", "Doing that."
Style examples (vary the phrasing — don't reuse these verbatim):
- "Now go ahead and {next_subtask}."
@@ -41,6 +41,6 @@ Style examples (vary the phrasing — don't reuse these verbatim):
Output strictly valid JSON:
{{
"interjection": "<single sentence the user says, asking for the next subtask>",
"speech": "<single sentence the robot speaks back, confirming and starting>"
"interjection": "<short cue from the user, asking for the next subtask>",
"speech": "<short robot acknowledgement>"
}}

View File

@@ -0,0 +1,222 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dataset-level canonical vocabulary discovery (Phase 0).
The downstream consumer of these annotations is a low-level action expert
conditioned on the ``subtask`` string. Free-form per-episode LLM rephrasing
gives near-unique strings per occurrence, which collapses the action
expert's conditioning to noise and makes runtime subtask-paraphrase drift
catastrophic. The Hi-Robot / π0.6-MEM recipe ships a small canonical
vocabulary per environment (~10 strings) that every episode reuses; this
module derives that vocabulary automatically from the first few episode
videos and persists it next to the dataset.
Pipeline-level flow:
Phase 0 (here): watch N sample episodes → produce vocabulary.json
Phase 1 (plan module): reuse vocabulary on every episode, both as
prompt-side constraint *and* post-VLM validation
The vocabulary is JSON, lives at ``<root>/meta/canonical_vocabulary.json``,
and is human-inspectable / hand-editable — if the discovered set is wrong,
operators edit the file and re-run the pipeline without phase 0.
"""
from __future__ import annotations
import json
import logging
from collections.abc import Sequence
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
from .config import VocabularyConfig
from .frames import FrameProvider, null_provider, to_video_block
from .prompts import load as load_prompt
from .reader import EpisodeRecord
from .vlm_client import VlmClient
logger = logging.getLogger(__name__)
VOCABULARY_FILENAME = "canonical_vocabulary.json"
@dataclass
class Vocabulary:
"""Canonical phrasings shared across every episode of one dataset.
Both lists are strict: per-episode subtask + memory generation pick
from these strings only; the downstream policy then has a small,
repeatable target distribution to learn instead of thousands of
LLM paraphrases.
"""
subtasks: tuple[str, ...]
"""Imperative subtask labels — what the low-level policy is conditioned
on. Verb-first, telegraphic, consistent object nouns. Example:
``("move to blue cube", "grasp blue cube", "lift blue cube",
"place blue cube in box", "retract arm")``.
"""
memory_milestones: tuple[str, ...]
"""First-person past-tense milestone sentences — building blocks for
the running memory string. Example: ``("I picked up the blue cube.",
"I placed the blue cube in the green box.")``. Each milestone maps
1:1 onto a completed subtask phase; ``memory_at_step_k`` is the
concatenation of milestones for completed phases.
"""
def to_json(self) -> dict[str, list[str]]:
return {
"subtasks": list(self.subtasks),
"memory_milestones": list(self.memory_milestones),
}
@classmethod
def from_json(cls, payload: dict[str, Any]) -> Vocabulary:
subtasks = tuple(
str(s).strip() for s in (payload.get("subtasks") or []) if str(s).strip()
)
memory_milestones = tuple(
str(s).strip() for s in (payload.get("memory_milestones") or []) if str(s).strip()
)
return cls(subtasks=subtasks, memory_milestones=memory_milestones)
def is_empty(self) -> bool:
return not self.subtasks and not self.memory_milestones
def vocabulary_path(root: Path) -> Path:
"""Return the canonical on-disk location for the vocabulary file."""
return root / "meta" / VOCABULARY_FILENAME
def load_vocabulary(root: Path) -> Vocabulary | None:
"""Read ``<root>/meta/canonical_vocabulary.json`` if present.
Returns ``None`` when the file does not exist — callers fall back to
free-form (unconstrained) subtask + memory generation, preserving the
pipeline's behaviour on datasets that never ran phase 0.
"""
path = vocabulary_path(root)
if not path.exists():
return None
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
logger.warning("could not read %s: %s — proceeding without vocabulary", path, exc)
return None
if not isinstance(payload, dict):
logger.warning("%s is not a JSON object — ignoring", path)
return None
vocab = Vocabulary.from_json(payload)
if vocab.is_empty():
return None
return vocab
def save_vocabulary(root: Path, vocab: Vocabulary) -> Path:
"""Atomically persist ``vocab`` to ``<root>/meta/canonical_vocabulary.json``."""
path = vocabulary_path(root)
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(
json.dumps(vocab.to_json(), indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
tmp.replace(path)
return path
@dataclass
class VocabularyDiscoveryModule:
"""Derive a dataset-level canonical vocabulary from sample episodes.
Phase 0 of the executor: pulls ``config.sample_episodes`` episode
videos, packs them into one Qwen-VL multi-video prompt, and asks the
model to enumerate the small set of canonical subtask labels +
memory milestones that recur across them. The output is persisted
to ``meta/canonical_vocabulary.json`` and consumed by phase 1.
"""
vlm: VlmClient
config: VocabularyConfig
frame_provider: FrameProvider = field(default_factory=null_provider)
@property
def enabled(self) -> bool:
return self.config.enabled
def discover(
self,
records: Sequence[EpisodeRecord],
*,
existing: Vocabulary | None = None,
) -> Vocabulary | None:
"""Run vocabulary discovery against the first N sample episodes.
``existing`` short-circuits the VLM call when ``config.reuse_existing``
is True and an on-disk vocabulary is already present — keeps re-runs
cheap and lets operators hand-edit the file without it getting
overwritten.
"""
if existing is not None and self.config.reuse_existing:
logger.info(
"vocabulary: reusing existing (%d subtasks, %d memory milestones)",
len(existing.subtasks),
len(existing.memory_milestones),
)
return existing
sample = list(records[: max(1, int(self.config.sample_episodes))])
if not sample:
return None
task_hint = next((r.episode_task for r in sample if r.episode_task), "")
prompt = load_prompt("module_0_vocabulary").format(
episode_task=task_hint or "(unspecified)",
n_episodes=len(sample),
)
# Pack one video block per sample episode so the VLM sees the
# variation across episodes (different starting poses, different
# object placements) rather than overfitting to one trajectory.
content: list[dict[str, Any]] = []
for record in sample:
video_frames = self.frame_provider.video_for_episode(
record, int(self.config.max_video_frames_per_episode)
)
if video_frames:
content.extend(to_video_block(video_frames))
content.append({"type": "text", "text": prompt})
messages = [{"role": "user", "content": content}]
result = self.vlm.generate_json([messages])[0]
if not isinstance(result, dict):
logger.warning("vocabulary: VLM did not return a JSON object — skipping")
return None
vocab = Vocabulary.from_json(result)
if vocab.is_empty():
logger.warning("vocabulary: VLM returned an empty vocabulary — skipping")
return None
logger.info(
"vocabulary: discovered %d subtask labels + %d memory milestones from %d episodes",
len(vocab.subtasks),
len(vocab.memory_milestones),
len(sample),
)
return vocab

View File

@@ -40,6 +40,7 @@ from lerobot.annotations.steerable_pipeline.modules import (
)
from lerobot.annotations.steerable_pipeline.validator import StagingValidator
from lerobot.annotations.steerable_pipeline.vlm_client import make_vlm_client
from lerobot.annotations.steerable_pipeline.vocabulary import VocabularyDiscoveryModule
from lerobot.annotations.steerable_pipeline.writer import LanguageColumnsWriter
from lerobot.configs import parser
@@ -88,6 +89,9 @@ def annotate(cfg: AnnotationPipelineConfig) -> None:
vlm=vlm, config=cfg.interjections, seed=cfg.seed, frame_provider=frame_provider
)
vqa = GeneralVqaModule(vlm=vlm, config=cfg.vqa, seed=cfg.seed, frame_provider=frame_provider)
vocabulary = VocabularyDiscoveryModule(
vlm=vlm, config=cfg.vocabulary, frame_provider=frame_provider
)
writer = LanguageColumnsWriter()
validator = StagingValidator(
dataset_camera_keys=tuple(getattr(frame_provider, "camera_keys", []) or []) or None,
@@ -98,6 +102,7 @@ def annotate(cfg: AnnotationPipelineConfig) -> None:
plan=plan,
interjections=interjections,
vqa=vqa,
vocabulary=vocabulary,
writer=writer,
validator=validator,
)
@@ -140,7 +145,7 @@ def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None:
exist_ok=True,
)
print(f"[lerobot-annotate] uploading {root} -> {repo_id}...", flush=True)
api.upload_folder(
commit_info = api.upload_folder(
folder_path=str(root),
repo_id=repo_id,
repo_type="dataset",
@@ -149,6 +154,48 @@ def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None:
)
print(f"[lerobot-annotate] uploaded to https://huggingface.co/datasets/{repo_id}", flush=True)
# Tag the upload with the codebase version. ``LeRobotDatasetMetadata``
# resolves the dataset revision via ``get_safe_version`` which scans
# for tags like ``v3.0``; without a tag it raises
# ``RevisionNotFoundError``. Read the version straight from the
# dataset's own ``meta/info.json`` so we tag whatever the writer
# actually wrote (no accidental drift if the codebase floor moves).
from lerobot.datasets.dataset_metadata import CODEBASE_VERSION # noqa: PLC0415
info_path = root / "meta" / "info.json"
version_tag = CODEBASE_VERSION
if info_path.exists():
try:
from lerobot.utils.io_utils import load_json # noqa: PLC0415
info = load_json(info_path)
ds_version = info.get("codebase_version")
if isinstance(ds_version, str) and ds_version.startswith("v"):
version_tag = ds_version
except Exception as exc: # noqa: BLE001
print(f"[lerobot-annotate] could not read codebase_version from info.json ({exc}); falling back to {version_tag}", flush=True)
revision = getattr(commit_info, "oid", None)
tag_kwargs = {
"repo_id": repo_id,
"tag": version_tag,
"repo_type": "dataset",
"exist_ok": True,
}
if revision is not None:
tag_kwargs["revision"] = revision
try:
api.create_tag(**tag_kwargs)
print(f"[lerobot-annotate] tagged {repo_id} as {version_tag}", flush=True)
except Exception as exc: # noqa: BLE001
print(
f"[lerobot-annotate] WARNING: could not create tag {version_tag!r} on {repo_id}: {exc}. "
"Dataset is uploaded but ``LeRobotDataset`` won't be able to load it until it's tagged. "
"Run: from huggingface_hub import HfApi; "
f"HfApi().create_tag({repo_id!r}, tag={version_tag!r}, repo_type='dataset', exist_ok=True)",
flush=True,
)
def main() -> None:
annotate()

View File

@@ -80,7 +80,6 @@ def test_module1_plan_memory_subtask_smoke(fixture_dataset_root: Path, tmp_path:
{"text": "place the sponge into the sink", "start": 0.8, "end": 1.1},
]
},
"concise hierarchical PLAN": {"plan": "1. grasp\n2. wipe\n3. place"},
"Update the memory": {"memory": "wiped the counter once"},
},
)
@@ -96,10 +95,16 @@ def test_module1_plan_memory_subtask_smoke(fixture_dataset_root: Path, tmp_path:
frame_set = set(record.frame_timestamps)
for row in rows:
assert row["timestamp"] in frame_set
# exactly one plan row at t0
plan_rows = [r for r in rows if r["style"] == "plan"]
assert len(plan_rows) == 1
# one plan row per subtask boundary; the first lands at t0 and each
# plan is the deterministic numbered list of still-todo subtasks
plan_rows = sorted((r for r in rows if r["style"] == "plan"), key=lambda r: r["timestamp"])
subtask_rows = [r for r in rows if r["style"] == "subtask"]
assert len(plan_rows) == len(subtask_rows)
assert plan_rows[0]["timestamp"] == record.frame_timestamps[0]
# the t0 plan enumerates all subtasks; later plans shrink
assert plan_rows[0]["content"].startswith("1. ")
assert len(plan_rows[0]["content"].splitlines()) == len(subtask_rows)
assert len(plan_rows[-1]["content"].splitlines()) == 1
def test_module2_at_t0_emits_speech_only_no_interjection(fixture_dataset_root: Path, tmp_path: Path) -> None:

View File

@@ -0,0 +1,412 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Vocabulary-discovery phase (phase 0) tests."""
from __future__ import annotations
import json
from pathlib import Path
from lerobot.annotations.steerable_pipeline.config import (
PlanConfig,
VocabularyConfig,
)
from lerobot.annotations.steerable_pipeline.modules import PlanSubtasksMemoryModule
from lerobot.annotations.steerable_pipeline.reader import iter_episodes
from lerobot.annotations.steerable_pipeline.staging import EpisodeStaging
from lerobot.annotations.steerable_pipeline.vocabulary import (
Vocabulary,
VocabularyDiscoveryModule,
load_vocabulary,
save_vocabulary,
vocabulary_path,
)
from ._helpers import make_canned_responder
_CANONICAL_SUBTASKS = (
"grasp blue cube",
"place blue cube in box",
"retract arm",
)
_CANONICAL_MEMORY = (
"I picked up the blue cube.",
"I placed the blue cube in the box.",
)
# ---------------------------------------------------------------------------
# Vocabulary dataclass + on-disk round-trip
# ---------------------------------------------------------------------------
def test_vocabulary_roundtrip(tmp_path: Path) -> None:
vocab = Vocabulary(
subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY
)
save_path = save_vocabulary(tmp_path, vocab)
assert save_path == vocabulary_path(tmp_path)
assert save_path.exists()
loaded = load_vocabulary(tmp_path)
assert loaded is not None
assert loaded.subtasks == _CANONICAL_SUBTASKS
assert loaded.memory_milestones == _CANONICAL_MEMORY
def test_vocabulary_load_missing_returns_none(tmp_path: Path) -> None:
assert load_vocabulary(tmp_path) is None
def test_vocabulary_load_malformed_returns_none(tmp_path: Path) -> None:
path = vocabulary_path(tmp_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("{ not valid json", encoding="utf-8")
assert load_vocabulary(tmp_path) is None
def test_vocabulary_load_empty_payload_returns_none(tmp_path: Path) -> None:
path = vocabulary_path(tmp_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps({"subtasks": [], "memory_milestones": []}), encoding="utf-8")
assert load_vocabulary(tmp_path) is None
# ---------------------------------------------------------------------------
# Discovery module
# ---------------------------------------------------------------------------
def test_vocabulary_discovery_calls_vlm_and_returns_vocab(
fixture_dataset_root: Path,
) -> None:
vlm = make_canned_responder(
{
"canonical vocabulary": {
"subtasks": list(_CANONICAL_SUBTASKS),
"memory_milestones": list(_CANONICAL_MEMORY),
}
}
)
module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig(sample_episodes=2))
records = list(iter_episodes(fixture_dataset_root))
vocab = module.discover(records)
assert vocab is not None
assert vocab.subtasks == _CANONICAL_SUBTASKS
assert vocab.memory_milestones == _CANONICAL_MEMORY
def test_vocabulary_discovery_reuses_existing(fixture_dataset_root: Path) -> None:
"""``reuse_existing=True`` short-circuits the VLM call entirely."""
def _explode(_messages): # pragma: no cover - must not be called
raise AssertionError("VLM should not be invoked when reusing existing vocabulary")
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
vlm = StubVlmClient(responder=_explode)
module = VocabularyDiscoveryModule(
vlm=vlm, config=VocabularyConfig(reuse_existing=True)
)
records = list(iter_episodes(fixture_dataset_root))
existing = Vocabulary(subtasks=("a", "b"), memory_milestones=("I a.",))
vocab = module.discover(records, existing=existing)
assert vocab is existing
def test_vocabulary_discovery_empty_payload_returns_none(
fixture_dataset_root: Path,
) -> None:
vlm = make_canned_responder({"canonical vocabulary": {"subtasks": [], "memory_milestones": []}})
module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig())
records = list(iter_episodes(fixture_dataset_root))
assert module.discover(records) is None
# ---------------------------------------------------------------------------
# PlanSubtasksMemoryModule consumes the vocabulary
# ---------------------------------------------------------------------------
def test_plan_module_inlines_vocab_into_subtask_prompt(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
captured: list[str] = []
def responder(messages):
# Find the last user text block and stash it for inspection.
for message in messages:
content = message.get("content")
if isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
captured.append(block.get("text", ""))
# Return canned subtasks; pick the first two canonical strings so
# the validator accepts them.
return {
"subtasks": [
{"text": "grasp blue cube", "start": 0.0, "end": 0.4},
{"text": "place blue cube in box", "start": 0.4, "end": 0.9},
]
}
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
# The subtask prompt (and the memory prompt) carries the canonical
# bullet list so the VLM can't paraphrase them away.
assert any("Canonical subtask labels:" in t for t in captured)
assert any("grasp blue cube" in t for t in captured)
def test_plan_module_accepts_article_only_difference(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""Articles like 'the'/'a'/'an' are stripped during validation."""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
def responder(_messages):
return {
"subtasks": [
# Same canonical phrase modulo "the" — should be accepted.
{"text": "grasp the blue cube", "start": 0.0, "end": 0.4},
]
}
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
assert subtask_texts == ["grasp blue cube"]
def test_plan_module_retries_when_subtask_off_vocab(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""One-shot retry replaces an off-vocab paraphrase with the canonical form."""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
call_count = {"n": 0}
def responder(messages):
call_count["n"] += 1
# First call: returns an off-vocab paraphrase.
if call_count["n"] == 1:
return {
"subtasks": [
# paraphrase, not in vocab
{"text": "pick up blue cube", "start": 0.0, "end": 0.4},
]
}
# Second call (the retry): should contain the correction prompt;
# respond with the canonical phrase exactly.
last_user_text = ""
for message in messages:
content = message.get("content")
if isinstance(content, str):
last_user_text = content
elif isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
last_user_text = block.get("text", "")
assert "NOT in the canonical vocabulary" in last_user_text
return {
"subtasks": [
{"text": "grasp blue cube", "start": 0.0, "end": 0.4},
]
}
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
assert subtask_texts == ["grasp blue cube"]
# The retry must have fired exactly once.
assert call_count["n"] == 2
def test_plan_module_drops_off_vocab_subtask_after_retry(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""If the VLM stays off-vocab even after the retry, the bad span is dropped."""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
call_count = {"n": 0}
def responder(_messages):
call_count["n"] += 1
# Both calls return the same off-vocab span — the model can't
# be corrected. The second call also returns one in-vocab span
# so the episode isn't empty; this lets us check that the
# off-vocab span is dropped without affecting the in-vocab one.
if call_count["n"] == 1:
return {
"subtasks": [
{"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4},
{"text": "grasp blue cube", "start": 0.4, "end": 0.9},
]
}
return {
"subtasks": [
{"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4},
{"text": "grasp blue cube", "start": 0.4, "end": 0.9},
]
}
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
# Retry fired exactly once; bad span dropped, good span kept.
assert call_count["n"] == 2
assert subtask_texts == ["grasp blue cube"]
def test_plan_module_bumps_collocated_subtasks_to_distinct_frames(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""Two subtasks whose starts snap to the same frame get split onto two frames.
Without this guard, both spans would emit ``style=subtask`` rows at the
identical persistent timestamp; the training-time renderer's
``active_at(t, style=subtask)`` then raises an ambiguity error.
"""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
def responder(_messages):
# Two canonical labels with starts within one frame of each other —
# both snap to the same source frame, so the dedupe pass must bump
# the later one to the next frame.
return {
"subtasks": [
{"text": "grasp blue cube", "start": 0.40, "end": 0.42},
{"text": "place blue cube in box", "start": 0.41, "end": 0.50},
]
}
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_rows = [r for r in rows if r["style"] == "subtask"]
# Both subtasks present, both on distinct timestamps.
assert len(subtask_rows) == 2
timestamps = [r["timestamp"] for r in subtask_rows]
assert len(set(timestamps)) == 2, f"subtask timestamps collide: {timestamps}"
# Order preserved: the chronologically earlier span keeps the earlier
# frame, the later one was bumped onto the next available frame.
assert subtask_rows[0]["content"] == "grasp blue cube"
assert subtask_rows[1]["content"] == "place blue cube in box"
assert subtask_rows[1]["timestamp"] > subtask_rows[0]["timestamp"]
def test_plan_module_empty_when_all_off_vocab_after_retry(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""All-off-vocab spans → episode comes out empty (no silent fuzzy snap)."""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
def responder(_messages):
# Returns the same off-vocab spans on both attempts.
return {
"subtasks": [
{"text": "make a smoothie", "start": 0.0, "end": 0.4},
{"text": "consult the wizard", "start": 0.4, "end": 0.9},
]
}
vlm = StubVlmClient(responder=responder)
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
module = PlanSubtasksMemoryModule(
vlm=vlm,
config=PlanConfig(n_task_rephrasings=0),
vocabulary=vocab,
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
# No subtask gets fabricated — better to leave the episode empty
# so the operator notices the vocabulary gap than to silently
# warp the labels.
assert subtask_texts == []
def test_plan_module_without_vocab_passes_through(
fixture_dataset_root: Path, tmp_path: Path
) -> None:
"""No vocabulary configured → original free-form behavior is preserved."""
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
def responder(_messages):
return {
"subtasks": [
{"text": "any free-form text the VLM wants", "start": 0.0, "end": 1.0},
]
}
vlm = StubVlmClient(responder=responder)
module = PlanSubtasksMemoryModule(
vlm=vlm, config=PlanConfig(n_task_rephrasings=0)
)
record = next(iter_episodes(fixture_dataset_root))
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
module.run_episode(record, staging)
rows = staging.read("plan")
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
assert subtask_texts == ["any free-form text the VLM wants"]

View File

@@ -0,0 +1,51 @@
#!/usr/bin/env python
import json
from types import SimpleNamespace
def test_push_to_hub_tags_uploaded_dataset_revision(tmp_path, monkeypatch):
from lerobot.scripts.lerobot_annotate import _push_to_hub
root = tmp_path / "dataset"
(root / "meta").mkdir(parents=True)
(root / "meta" / "info.json").write_text(json.dumps({"codebase_version": "v3.0"}))
calls = {}
class FakeHfApi:
def create_repo(self, **kwargs):
calls["create_repo"] = kwargs
def upload_folder(self, **kwargs):
calls["upload_folder"] = kwargs
return SimpleNamespace(oid="abc123")
def create_tag(self, **kwargs):
calls["create_tag"] = kwargs
monkeypatch.setattr("huggingface_hub.HfApi", FakeHfApi)
cfg = SimpleNamespace(
repo_id="source/dataset",
dest_repo_id="annotated/dataset",
push_private=True,
push_commit_message=None,
)
_push_to_hub(root, cfg)
assert calls["create_repo"] == {
"repo_id": "annotated/dataset",
"repo_type": "dataset",
"private": True,
"exist_ok": True,
}
assert calls["upload_folder"]["repo_id"] == "annotated/dataset"
assert calls["create_tag"] == {
"repo_id": "annotated/dataset",
"tag": "v3.0",
"repo_type": "dataset",
"exist_ok": True,
"revision": "abc123",
}