mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-30 10:21:24 +00:00
Compare commits
11 Commits
f72b28738a
...
471b2b1b1d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
471b2b1b1d | ||
|
|
a15e16c072 | ||
|
|
336af85c09 | ||
|
|
54221ceea2 | ||
|
|
369ab17110 | ||
|
|
86a7edc590 | ||
|
|
a0233f53f4 | ||
|
|
2ea0da2d9f | ||
|
|
134a707c7a | ||
|
|
ce47075d6b | ||
|
|
26013da699 |
@@ -7,7 +7,8 @@
|
||||
|
||||
## What the pipeline produces
|
||||
|
||||
Three modules write into a per-episode staging tree, then a single writer
|
||||
A vocabulary-discovery phase derives a small canonical wording, then three
|
||||
modules write into a per-episode staging tree, then a single writer
|
||||
rewrites the data shards in place:
|
||||
|
||||
| Style / atom | Column | Module |
|
||||
@@ -20,6 +21,21 @@ rewrites the data shards in place:
|
||||
| speech tool-call atom (`style=null`, `say`) | `language_events` | `interjections`|
|
||||
| `vqa` (user / assistant pair) | `language_events` | `vqa` |
|
||||
|
||||
The `plan` module is constrained to a **canonical vocabulary** discovered
|
||||
once per dataset by the `vocabulary` module (phase 0). It watches a few
|
||||
sample episode videos (`--vocabulary.sample_episodes`, default `3`) and
|
||||
asks the VLM to derive a small set of imperative subtask labels and
|
||||
first-person memory milestones that recur across the demos. The VLM
|
||||
picks the right number of entries itself based on what it sees in the
|
||||
clips — short pick-and-place demos get ~6 subtask labels, longer
|
||||
multi-step recipes get more. The result lands at
|
||||
`meta/canonical_vocabulary.json` (human-readable / hand-editable) and
|
||||
is reused on every subsequent run. The `plan` module then constrains
|
||||
both subtask + memory generation to those exact strings — the
|
||||
downstream low-level policy sees a small, repeatable target
|
||||
distribution instead of thousands of LLM paraphrases. Disable with
|
||||
`--vocabulary.enabled=False` to fall back to free-form generation.
|
||||
|
||||
The writer does **not** add a `tools` column to the parquet — the tool
|
||||
catalog lives at `meta/info.json["tools"]` instead (see
|
||||
[Tools](./tools)). After every annotation run the pipeline ensures the
|
||||
|
||||
@@ -1,38 +1,22 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Launch ``lerobot-annotate`` on a Hugging Face job (vllm + Qwen3.6 MoE).
|
||||
|
||||
Spawns one ``h200x2`` job that:
|
||||
|
||||
1. installs this branch of ``lerobot`` plus the annotation extras,
|
||||
2. boots two vllm servers (one per GPU) with Qwen3.6-35B-A3B-FP8,
|
||||
3. runs the plan / interjections / vqa modules across the dataset,
|
||||
4. uploads the annotated dataset back to ``--repo_id`` (or to
|
||||
``--dest_repo_id`` when set).
|
||||
|
||||
``--repo_id`` is the download source and, with ``--push_to_hub=true``, also
|
||||
the default upload destination — the job annotates the dataset in place.
|
||||
Pass ``--dest_repo_id`` to push the result to a separate repo instead and
|
||||
leave the source untouched.
|
||||
3. discovers the dataset's canonical subtask + memory vocabulary
|
||||
from the first 3 sample episodes (phase 0),
|
||||
4. runs the plan / interjections / vqa modules across the dataset
|
||||
(subtasks + memory are constrained to the canonical vocabulary),
|
||||
5. uploads the annotated dataset to ``--dest_repo_id`` (when set)
|
||||
or back to ``--repo_id``.
|
||||
|
||||
Usage:
|
||||
|
||||
HF_TOKEN=hf_... uv run python examples/annotations/run_hf_job.py
|
||||
|
||||
Adjust ``CMD`` below to point at your own dataset.
|
||||
Adjust ``CMD`` below to point at your own dataset / target hub repo.
|
||||
"""
|
||||
|
||||
import os
|
||||
@@ -48,19 +32,14 @@ CMD = (
|
||||
"pip install --no-deps "
|
||||
"'lerobot @ git+https://github.com/huggingface/lerobot.git@feat/language-annotation-pipeline' && "
|
||||
"pip install --upgrade-strategy only-if-needed "
|
||||
# Mirror lerobot's [annotations] runtime deps. ``openai`` is required
|
||||
# because ``VlmConfig.backend`` defaults to ``"openai"`` (which talks
|
||||
# to a vllm/transformers/ktransformers OpenAI-compatible server).
|
||||
"datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include "
|
||||
"toml typing-inspect openai && "
|
||||
"datasets pyarrow av jsonlines draccus gymnasium torchcodec mergedeep pyyaml-include toml typing-inspect "
|
||||
"openai && "
|
||||
"export VLLM_MEMORY_PROFILER_ESTIMATE_CUDAGRAPHS=0 && "
|
||||
"export VLLM_VIDEO_BACKEND=pyav && "
|
||||
"lerobot-annotate "
|
||||
# The dataset to annotate. By default it is also the push destination
|
||||
# (annotate in place); pass --dest_repo_id to push to a separate repo.
|
||||
"--repo_id=<your-org>/<your-dataset> "
|
||||
"--repo_id=imstevenpmwork/super_poulain_draft "
|
||||
"--dest_repo_id=pepijn223/super_poulain_vocab "
|
||||
"--push_to_hub=true "
|
||||
# "--dest_repo_id=<your-org>/<your-annotated-dataset> "
|
||||
"--vlm.backend=openai "
|
||||
"--vlm.model_id=Qwen/Qwen3.6-35B-A3B-FP8 "
|
||||
"--vlm.parallel_servers=2 "
|
||||
@@ -69,15 +48,29 @@ CMD = (
|
||||
"--tensor-parallel-size 1 --max-model-len 32768 "
|
||||
'--gpu-memory-utilization 0.8 --uvicorn-log-level warning --port {port}" '
|
||||
"--vlm.serve_ready_timeout_s=1800 "
|
||||
"--vlm.client_concurrency=256 "
|
||||
"--vlm.client_concurrency=128 "
|
||||
"--vlm.max_new_tokens=512 "
|
||||
"--executor.episode_parallelism=32 "
|
||||
"--vlm.chat_template_kwargs='{enable_thinking: false}' "
|
||||
"--vlm.temperature=0.7 "
|
||||
"--executor.episode_parallelism=16 "
|
||||
"--vlm.chat_template_kwargs='{\"enable_thinking\": false}' "
|
||||
"--vlm.camera_key=observation.images.wrist "
|
||||
# Phase 0 — canonical vocabulary discovery from the first N sample
|
||||
# episodes. The VLM picks the right number of subtask + memory
|
||||
# entries itself from what it sees; the resulting
|
||||
# meta/canonical_vocabulary.json constrains every subtask + memory
|
||||
# string to a small repeatable target distribution.
|
||||
"--vocabulary.sample_episodes=3 "
|
||||
# Phase 1 — plan module (subtasks + plan + memory + task_aug).
|
||||
"--plan.frames_per_second=1.0 "
|
||||
"--plan.use_video_url=true "
|
||||
"--plan.use_video_url_fps=1.0 "
|
||||
"--vqa.K=1 --vqa.vqa_emission_hz=0.2"
|
||||
"--plan.derive_task_from_video=always "
|
||||
"--plan.n_task_rephrasings=30 "
|
||||
# Phase 2 — interjections + speech.
|
||||
"--interjections.max_interjections_per_episode=6 "
|
||||
# Phase 4 — general VQA.
|
||||
"--vqa.K=3 "
|
||||
"--vqa.vqa_emission_hz=1.0"
|
||||
)
|
||||
|
||||
job = run_job(
|
||||
|
||||
@@ -26,11 +26,25 @@ outputs are staged per-episode before a final parquet rewrite:
|
||||
|
||||
from .config import AnnotationPipelineConfig
|
||||
from .validator import StagingValidator, ValidationReport
|
||||
from .vocabulary import (
|
||||
VOCABULARY_FILENAME,
|
||||
Vocabulary,
|
||||
VocabularyDiscoveryModule,
|
||||
load_vocabulary,
|
||||
save_vocabulary,
|
||||
vocabulary_path,
|
||||
)
|
||||
from .writer import LanguageColumnsWriter
|
||||
|
||||
__all__ = [
|
||||
"VOCABULARY_FILENAME",
|
||||
"AnnotationPipelineConfig",
|
||||
"LanguageColumnsWriter",
|
||||
"StagingValidator",
|
||||
"ValidationReport",
|
||||
"Vocabulary",
|
||||
"VocabularyDiscoveryModule",
|
||||
"load_vocabulary",
|
||||
"save_vocabulary",
|
||||
"vocabulary_path",
|
||||
]
|
||||
|
||||
@@ -21,6 +21,41 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class VocabularyConfig:
|
||||
"""Phase 0 — dataset-level canonical vocabulary discovery.
|
||||
|
||||
Watches the first ``sample_episodes`` episode videos and asks the VLM
|
||||
to derive a small canonical vocabulary (subtask labels + memory
|
||||
milestones) that every episode in the dataset will reuse. The VLM
|
||||
decides the count itself from what it sees in the clips — short
|
||||
pick-and-place demos get ~6 labels, longer multi-step recipes more.
|
||||
The output lands at ``meta/canonical_vocabulary.json`` and feeds
|
||||
phase 1's subtask + memory generation as both a prompt-side
|
||||
constraint and a post-VLM validation gate.
|
||||
|
||||
Why this exists: free-form LLM rephrasing per episode produces near-
|
||||
unique subtask strings, which makes the downstream low-level policy's
|
||||
conditioning effectively noise — at inference the policy generates a
|
||||
*new* paraphrase the action expert has never seen and produces tiny
|
||||
cautious actions. Forcing every episode onto the same small set of
|
||||
canonical strings gives the action expert dense supervision per
|
||||
string and a small target distribution to learn against.
|
||||
|
||||
Set ``enabled=False`` to fall back to free-form generation (original
|
||||
behaviour). ``reuse_existing=True`` keeps a hand-edited vocabulary
|
||||
file from being clobbered on re-runs.
|
||||
"""
|
||||
|
||||
enabled: bool = True
|
||||
sample_episodes: int = 3
|
||||
max_video_frames_per_episode: int = 32
|
||||
# When True (default), an existing meta/canonical_vocabulary.json is
|
||||
# loaded as-is and no VLM call is made — lets operators hand-edit the
|
||||
# file. Set False to always rediscover from the sample episodes.
|
||||
reuse_existing: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class PlanConfig:
|
||||
"""``plan`` module: plan + subtasks + memory + task augmentation.
|
||||
@@ -83,7 +118,14 @@ class VqaConfig:
|
||||
|
||||
enabled: bool = True
|
||||
vqa_emission_hz: float = 1.0
|
||||
K: int = 3
|
||||
K: int = 1
|
||||
"""How many *consecutive* frames each emission tick anchors a VQA pair
|
||||
to. The VLM grounds its answer (bbox / keypoint coordinates, count, …)
|
||||
against the *first* anchored frame's image, so anchoring K>1 frames
|
||||
copies that same answer onto later frames where the scene has already
|
||||
moved — stale labels. Default ``1``: a VQA pair lands on exactly its
|
||||
emission frame, no temporal smear. Raise it only to trade label
|
||||
precision for more (noisier) VQA frames."""
|
||||
question_types: tuple[str, ...] = ("bbox", "keypoint", "count", "attribute", "spatial")
|
||||
|
||||
|
||||
@@ -95,7 +137,7 @@ class VlmConfig:
|
||||
# ``openai`` talks to a local OpenAI-compatible server; the CLI
|
||||
# auto-spawns one when ``auto_serve=True``.
|
||||
backend: str = "openai"
|
||||
model_id: str = "Qwen/Qwen2.5-VL-7B-Instruct"
|
||||
model_id: str = "Qwen/Qwen3.6-35B-A3B-FP8"
|
||||
|
||||
# OpenAI-compatible server endpoint; ``EMPTY`` works for local servers.
|
||||
api_base: str = "http://localhost:8000/v1"
|
||||
@@ -179,6 +221,7 @@ class AnnotationPipelineConfig:
|
||||
|
||||
seed: int = 1729
|
||||
|
||||
vocabulary: VocabularyConfig = field(default_factory=VocabularyConfig)
|
||||
plan: PlanConfig = field(default_factory=PlanConfig)
|
||||
interjections: InterjectionsConfig = field(default_factory=InterjectionsConfig)
|
||||
vqa: VqaConfig = field(default_factory=VqaConfig)
|
||||
|
||||
@@ -15,8 +15,14 @@
|
||||
# limitations under the License.
|
||||
"""In-process executor that runs the annotation phases.
|
||||
|
||||
The executor plans **six phases** in the dependency order from the plan:
|
||||
The executor plans **seven phases** in the dependency order from the plan:
|
||||
|
||||
phase 0: vocabulary discovery — derive a small canonical vocabulary
|
||||
from the first few sample-episode videos (subtask labels +
|
||||
memory milestones) and persist it next to the dataset; the
|
||||
``plan`` module then constrains every per-episode generation
|
||||
to those strings, so the downstream policy sees a small,
|
||||
repeatable conditioning distribution
|
||||
phase 1: ``plan`` module (plan + subtasks + memory)
|
||||
phase 2: ``interjections`` module (interjections + speech)
|
||||
phase 3: ``plan`` plan-update pass — re-runs plan emission at every
|
||||
@@ -88,6 +94,7 @@ class Executor:
|
||||
vqa: Any # GeneralVqaModule
|
||||
writer: LanguageColumnsWriter
|
||||
validator: StagingValidator
|
||||
vocabulary: Any = None # VocabularyDiscoveryModule | None
|
||||
|
||||
def run(self, root: Path) -> PipelineRunSummary:
|
||||
records = list(iter_episodes(root, only_episodes=self.config.only_episodes))
|
||||
@@ -102,6 +109,10 @@ class Executor:
|
||||
|
||||
phases: list[PhaseResult] = []
|
||||
|
||||
# Phase 0: vocabulary discovery. Mutates ``self.plan.vocabulary``
|
||||
# so subsequent per-episode plan calls see the canonical labels.
|
||||
phases.append(self._run_vocabulary_phase(records, root))
|
||||
|
||||
# Phase 1: ``plan`` module (plan + subtasks + memory)
|
||||
phases.append(self._run_module_phase("plan", records, staging_dir, self.plan))
|
||||
# Phase 2: ``interjections`` module (interjections + speech). It
|
||||
@@ -172,6 +183,62 @@ class Executor:
|
||||
flush=True,
|
||||
)
|
||||
|
||||
def _run_vocabulary_phase(
|
||||
self, records: list[EpisodeRecord], root: Path
|
||||
) -> PhaseResult:
|
||||
"""Discover (or load) the canonical vocabulary, wire it into ``self.plan``.
|
||||
|
||||
Returns a ``PhaseResult`` whose ``episodes_processed`` is the number
|
||||
of sample episodes consulted (0 when disabled or no VLM call was
|
||||
needed); ``episodes_skipped`` is always ``0`` because vocabulary is
|
||||
a once-per-dataset artifact, not a per-episode product.
|
||||
"""
|
||||
from .vocabulary import load_vocabulary, save_vocabulary # noqa: PLC0415
|
||||
|
||||
if self.vocabulary is None or not getattr(self.vocabulary, "enabled", False):
|
||||
print(
|
||||
"[annotate] phase=vocabulary skipped (module disabled or unset)",
|
||||
flush=True,
|
||||
)
|
||||
return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0)
|
||||
|
||||
existing = load_vocabulary(root)
|
||||
if existing is not None and self.config.vocabulary.reuse_existing:
|
||||
print(
|
||||
f"[annotate] phase=vocabulary reusing {root / 'meta' / 'canonical_vocabulary.json'} "
|
||||
f"({len(existing.subtasks)} subtask labels, "
|
||||
f"{len(existing.memory_milestones)} memory milestones)",
|
||||
flush=True,
|
||||
)
|
||||
self.plan.vocabulary = existing
|
||||
return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0)
|
||||
|
||||
sample_n = max(1, min(int(self.config.vocabulary.sample_episodes), len(records)))
|
||||
print(
|
||||
f"[annotate] phase=vocabulary discovering from {sample_n} sample episode(s)...",
|
||||
flush=True,
|
||||
)
|
||||
t0 = time.time()
|
||||
vocab = self.vocabulary.discover(records[:sample_n], existing=existing)
|
||||
if vocab is None:
|
||||
print(
|
||||
"[annotate] phase=vocabulary returned no vocabulary — "
|
||||
"plan module will fall back to free-form generation",
|
||||
flush=True,
|
||||
)
|
||||
return PhaseResult(name="vocabulary", episodes_processed=0, episodes_skipped=0)
|
||||
|
||||
save_path = save_vocabulary(root, vocab)
|
||||
print(
|
||||
f"[annotate] phase=vocabulary wrote {save_path} "
|
||||
f"({len(vocab.subtasks)} subtask labels, "
|
||||
f"{len(vocab.memory_milestones)} memory milestones) in "
|
||||
f"{time.time() - t0:.1f}s",
|
||||
flush=True,
|
||||
)
|
||||
self.plan.vocabulary = vocab
|
||||
return PhaseResult(name="vocabulary", episodes_processed=sample_n, episodes_skipped=0)
|
||||
|
||||
def _run_module_phase(
|
||||
self,
|
||||
name: str,
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from collections.abc import Sequence
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
@@ -34,6 +35,9 @@ from ..prompts import load as load_prompt
|
||||
from ..reader import EpisodeRecord, reconstruct_subtask_spans, snap_to_frame
|
||||
from ..staging import EpisodeStaging
|
||||
from ..vlm_client import VlmClient
|
||||
from ..vocabulary import Vocabulary
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -54,6 +58,11 @@ class PlanSubtasksMemoryModule:
|
||||
vlm: VlmClient
|
||||
config: PlanConfig
|
||||
frame_provider: FrameProvider = field(default_factory=null_provider)
|
||||
vocabulary: Vocabulary | None = None
|
||||
"""When set, the module constrains subtask + memory generation to the
|
||||
canonical strings in ``vocabulary``. Phase 0 (vocabulary discovery)
|
||||
populates this once per dataset; ``None`` falls back to free-form
|
||||
generation (original behaviour)."""
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
@@ -104,18 +113,29 @@ class PlanSubtasksMemoryModule:
|
||||
"tool_calls": None,
|
||||
}
|
||||
)
|
||||
# plan row at t=0
|
||||
plan_text = self._generate_plan(record, subtask_spans, task=effective_task)
|
||||
if plan_text is not None:
|
||||
rows.append(
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": plan_text,
|
||||
"style": "plan",
|
||||
"timestamp": float(t0),
|
||||
"tool_calls": None,
|
||||
}
|
||||
# Plan rows at every subtask boundary — including t=0 (start of
|
||||
# the first subtask). Because the plan is just a numbered list
|
||||
# of *still-todo* subtasks, re-emitting at each boundary makes
|
||||
# the active plan shrink as work progresses: at frame t the
|
||||
# rendered ``${plan}`` is the most recent emission, which
|
||||
# contains exactly the subtasks that started at or after the
|
||||
# current span. Saves the runtime from having to derive
|
||||
# "what's still left" at inference time.
|
||||
for span in subtask_spans:
|
||||
boundary_t = snap_to_frame(span["start"], record.frame_timestamps)
|
||||
plan_text = self._generate_plan(
|
||||
record, subtask_spans, refresh_t=boundary_t, task=effective_task
|
||||
)
|
||||
if plan_text is not None:
|
||||
rows.append(
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": plan_text,
|
||||
"style": "plan",
|
||||
"timestamp": float(boundary_t),
|
||||
"tool_calls": None,
|
||||
}
|
||||
)
|
||||
# memory rows at every subtask boundary except the very first start
|
||||
prior_memory = ""
|
||||
for i, span in enumerate(subtask_spans[1:], start=1):
|
||||
@@ -300,8 +320,28 @@ class PlanSubtasksMemoryModule:
|
||||
min_subtask_seconds=self.config.min_subtask_seconds,
|
||||
max_steps=self.config.plan_max_steps,
|
||||
episode_duration=f"{episode_duration:.3f}",
|
||||
vocabulary_block=self._subtask_vocabulary_block(),
|
||||
)
|
||||
spans = self._vlm_field(self._video_message(record, prompt), "subtasks")
|
||||
messages = self._video_message(record, prompt)
|
||||
spans = self._vlm_field(messages, "subtasks")
|
||||
# When a vocabulary is in force, do a single targeted retry if
|
||||
# any returned subtask is off-vocab — strict exact-match only,
|
||||
# no fuzzy snapping. The retry includes the offending strings
|
||||
# and the full canonical list so the VLM can correct itself.
|
||||
if self.vocabulary is not None and self.vocabulary.subtasks and spans:
|
||||
invalid = self._invalid_subtasks(spans)
|
||||
if invalid:
|
||||
logger.info(
|
||||
"episode %d: VLM emitted %d off-vocab subtask(s) (%s); retrying once",
|
||||
record.episode_index,
|
||||
len(invalid),
|
||||
invalid,
|
||||
)
|
||||
retry_msg = self._build_subtask_retry_message(messages, invalid)
|
||||
retried = self._vlm_field(retry_msg, "subtasks")
|
||||
if retried:
|
||||
spans = retried
|
||||
|
||||
if not spans:
|
||||
return []
|
||||
# clamp to [t0, t_last] and sort
|
||||
@@ -319,56 +359,243 @@ class PlanSubtasksMemoryModule:
|
||||
end = max(t0, min(end, t_last))
|
||||
if end < start:
|
||||
start, end = end, start
|
||||
if not text:
|
||||
continue
|
||||
text = self._canonicalize_subtask(text)
|
||||
if not text:
|
||||
continue
|
||||
cleaned.append({"text": text, "start": start, "end": end})
|
||||
cleaned.sort(key=lambda s: s["start"])
|
||||
cleaned = self._dedupe_starts_to_distinct_frames(cleaned, record)
|
||||
if self.vocabulary is not None and self.vocabulary.subtasks and not cleaned:
|
||||
logger.warning(
|
||||
"episode %d: every VLM subtask was off-vocab even after retry — "
|
||||
"episode left empty (extend meta/canonical_vocabulary.json to "
|
||||
"cover the missing phase)",
|
||||
record.episode_index,
|
||||
)
|
||||
return cleaned
|
||||
|
||||
@staticmethod
|
||||
def _dedupe_starts_to_distinct_frames(
|
||||
spans: list[dict[str, Any]], record: EpisodeRecord
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Bump same-frame subtask starts onto distinct frames.
|
||||
|
||||
Two consecutive VLM spans whose ``start`` rounds to the same
|
||||
source frame (after :func:`snap_to_frame`) would otherwise emit
|
||||
two ``style=subtask`` rows at the identical persistent
|
||||
timestamp. The training-time renderer's ``active_at(t,
|
||||
style=subtask)`` resolver can't disambiguate that and raises
|
||||
``Ambiguous resolver for style='subtask'``.
|
||||
|
||||
Walk the (sorted-by-start) spans, snap each to its frame, and
|
||||
if the snapped frame is already taken push the span onto the
|
||||
next unused frame so both subtasks survive on distinct
|
||||
timestamps. If the episode ends before a free frame is found,
|
||||
the trailing span is dropped with a warning — better than
|
||||
poisoning the render.
|
||||
"""
|
||||
if not spans:
|
||||
return spans
|
||||
frames = record.frame_timestamps
|
||||
if not frames:
|
||||
return spans
|
||||
used: set[float] = set()
|
||||
out: list[dict[str, Any]] = []
|
||||
for span in spans:
|
||||
ts = snap_to_frame(span["start"], frames)
|
||||
if ts in used:
|
||||
next_ts = next((f for f in frames if f > ts and f not in used), None)
|
||||
if next_ts is None:
|
||||
logger.warning(
|
||||
"episode %d: subtask %r snapped to occupied frame "
|
||||
"%.3f and no free later frame exists — dropping",
|
||||
record.episode_index,
|
||||
span.get("text"),
|
||||
ts,
|
||||
)
|
||||
continue
|
||||
ts = next_ts
|
||||
used.add(ts)
|
||||
new_span = {**span, "start": ts}
|
||||
if float(new_span.get("end", ts)) < ts:
|
||||
new_span["end"] = ts
|
||||
out.append(new_span)
|
||||
return out
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Canonical-vocabulary helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _subtask_vocabulary_block(self) -> str:
|
||||
"""Bullet-list of canonical subtasks the VLM must pick from.
|
||||
|
||||
Returns an empty string when no vocabulary is configured —
|
||||
``module_1_subtasks.txt`` then falls back to its free-form
|
||||
rules (original behaviour).
|
||||
"""
|
||||
if self.vocabulary is None or not self.vocabulary.subtasks:
|
||||
return ""
|
||||
bullets = "\n".join(f"- {s}" for s in self.vocabulary.subtasks)
|
||||
return (
|
||||
"You MUST choose each subtask label verbatim from this canonical "
|
||||
"vocabulary — pick the closest match for each phase of the demo, "
|
||||
"and reuse the SAME string every time that phase recurs. The "
|
||||
"low-level policy is conditioned on these exact strings; any "
|
||||
"novel paraphrase you invent will make its conditioning OOD.\n"
|
||||
"Canonical subtask labels:\n"
|
||||
f"{bullets}\n\n"
|
||||
)
|
||||
|
||||
def _memory_vocabulary_block(self) -> str:
|
||||
"""Bullet-list of canonical memory milestones the VLM must pick from."""
|
||||
if self.vocabulary is None or not self.vocabulary.memory_milestones:
|
||||
return ""
|
||||
bullets = "\n".join(f"- {m}" for m in self.vocabulary.memory_milestones)
|
||||
return (
|
||||
"Compose the memory by picking ONLY from this canonical milestone "
|
||||
"list — append a milestone (or rewrite the running memory to "
|
||||
"compress past ones) using these exact phrases. Do not invent new "
|
||||
"wording: every paraphrase weakens the downstream conditioning.\n"
|
||||
"Canonical memory milestones:\n"
|
||||
f"{bullets}\n\n"
|
||||
)
|
||||
|
||||
_NORMALIZE_STRIP_TOKENS: frozenset[str] = frozenset({"the", "a", "an"})
|
||||
|
||||
def _canonicalize_subtask(self, text: str) -> str:
|
||||
"""Validate ``text`` against the canonical vocabulary; no fuzzy snap.
|
||||
|
||||
Without a vocabulary, the original text passes through. With a
|
||||
vocabulary, accept the span only if its normalised form (lower-
|
||||
cased, articles stripped, whitespace collapsed) matches a
|
||||
canonical entry exactly — the canonical wording is returned so
|
||||
the supervised string is byte-identical across episodes.
|
||||
|
||||
Off-vocab spans are dropped (empty string). Upstream
|
||||
``_generate_subtasks`` triggers a targeted retry before reaching
|
||||
the drop path; this function never snaps or warps a span into
|
||||
a different label.
|
||||
"""
|
||||
if self.vocabulary is None or not self.vocabulary.subtasks:
|
||||
return text.strip()
|
||||
normalised = self._normalize(text)
|
||||
if not normalised:
|
||||
return ""
|
||||
for candidate in self.vocabulary.subtasks:
|
||||
if self._normalize(candidate) == normalised:
|
||||
return candidate
|
||||
return ""
|
||||
|
||||
@classmethod
|
||||
def _normalize(cls, text: str) -> str:
|
||||
"""Lowercase, strip articles, collapse whitespace, drop punctuation."""
|
||||
words = [
|
||||
w.strip(".,:;\"'!?()")
|
||||
for w in text.lower().replace(",", " ").split()
|
||||
]
|
||||
return " ".join(w for w in words if w and w not in cls._NORMALIZE_STRIP_TOKENS)
|
||||
|
||||
def _invalid_subtasks(self, spans: list[dict[str, Any]]) -> list[str]:
|
||||
"""Return the unique off-vocab subtask strings the VLM produced."""
|
||||
seen: list[str] = []
|
||||
for span in spans:
|
||||
text = str((span or {}).get("text") or "").strip()
|
||||
if not text:
|
||||
continue
|
||||
if self._canonicalize_subtask(text):
|
||||
continue
|
||||
if text not in seen:
|
||||
seen.append(text)
|
||||
return seen
|
||||
|
||||
def _build_subtask_retry_message(
|
||||
self, original_messages: list[dict[str, Any]], invalid: list[str]
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Compose a one-shot correction prompt naming the off-vocab strings."""
|
||||
assert self.vocabulary is not None
|
||||
canonical = "\n".join(f"- {s}" for s in self.vocabulary.subtasks)
|
||||
invalid_list = "\n".join(f"- {s!r}" for s in invalid)
|
||||
correction = (
|
||||
"Your previous response included subtask labels that are NOT in "
|
||||
"the canonical vocabulary:\n"
|
||||
f"{invalid_list}\n\n"
|
||||
"Re-emit the same segmentation (same number of spans, same start/end "
|
||||
"timestamps where they were valid) but replace every off-vocab "
|
||||
"label with the EXACT canonical string for that phase, copied "
|
||||
"verbatim from this list:\n"
|
||||
f"{canonical}\n\n"
|
||||
"Strict rules:\n"
|
||||
"- Output strings must be byte-for-byte identical to entries above.\n"
|
||||
"- No articles, no adverbs, no extra words.\n"
|
||||
"- If a phase truly has no canonical match, omit that span entirely.\n"
|
||||
"Return the same JSON shape as before."
|
||||
)
|
||||
# Append the correction as an additional user turn; the model
|
||||
# sees the original prompt + its prior output is implied by the
|
||||
# conversation context (the VLM client is stateless, so we
|
||||
# re-send the original content plus this correction).
|
||||
retry_messages = [
|
||||
{
|
||||
"role": m.get("role", "user"),
|
||||
"content": (
|
||||
m.get("content")
|
||||
if isinstance(m.get("content"), str)
|
||||
else list(m.get("content") or [])
|
||||
),
|
||||
}
|
||||
for m in original_messages
|
||||
]
|
||||
retry_messages.append({"role": "user", "content": correction})
|
||||
return retry_messages
|
||||
|
||||
def _generate_plan(
|
||||
self,
|
||||
record: EpisodeRecord,
|
||||
record: EpisodeRecord, # noqa: ARG002 (kept for signature stability)
|
||||
subtask_spans: Sequence[dict[str, Any]],
|
||||
*,
|
||||
refresh_t: float | None = None,
|
||||
interjection: str | None = None,
|
||||
task: str | None = None,
|
||||
interjection: str | None = None, # noqa: ARG002
|
||||
task: str | None = None, # noqa: ARG002
|
||||
) -> str | None:
|
||||
"""Deterministic plan = numbered list of *still-todo* subtasks.
|
||||
|
||||
Previously this called the VLM with a prompt that asked it to
|
||||
compress the subtasks into a "compact hierarchical plan". That
|
||||
produced longer-than-necessary plans, cost an extra VLM round-trip
|
||||
per episode (plus one per interjection on refresh), and could
|
||||
diverge from the actual subtask sequence the model is going to
|
||||
execute. Replacing it with a plain summarisation keeps the plan
|
||||
tightly aligned with the upcoming subtasks and removes the VLM
|
||||
call entirely.
|
||||
|
||||
Layout — short imperative fragments prefixed by "N. ":
|
||||
|
||||
1. <subtask 1>
|
||||
2. <subtask 2>
|
||||
...
|
||||
|
||||
On a refresh at ``refresh_t`` (called from ``run_plan_updates``
|
||||
on interjection events, and from ``run_episode`` at every subtask
|
||||
boundary), only subtasks whose start is at or after ``refresh_t``
|
||||
are included — the plan shrinks as work progresses, so it always
|
||||
describes what's left.
|
||||
"""
|
||||
if not subtask_spans:
|
||||
return None
|
||||
subtasks_text = "\n".join(f"- {s['text']}" for s in subtask_spans)
|
||||
prompt = load_prompt("module_1_plan").format(
|
||||
episode_task=(task if task is not None else record.episode_task),
|
||||
subtasks_text=subtasks_text,
|
||||
plan_max_steps=self.config.plan_max_steps,
|
||||
remaining = [
|
||||
s
|
||||
for s in subtask_spans
|
||||
if refresh_t is None or float(s.get("start", 0.0)) >= float(refresh_t)
|
||||
]
|
||||
if not remaining:
|
||||
# Past the last subtask boundary on a late refresh — nothing
|
||||
# left to plan; emit None so the caller skips the row.
|
||||
return None
|
||||
return "\n".join(
|
||||
f"{i}. {span.get('text', '').strip()}" for i, span in enumerate(remaining, start=1)
|
||||
)
|
||||
if refresh_t is not None:
|
||||
# ``current_subtask`` is the span the refresh time falls into,
|
||||
# so the model knows where in the demonstration the planner is
|
||||
# standing when it re-emits.
|
||||
current_subtask = ""
|
||||
for span in subtask_spans:
|
||||
if float(span["start"]) <= refresh_t and (
|
||||
"end" not in span or float(span["end"]) > refresh_t
|
||||
):
|
||||
current_subtask = span.get("text", "")
|
||||
break
|
||||
if interjection:
|
||||
prompt += (
|
||||
f"\n\n(Plan refresh at t={refresh_t:.2f}s after a user "
|
||||
f"interjection: {interjection!r}. Current subtask just "
|
||||
f"before the interjection: {current_subtask!r}. Update "
|
||||
f"the plan so it reflects the interjection — drop or "
|
||||
f"reorder steps as needed; do not just restate.)\n"
|
||||
)
|
||||
else:
|
||||
# Refresh without an interjection text: still tell the model
|
||||
# where in the episode the plan stands so the re-emission
|
||||
# is grounded. Should be rare — plan refreshes are
|
||||
# interjection-driven by design.
|
||||
prompt += f"\n\n(Plan refresh at t={refresh_t:.2f}s. Current subtask: {current_subtask!r}.)\n"
|
||||
plan = self._vlm_field(self._text_message(prompt), "plan")
|
||||
return plan.strip() if isinstance(plan, str) else None
|
||||
|
||||
def _generate_memory(
|
||||
self,
|
||||
@@ -384,6 +611,7 @@ class PlanSubtasksMemoryModule:
|
||||
prior_memory=prior_memory or "(none)",
|
||||
completed_subtask=completed,
|
||||
remaining_subtasks=", ".join(remaining) if remaining else "(none)",
|
||||
vocabulary_block=self._memory_vocabulary_block(),
|
||||
)
|
||||
memory = self._vlm_field(self._text_message(prompt), "memory")
|
||||
return memory.strip() if isinstance(memory, str) else ""
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
You are inspecting {n_episodes} sample episode video(s) from a teleoperated
|
||||
robot dataset. Every episode in the dataset performs the SAME task; the
|
||||
user originally asked: "{episode_task}".
|
||||
|
||||
Watch all the clips and produce a SHORT canonical vocabulary that every
|
||||
episode in this dataset will reuse. The downstream low-level policy is
|
||||
conditioned on these strings — duplicate phrasings (e.g. "grasp blue
|
||||
cube" vs "pick up the blue cube") would destroy the conditioning, so
|
||||
pick one wording per concept and reuse it everywhere.
|
||||
|
||||
Decide how many entries each list needs YOURSELF based on what you see —
|
||||
the smallest set that still covers every recurring phase in the demos.
|
||||
A simple two-object pick-and-place might need ~6 subtask labels and 2
|
||||
memory milestones; a long multi-step recipe needs more. Err on the side
|
||||
of FEWER — extra entries that don't recur across episodes weaken the
|
||||
conditioning.
|
||||
|
||||
You output two lists:
|
||||
|
||||
1. `subtasks`: imperative, telegraphic commands the robot can execute.
|
||||
- Verb-first. Drop articles, adverbs, qualifiers.
|
||||
- Consistent object nouns (if the task says "cube", every subtask says
|
||||
"cube" — never "block" / "object").
|
||||
- Atomic — one skill per subtask (gripper-open events, contact, regrasps,
|
||||
transitions all become cut points).
|
||||
- Each label must recur across the demos. If you see a motion only
|
||||
once across all sample clips, it probably isn't a canonical phase.
|
||||
- Good: "move to blue cube", "grasp blue cube", "lift blue cube",
|
||||
"place blue cube in box", "release blue cube", "retract arm".
|
||||
- Bad: "the robot arm moves towards the blue cube" (third person,
|
||||
too long), "carefully pick up the cube" (adverb, article),
|
||||
"carrying the yellow cube over the green basket" (gerund — should
|
||||
be imperative "transport yellow cube to green basket").
|
||||
|
||||
2. `memory_milestones`: first-person past-tense sentences the running
|
||||
memory composes from. Each subtask phase that produces a lasting
|
||||
change should have a milestone; transient motions (move, retract)
|
||||
should NOT.
|
||||
- First person, past tense. Start with "I".
|
||||
- One sentence. Functional outcome only — no grasp / motion detail.
|
||||
- Good: "I picked up the blue cube.", "I placed the blue cube in
|
||||
the green box.", "I wiped the counter."
|
||||
- Bad: "The robot arm grasped the blue cube." (third person),
|
||||
"I carefully grasped the blue cube with the parallel gripper."
|
||||
(irrelevant detail), "I moved towards the blue cube." (transient
|
||||
motion — should be omitted, not memorialised).
|
||||
|
||||
Output strictly valid JSON of shape:
|
||||
|
||||
{{
|
||||
"subtasks": ["<verb phrase>", ...],
|
||||
"memory_milestones": ["I <past-tense sentence>.", ...]
|
||||
}}
|
||||
@@ -8,18 +8,29 @@ task execution. Specific object attributes (colors, precise quantities of
|
||||
each item) get discarded when their details won't affect subsequent
|
||||
actions. Functional outcomes (where items went, how many) are preserved."
|
||||
|
||||
Concrete example from MEM:
|
||||
Before: "I put a light green bowl, a dark blue bowl and a bright yellow
|
||||
bowl into the top right cabinet"
|
||||
After: "I placed three bowls in the top right cabinet"
|
||||
|
||||
Episode task: "{episode_task}"
|
||||
Previous memory: {prior_memory}
|
||||
Just-completed subtask: "{completed_subtask}"
|
||||
Remaining subtasks (for relevance judgement only): {remaining_subtasks}
|
||||
|
||||
Update the memory. Drop irrelevant detail. Compress completed steps.
|
||||
Keep WHAT happened, drop HOW. Shorter is better.
|
||||
{vocabulary_block}Write the memory as a short FIRST-PERSON, PAST-TENSE narrative of what the
|
||||
robot has accomplished so far — the running story it would tell itself.
|
||||
|
||||
Authoring rules:
|
||||
- First person, past tense. Every sentence starts with "I": "I picked
|
||||
up...", "I opened...", "I moved to...".
|
||||
- One or two short sentences. Extend the previous memory with the
|
||||
just-completed subtask; do not rewrite it from scratch.
|
||||
- Keep WHAT happened (functional outcomes — where items went, how many),
|
||||
drop HOW (grasp details, motions).
|
||||
- Compress completed steps and drop object attributes (colors, exact
|
||||
counts) once they no longer affect the remaining subtasks.
|
||||
|
||||
Example (MEM, Torne 2026):
|
||||
Before: "I prepared the pot and got the potatoes, milk, and butter. I
|
||||
moved to the drawer."
|
||||
After: "I prepared the pot and got the ingredients. I opened the
|
||||
drawer with the masher."
|
||||
|
||||
Output strictly valid JSON:
|
||||
{{ "memory": "<one or two short sentences>" }}
|
||||
{{ "memory": "<one or two short first-person past-tense sentences>" }}
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
You are the high-level planner for a robot demonstrating: "{episode_task}".
|
||||
|
||||
Given the subtask decomposition below, write a concise hierarchical PLAN
|
||||
the robot should follow. Format the plan as a numbered list, one line per
|
||||
high-level step. The plan describes the full task; subtasks are the atomic
|
||||
skills used to execute it.
|
||||
|
||||
Subtasks for context:
|
||||
{subtasks_text}
|
||||
|
||||
Authoring rules:
|
||||
- 3 to {plan_max_steps} steps.
|
||||
- Each step describes one logical chunk of the task, not one motion.
|
||||
- Steps must be in execution order.
|
||||
- Plain prose, no JSON, no markdown headers.
|
||||
|
||||
Output strictly valid JSON:
|
||||
{{ "plan": "1. ...\n2. ...\n3. ..." }}
|
||||
@@ -6,15 +6,28 @@ You are shown the entire demonstration as a single video. Watch the
|
||||
whole clip, then segment it into a list of consecutive atomic subtasks
|
||||
the robot performs.
|
||||
|
||||
Authoring rules — based on Hi Robot (Shi 2025) atom granularity and
|
||||
Pi0.7 (Physical Intelligence 2025) "how, not what" detail:
|
||||
{vocabulary_block}Authoring rules — Hi Robot atom granularity, pi0.7-style short prompts:
|
||||
|
||||
- Each subtask is one atomic skill the low-level policy can execute,
|
||||
e.g. "pick up one piece of lettuce", "place the bowl into the box",
|
||||
"move the right arm to the left".
|
||||
- Capture HOW the subtask is performed, not only WHAT — e.g. prefer
|
||||
"grasp the handle of the sponge with the left hand" to "pick up the
|
||||
sponge".
|
||||
- Each subtask = one atomic skill the low-level policy can execute.
|
||||
- Write each subtask as an IMPERATIVE COMMAND, starting with a verb:
|
||||
move, reach, pick up, grasp, place, put, push, pull, open, close,
|
||||
turn, press, lift, insert, pour...
|
||||
- Keep it SHORT — a verb phrase, not a sentence. Drop articles
|
||||
("the", "a") and adverbs ("carefully", "slowly"). Add a "how"
|
||||
detail (which hand, which grasp point) ONLY when it is needed to
|
||||
disambiguate.
|
||||
- NEVER use third person. Never write "the robot", "the arm", "the
|
||||
gripper moves", "it picks up" — the robot is implied. Command it,
|
||||
do not describe it.
|
||||
- Use the exact object nouns from the task above. If the task says
|
||||
"cube", every subtask says "cube" — never switch to "block". If it
|
||||
says "box", never switch to "bin"/"container". Keep vocabulary
|
||||
consistent across the whole episode.
|
||||
- Good: "move to blue cube", "grasp blue cube", "lift blue cube",
|
||||
"place blue cube in box", "open drawer", "release yellow cube".
|
||||
- Bad: "the robot arm moves towards the blue cube" (third person,
|
||||
too long), "carefully pick up the cube" (adverb, article),
|
||||
"release the yellow block" ("block" when the task said "cube").
|
||||
- Subtasks are non-overlapping and cover the full episode in order.
|
||||
Choose the cut points yourself based on what you see in the video
|
||||
(gripper open/close events, contact, regrasps, transitions).
|
||||
@@ -27,7 +40,7 @@ Output strictly valid JSON of shape:
|
||||
|
||||
{{
|
||||
"subtasks": [
|
||||
{{"text": "<how-not-what>", "start": <float>, "end": <float>}},
|
||||
{{"text": "<short imperative verb phrase>", "start": <float>, "end": <float>}},
|
||||
...
|
||||
]
|
||||
}}
|
||||
|
||||
@@ -9,7 +9,7 @@ Original task:
|
||||
Generate exactly {n} alternative phrasings of the same task. Vary:
|
||||
|
||||
- formality (casual / polite / curt)
|
||||
- verbosity (short imperative vs longer polite request)
|
||||
- verbosity (mostly short imperative; occasional polite request)
|
||||
- word choice (synonyms, different verbs)
|
||||
- sentence structure (imperative / question / suggestion)
|
||||
|
||||
@@ -17,7 +17,7 @@ Hard rules:
|
||||
- Each phrasing MUST preserve the exact meaning of the original task.
|
||||
Do not change which object is involved, the destination, or the
|
||||
action. Do not add extra steps. Do not invent new objects.
|
||||
- Each phrasing must be a single short sentence, plain prose, no
|
||||
- Each phrasing must be a short phrase or sentence, plain prose, no
|
||||
markdown, no quotes, no list numbers.
|
||||
- Phrasings must be distinct — no near-duplicates.
|
||||
- Output exactly {n} entries.
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
The user just asked the robot: "{episode_task}".
|
||||
|
||||
Generate a short verbal acknowledgement the robot would speak back before
|
||||
beginning the task. Style: confident, friendly, single short sentence.
|
||||
beginning the task. Style: compact, confident, friendly.
|
||||
|
||||
Examples (Hi Robot, Shi 2025): "Sure, I won't put cheese on it.",
|
||||
"OK, starting with the sponge.", "Got it.".
|
||||
|
||||
Prefer very short replies: "Got it.", "On it.", "OK."
|
||||
|
||||
Output strictly valid JSON:
|
||||
{{ "text": "<the spoken acknowledgement>" }}
|
||||
|
||||
@@ -14,12 +14,10 @@ subtask boundary in the demonstration:
|
||||
- Subtask the robot is about to start: "{next_subtask}"
|
||||
- Time into episode: {timestamp:.2f}s
|
||||
|
||||
Write ONE interjection the user would naturally say at this moment to
|
||||
prompt / confirm / encourage the robot to do "{next_subtask}". Phrase it
|
||||
like a real human mid-task remark — conversational, varied, sometimes
|
||||
just a nudge, sometimes a clarification, sometimes a small constraint
|
||||
that the upcoming motion happens to satisfy. Plus the robot's verbal
|
||||
acknowledgement.
|
||||
Write ONE compact interjection the user would naturally say at this
|
||||
moment to prompt / confirm / encourage the robot to do "{next_subtask}".
|
||||
Keep it like a mid-task coaching cue, not a full instruction paragraph.
|
||||
Also write the robot's compact verbal acknowledgement.
|
||||
|
||||
Hard rules:
|
||||
|
||||
@@ -29,7 +27,9 @@ Hard rules:
|
||||
instead", DO NOT — those would contradict the demonstration.
|
||||
- The interjection must reference an object, location, or action that
|
||||
is plausible given the visible scene and the next subtask text.
|
||||
- One sentence each. Conversational, not robotic.
|
||||
- One short phrase or sentence each. Conversational, not robotic.
|
||||
- Prefer direct cues: "{next_subtask}, please."; "Now {next_subtask}."
|
||||
- Keep robot speech very short: "OK.", "On it.", "Doing that."
|
||||
|
||||
Style examples (vary the phrasing — don't reuse these verbatim):
|
||||
- "Now go ahead and {next_subtask}."
|
||||
@@ -41,6 +41,6 @@ Style examples (vary the phrasing — don't reuse these verbatim):
|
||||
|
||||
Output strictly valid JSON:
|
||||
{{
|
||||
"interjection": "<single sentence the user says, asking for the next subtask>",
|
||||
"speech": "<single sentence the robot speaks back, confirming and starting>"
|
||||
"interjection": "<short cue from the user, asking for the next subtask>",
|
||||
"speech": "<short robot acknowledgement>"
|
||||
}}
|
||||
|
||||
222
src/lerobot/annotations/steerable_pipeline/vocabulary.py
Normal file
222
src/lerobot/annotations/steerable_pipeline/vocabulary.py
Normal file
@@ -0,0 +1,222 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Dataset-level canonical vocabulary discovery (Phase 0).
|
||||
|
||||
The downstream consumer of these annotations is a low-level action expert
|
||||
conditioned on the ``subtask`` string. Free-form per-episode LLM rephrasing
|
||||
gives near-unique strings per occurrence, which collapses the action
|
||||
expert's conditioning to noise and makes runtime subtask-paraphrase drift
|
||||
catastrophic. The Hi-Robot / π0.6-MEM recipe ships a small canonical
|
||||
vocabulary per environment (~10 strings) that every episode reuses; this
|
||||
module derives that vocabulary automatically from the first few episode
|
||||
videos and persists it next to the dataset.
|
||||
|
||||
Pipeline-level flow:
|
||||
|
||||
Phase 0 (here): watch N sample episodes → produce vocabulary.json
|
||||
Phase 1 (plan module): reuse vocabulary on every episode, both as
|
||||
prompt-side constraint *and* post-VLM validation
|
||||
|
||||
The vocabulary is JSON, lives at ``<root>/meta/canonical_vocabulary.json``,
|
||||
and is human-inspectable / hand-editable — if the discovered set is wrong,
|
||||
operators edit the file and re-run the pipeline without phase 0.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from collections.abc import Sequence
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from .config import VocabularyConfig
|
||||
from .frames import FrameProvider, null_provider, to_video_block
|
||||
from .prompts import load as load_prompt
|
||||
from .reader import EpisodeRecord
|
||||
from .vlm_client import VlmClient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
VOCABULARY_FILENAME = "canonical_vocabulary.json"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Vocabulary:
|
||||
"""Canonical phrasings shared across every episode of one dataset.
|
||||
|
||||
Both lists are strict: per-episode subtask + memory generation pick
|
||||
from these strings only; the downstream policy then has a small,
|
||||
repeatable target distribution to learn instead of thousands of
|
||||
LLM paraphrases.
|
||||
"""
|
||||
|
||||
subtasks: tuple[str, ...]
|
||||
"""Imperative subtask labels — what the low-level policy is conditioned
|
||||
on. Verb-first, telegraphic, consistent object nouns. Example:
|
||||
``("move to blue cube", "grasp blue cube", "lift blue cube",
|
||||
"place blue cube in box", "retract arm")``.
|
||||
"""
|
||||
|
||||
memory_milestones: tuple[str, ...]
|
||||
"""First-person past-tense milestone sentences — building blocks for
|
||||
the running memory string. Example: ``("I picked up the blue cube.",
|
||||
"I placed the blue cube in the green box.")``. Each milestone maps
|
||||
1:1 onto a completed subtask phase; ``memory_at_step_k`` is the
|
||||
concatenation of milestones for completed phases.
|
||||
"""
|
||||
|
||||
def to_json(self) -> dict[str, list[str]]:
|
||||
return {
|
||||
"subtasks": list(self.subtasks),
|
||||
"memory_milestones": list(self.memory_milestones),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, payload: dict[str, Any]) -> Vocabulary:
|
||||
subtasks = tuple(
|
||||
str(s).strip() for s in (payload.get("subtasks") or []) if str(s).strip()
|
||||
)
|
||||
memory_milestones = tuple(
|
||||
str(s).strip() for s in (payload.get("memory_milestones") or []) if str(s).strip()
|
||||
)
|
||||
return cls(subtasks=subtasks, memory_milestones=memory_milestones)
|
||||
|
||||
def is_empty(self) -> bool:
|
||||
return not self.subtasks and not self.memory_milestones
|
||||
|
||||
|
||||
def vocabulary_path(root: Path) -> Path:
|
||||
"""Return the canonical on-disk location for the vocabulary file."""
|
||||
return root / "meta" / VOCABULARY_FILENAME
|
||||
|
||||
|
||||
def load_vocabulary(root: Path) -> Vocabulary | None:
|
||||
"""Read ``<root>/meta/canonical_vocabulary.json`` if present.
|
||||
|
||||
Returns ``None`` when the file does not exist — callers fall back to
|
||||
free-form (unconstrained) subtask + memory generation, preserving the
|
||||
pipeline's behaviour on datasets that never ran phase 0.
|
||||
"""
|
||||
path = vocabulary_path(root)
|
||||
if not path.exists():
|
||||
return None
|
||||
try:
|
||||
payload = json.loads(path.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError) as exc:
|
||||
logger.warning("could not read %s: %s — proceeding without vocabulary", path, exc)
|
||||
return None
|
||||
if not isinstance(payload, dict):
|
||||
logger.warning("%s is not a JSON object — ignoring", path)
|
||||
return None
|
||||
vocab = Vocabulary.from_json(payload)
|
||||
if vocab.is_empty():
|
||||
return None
|
||||
return vocab
|
||||
|
||||
|
||||
def save_vocabulary(root: Path, vocab: Vocabulary) -> Path:
|
||||
"""Atomically persist ``vocab`` to ``<root>/meta/canonical_vocabulary.json``."""
|
||||
path = vocabulary_path(root)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp = path.with_suffix(path.suffix + ".tmp")
|
||||
tmp.write_text(
|
||||
json.dumps(vocab.to_json(), indent=2, ensure_ascii=False) + "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
tmp.replace(path)
|
||||
return path
|
||||
|
||||
|
||||
@dataclass
|
||||
class VocabularyDiscoveryModule:
|
||||
"""Derive a dataset-level canonical vocabulary from sample episodes.
|
||||
|
||||
Phase 0 of the executor: pulls ``config.sample_episodes`` episode
|
||||
videos, packs them into one Qwen-VL multi-video prompt, and asks the
|
||||
model to enumerate the small set of canonical subtask labels +
|
||||
memory milestones that recur across them. The output is persisted
|
||||
to ``meta/canonical_vocabulary.json`` and consumed by phase 1.
|
||||
"""
|
||||
|
||||
vlm: VlmClient
|
||||
config: VocabularyConfig
|
||||
frame_provider: FrameProvider = field(default_factory=null_provider)
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
return self.config.enabled
|
||||
|
||||
def discover(
|
||||
self,
|
||||
records: Sequence[EpisodeRecord],
|
||||
*,
|
||||
existing: Vocabulary | None = None,
|
||||
) -> Vocabulary | None:
|
||||
"""Run vocabulary discovery against the first N sample episodes.
|
||||
|
||||
``existing`` short-circuits the VLM call when ``config.reuse_existing``
|
||||
is True and an on-disk vocabulary is already present — keeps re-runs
|
||||
cheap and lets operators hand-edit the file without it getting
|
||||
overwritten.
|
||||
"""
|
||||
if existing is not None and self.config.reuse_existing:
|
||||
logger.info(
|
||||
"vocabulary: reusing existing (%d subtasks, %d memory milestones)",
|
||||
len(existing.subtasks),
|
||||
len(existing.memory_milestones),
|
||||
)
|
||||
return existing
|
||||
|
||||
sample = list(records[: max(1, int(self.config.sample_episodes))])
|
||||
if not sample:
|
||||
return None
|
||||
|
||||
task_hint = next((r.episode_task for r in sample if r.episode_task), "")
|
||||
prompt = load_prompt("module_0_vocabulary").format(
|
||||
episode_task=task_hint or "(unspecified)",
|
||||
n_episodes=len(sample),
|
||||
)
|
||||
# Pack one video block per sample episode so the VLM sees the
|
||||
# variation across episodes (different starting poses, different
|
||||
# object placements) rather than overfitting to one trajectory.
|
||||
content: list[dict[str, Any]] = []
|
||||
for record in sample:
|
||||
video_frames = self.frame_provider.video_for_episode(
|
||||
record, int(self.config.max_video_frames_per_episode)
|
||||
)
|
||||
if video_frames:
|
||||
content.extend(to_video_block(video_frames))
|
||||
content.append({"type": "text", "text": prompt})
|
||||
messages = [{"role": "user", "content": content}]
|
||||
|
||||
result = self.vlm.generate_json([messages])[0]
|
||||
if not isinstance(result, dict):
|
||||
logger.warning("vocabulary: VLM did not return a JSON object — skipping")
|
||||
return None
|
||||
|
||||
vocab = Vocabulary.from_json(result)
|
||||
if vocab.is_empty():
|
||||
logger.warning("vocabulary: VLM returned an empty vocabulary — skipping")
|
||||
return None
|
||||
logger.info(
|
||||
"vocabulary: discovered %d subtask labels + %d memory milestones from %d episodes",
|
||||
len(vocab.subtasks),
|
||||
len(vocab.memory_milestones),
|
||||
len(sample),
|
||||
)
|
||||
return vocab
|
||||
@@ -40,6 +40,7 @@ from lerobot.annotations.steerable_pipeline.modules import (
|
||||
)
|
||||
from lerobot.annotations.steerable_pipeline.validator import StagingValidator
|
||||
from lerobot.annotations.steerable_pipeline.vlm_client import make_vlm_client
|
||||
from lerobot.annotations.steerable_pipeline.vocabulary import VocabularyDiscoveryModule
|
||||
from lerobot.annotations.steerable_pipeline.writer import LanguageColumnsWriter
|
||||
from lerobot.configs import parser
|
||||
|
||||
@@ -88,6 +89,9 @@ def annotate(cfg: AnnotationPipelineConfig) -> None:
|
||||
vlm=vlm, config=cfg.interjections, seed=cfg.seed, frame_provider=frame_provider
|
||||
)
|
||||
vqa = GeneralVqaModule(vlm=vlm, config=cfg.vqa, seed=cfg.seed, frame_provider=frame_provider)
|
||||
vocabulary = VocabularyDiscoveryModule(
|
||||
vlm=vlm, config=cfg.vocabulary, frame_provider=frame_provider
|
||||
)
|
||||
writer = LanguageColumnsWriter()
|
||||
validator = StagingValidator(
|
||||
dataset_camera_keys=tuple(getattr(frame_provider, "camera_keys", []) or []) or None,
|
||||
@@ -98,6 +102,7 @@ def annotate(cfg: AnnotationPipelineConfig) -> None:
|
||||
plan=plan,
|
||||
interjections=interjections,
|
||||
vqa=vqa,
|
||||
vocabulary=vocabulary,
|
||||
writer=writer,
|
||||
validator=validator,
|
||||
)
|
||||
@@ -140,7 +145,7 @@ def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None:
|
||||
exist_ok=True,
|
||||
)
|
||||
print(f"[lerobot-annotate] uploading {root} -> {repo_id}...", flush=True)
|
||||
api.upload_folder(
|
||||
commit_info = api.upload_folder(
|
||||
folder_path=str(root),
|
||||
repo_id=repo_id,
|
||||
repo_type="dataset",
|
||||
@@ -149,6 +154,48 @@ def _push_to_hub(root: Path, cfg: AnnotationPipelineConfig) -> None:
|
||||
)
|
||||
print(f"[lerobot-annotate] uploaded to https://huggingface.co/datasets/{repo_id}", flush=True)
|
||||
|
||||
# Tag the upload with the codebase version. ``LeRobotDatasetMetadata``
|
||||
# resolves the dataset revision via ``get_safe_version`` which scans
|
||||
# for tags like ``v3.0``; without a tag it raises
|
||||
# ``RevisionNotFoundError``. Read the version straight from the
|
||||
# dataset's own ``meta/info.json`` so we tag whatever the writer
|
||||
# actually wrote (no accidental drift if the codebase floor moves).
|
||||
from lerobot.datasets.dataset_metadata import CODEBASE_VERSION # noqa: PLC0415
|
||||
|
||||
info_path = root / "meta" / "info.json"
|
||||
version_tag = CODEBASE_VERSION
|
||||
if info_path.exists():
|
||||
try:
|
||||
from lerobot.utils.io_utils import load_json # noqa: PLC0415
|
||||
|
||||
info = load_json(info_path)
|
||||
ds_version = info.get("codebase_version")
|
||||
if isinstance(ds_version, str) and ds_version.startswith("v"):
|
||||
version_tag = ds_version
|
||||
except Exception as exc: # noqa: BLE001
|
||||
print(f"[lerobot-annotate] could not read codebase_version from info.json ({exc}); falling back to {version_tag}", flush=True)
|
||||
revision = getattr(commit_info, "oid", None)
|
||||
tag_kwargs = {
|
||||
"repo_id": repo_id,
|
||||
"tag": version_tag,
|
||||
"repo_type": "dataset",
|
||||
"exist_ok": True,
|
||||
}
|
||||
if revision is not None:
|
||||
tag_kwargs["revision"] = revision
|
||||
|
||||
try:
|
||||
api.create_tag(**tag_kwargs)
|
||||
print(f"[lerobot-annotate] tagged {repo_id} as {version_tag}", flush=True)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
print(
|
||||
f"[lerobot-annotate] WARNING: could not create tag {version_tag!r} on {repo_id}: {exc}. "
|
||||
"Dataset is uploaded but ``LeRobotDataset`` won't be able to load it until it's tagged. "
|
||||
"Run: from huggingface_hub import HfApi; "
|
||||
f"HfApi().create_tag({repo_id!r}, tag={version_tag!r}, repo_type='dataset', exist_ok=True)",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
annotate()
|
||||
|
||||
@@ -80,7 +80,6 @@ def test_module1_plan_memory_subtask_smoke(fixture_dataset_root: Path, tmp_path:
|
||||
{"text": "place the sponge into the sink", "start": 0.8, "end": 1.1},
|
||||
]
|
||||
},
|
||||
"concise hierarchical PLAN": {"plan": "1. grasp\n2. wipe\n3. place"},
|
||||
"Update the memory": {"memory": "wiped the counter once"},
|
||||
},
|
||||
)
|
||||
@@ -96,10 +95,16 @@ def test_module1_plan_memory_subtask_smoke(fixture_dataset_root: Path, tmp_path:
|
||||
frame_set = set(record.frame_timestamps)
|
||||
for row in rows:
|
||||
assert row["timestamp"] in frame_set
|
||||
# exactly one plan row at t0
|
||||
plan_rows = [r for r in rows if r["style"] == "plan"]
|
||||
assert len(plan_rows) == 1
|
||||
# one plan row per subtask boundary; the first lands at t0 and each
|
||||
# plan is the deterministic numbered list of still-todo subtasks
|
||||
plan_rows = sorted((r for r in rows if r["style"] == "plan"), key=lambda r: r["timestamp"])
|
||||
subtask_rows = [r for r in rows if r["style"] == "subtask"]
|
||||
assert len(plan_rows) == len(subtask_rows)
|
||||
assert plan_rows[0]["timestamp"] == record.frame_timestamps[0]
|
||||
# the t0 plan enumerates all subtasks; later plans shrink
|
||||
assert plan_rows[0]["content"].startswith("1. ")
|
||||
assert len(plan_rows[0]["content"].splitlines()) == len(subtask_rows)
|
||||
assert len(plan_rows[-1]["content"].splitlines()) == 1
|
||||
|
||||
|
||||
def test_module2_at_t0_emits_speech_only_no_interjection(fixture_dataset_root: Path, tmp_path: Path) -> None:
|
||||
|
||||
412
tests/annotations/test_vocabulary.py
Normal file
412
tests/annotations/test_vocabulary.py
Normal file
@@ -0,0 +1,412 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Vocabulary-discovery phase (phase 0) tests."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from lerobot.annotations.steerable_pipeline.config import (
|
||||
PlanConfig,
|
||||
VocabularyConfig,
|
||||
)
|
||||
from lerobot.annotations.steerable_pipeline.modules import PlanSubtasksMemoryModule
|
||||
from lerobot.annotations.steerable_pipeline.reader import iter_episodes
|
||||
from lerobot.annotations.steerable_pipeline.staging import EpisodeStaging
|
||||
from lerobot.annotations.steerable_pipeline.vocabulary import (
|
||||
Vocabulary,
|
||||
VocabularyDiscoveryModule,
|
||||
load_vocabulary,
|
||||
save_vocabulary,
|
||||
vocabulary_path,
|
||||
)
|
||||
|
||||
from ._helpers import make_canned_responder
|
||||
|
||||
|
||||
_CANONICAL_SUBTASKS = (
|
||||
"grasp blue cube",
|
||||
"place blue cube in box",
|
||||
"retract arm",
|
||||
)
|
||||
_CANONICAL_MEMORY = (
|
||||
"I picked up the blue cube.",
|
||||
"I placed the blue cube in the box.",
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Vocabulary dataclass + on-disk round-trip
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_vocabulary_roundtrip(tmp_path: Path) -> None:
|
||||
vocab = Vocabulary(
|
||||
subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY
|
||||
)
|
||||
save_path = save_vocabulary(tmp_path, vocab)
|
||||
assert save_path == vocabulary_path(tmp_path)
|
||||
assert save_path.exists()
|
||||
|
||||
loaded = load_vocabulary(tmp_path)
|
||||
assert loaded is not None
|
||||
assert loaded.subtasks == _CANONICAL_SUBTASKS
|
||||
assert loaded.memory_milestones == _CANONICAL_MEMORY
|
||||
|
||||
|
||||
def test_vocabulary_load_missing_returns_none(tmp_path: Path) -> None:
|
||||
assert load_vocabulary(tmp_path) is None
|
||||
|
||||
|
||||
def test_vocabulary_load_malformed_returns_none(tmp_path: Path) -> None:
|
||||
path = vocabulary_path(tmp_path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text("{ not valid json", encoding="utf-8")
|
||||
assert load_vocabulary(tmp_path) is None
|
||||
|
||||
|
||||
def test_vocabulary_load_empty_payload_returns_none(tmp_path: Path) -> None:
|
||||
path = vocabulary_path(tmp_path)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps({"subtasks": [], "memory_milestones": []}), encoding="utf-8")
|
||||
assert load_vocabulary(tmp_path) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Discovery module
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_vocabulary_discovery_calls_vlm_and_returns_vocab(
|
||||
fixture_dataset_root: Path,
|
||||
) -> None:
|
||||
vlm = make_canned_responder(
|
||||
{
|
||||
"canonical vocabulary": {
|
||||
"subtasks": list(_CANONICAL_SUBTASKS),
|
||||
"memory_milestones": list(_CANONICAL_MEMORY),
|
||||
}
|
||||
}
|
||||
)
|
||||
module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig(sample_episodes=2))
|
||||
records = list(iter_episodes(fixture_dataset_root))
|
||||
vocab = module.discover(records)
|
||||
assert vocab is not None
|
||||
assert vocab.subtasks == _CANONICAL_SUBTASKS
|
||||
assert vocab.memory_milestones == _CANONICAL_MEMORY
|
||||
|
||||
|
||||
def test_vocabulary_discovery_reuses_existing(fixture_dataset_root: Path) -> None:
|
||||
"""``reuse_existing=True`` short-circuits the VLM call entirely."""
|
||||
|
||||
def _explode(_messages): # pragma: no cover - must not be called
|
||||
raise AssertionError("VLM should not be invoked when reusing existing vocabulary")
|
||||
|
||||
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
||||
|
||||
vlm = StubVlmClient(responder=_explode)
|
||||
module = VocabularyDiscoveryModule(
|
||||
vlm=vlm, config=VocabularyConfig(reuse_existing=True)
|
||||
)
|
||||
records = list(iter_episodes(fixture_dataset_root))
|
||||
existing = Vocabulary(subtasks=("a", "b"), memory_milestones=("I a.",))
|
||||
vocab = module.discover(records, existing=existing)
|
||||
assert vocab is existing
|
||||
|
||||
|
||||
def test_vocabulary_discovery_empty_payload_returns_none(
|
||||
fixture_dataset_root: Path,
|
||||
) -> None:
|
||||
vlm = make_canned_responder({"canonical vocabulary": {"subtasks": [], "memory_milestones": []}})
|
||||
module = VocabularyDiscoveryModule(vlm=vlm, config=VocabularyConfig())
|
||||
records = list(iter_episodes(fixture_dataset_root))
|
||||
assert module.discover(records) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PlanSubtasksMemoryModule consumes the vocabulary
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_plan_module_inlines_vocab_into_subtask_prompt(
|
||||
fixture_dataset_root: Path, tmp_path: Path
|
||||
) -> None:
|
||||
captured: list[str] = []
|
||||
|
||||
def responder(messages):
|
||||
# Find the last user text block and stash it for inspection.
|
||||
for message in messages:
|
||||
content = message.get("content")
|
||||
if isinstance(content, list):
|
||||
for block in content:
|
||||
if isinstance(block, dict) and block.get("type") == "text":
|
||||
captured.append(block.get("text", ""))
|
||||
# Return canned subtasks; pick the first two canonical strings so
|
||||
# the validator accepts them.
|
||||
return {
|
||||
"subtasks": [
|
||||
{"text": "grasp blue cube", "start": 0.0, "end": 0.4},
|
||||
{"text": "place blue cube in box", "start": 0.4, "end": 0.9},
|
||||
]
|
||||
}
|
||||
|
||||
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
||||
|
||||
vlm = StubVlmClient(responder=responder)
|
||||
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
|
||||
module = PlanSubtasksMemoryModule(
|
||||
vlm=vlm,
|
||||
config=PlanConfig(n_task_rephrasings=0),
|
||||
vocabulary=vocab,
|
||||
)
|
||||
record = next(iter_episodes(fixture_dataset_root))
|
||||
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
||||
module.run_episode(record, staging)
|
||||
# The subtask prompt (and the memory prompt) carries the canonical
|
||||
# bullet list so the VLM can't paraphrase them away.
|
||||
assert any("Canonical subtask labels:" in t for t in captured)
|
||||
assert any("grasp blue cube" in t for t in captured)
|
||||
|
||||
|
||||
def test_plan_module_accepts_article_only_difference(
|
||||
fixture_dataset_root: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""Articles like 'the'/'a'/'an' are stripped during validation."""
|
||||
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
||||
|
||||
def responder(_messages):
|
||||
return {
|
||||
"subtasks": [
|
||||
# Same canonical phrase modulo "the" — should be accepted.
|
||||
{"text": "grasp the blue cube", "start": 0.0, "end": 0.4},
|
||||
]
|
||||
}
|
||||
|
||||
vlm = StubVlmClient(responder=responder)
|
||||
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
|
||||
module = PlanSubtasksMemoryModule(
|
||||
vlm=vlm,
|
||||
config=PlanConfig(n_task_rephrasings=0),
|
||||
vocabulary=vocab,
|
||||
)
|
||||
record = next(iter_episodes(fixture_dataset_root))
|
||||
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
||||
module.run_episode(record, staging)
|
||||
rows = staging.read("plan")
|
||||
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
|
||||
assert subtask_texts == ["grasp blue cube"]
|
||||
|
||||
|
||||
def test_plan_module_retries_when_subtask_off_vocab(
|
||||
fixture_dataset_root: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""One-shot retry replaces an off-vocab paraphrase with the canonical form."""
|
||||
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
||||
|
||||
call_count = {"n": 0}
|
||||
|
||||
def responder(messages):
|
||||
call_count["n"] += 1
|
||||
# First call: returns an off-vocab paraphrase.
|
||||
if call_count["n"] == 1:
|
||||
return {
|
||||
"subtasks": [
|
||||
# paraphrase, not in vocab
|
||||
{"text": "pick up blue cube", "start": 0.0, "end": 0.4},
|
||||
]
|
||||
}
|
||||
# Second call (the retry): should contain the correction prompt;
|
||||
# respond with the canonical phrase exactly.
|
||||
last_user_text = ""
|
||||
for message in messages:
|
||||
content = message.get("content")
|
||||
if isinstance(content, str):
|
||||
last_user_text = content
|
||||
elif isinstance(content, list):
|
||||
for block in content:
|
||||
if isinstance(block, dict) and block.get("type") == "text":
|
||||
last_user_text = block.get("text", "")
|
||||
assert "NOT in the canonical vocabulary" in last_user_text
|
||||
return {
|
||||
"subtasks": [
|
||||
{"text": "grasp blue cube", "start": 0.0, "end": 0.4},
|
||||
]
|
||||
}
|
||||
|
||||
vlm = StubVlmClient(responder=responder)
|
||||
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
|
||||
module = PlanSubtasksMemoryModule(
|
||||
vlm=vlm,
|
||||
config=PlanConfig(n_task_rephrasings=0),
|
||||
vocabulary=vocab,
|
||||
)
|
||||
record = next(iter_episodes(fixture_dataset_root))
|
||||
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
||||
module.run_episode(record, staging)
|
||||
rows = staging.read("plan")
|
||||
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
|
||||
assert subtask_texts == ["grasp blue cube"]
|
||||
# The retry must have fired exactly once.
|
||||
assert call_count["n"] == 2
|
||||
|
||||
|
||||
def test_plan_module_drops_off_vocab_subtask_after_retry(
|
||||
fixture_dataset_root: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""If the VLM stays off-vocab even after the retry, the bad span is dropped."""
|
||||
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
||||
|
||||
call_count = {"n": 0}
|
||||
|
||||
def responder(_messages):
|
||||
call_count["n"] += 1
|
||||
# Both calls return the same off-vocab span — the model can't
|
||||
# be corrected. The second call also returns one in-vocab span
|
||||
# so the episode isn't empty; this lets us check that the
|
||||
# off-vocab span is dropped without affecting the in-vocab one.
|
||||
if call_count["n"] == 1:
|
||||
return {
|
||||
"subtasks": [
|
||||
{"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4},
|
||||
{"text": "grasp blue cube", "start": 0.4, "end": 0.9},
|
||||
]
|
||||
}
|
||||
return {
|
||||
"subtasks": [
|
||||
{"text": "perform a fancy macarena dance", "start": 0.0, "end": 0.4},
|
||||
{"text": "grasp blue cube", "start": 0.4, "end": 0.9},
|
||||
]
|
||||
}
|
||||
|
||||
vlm = StubVlmClient(responder=responder)
|
||||
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
|
||||
module = PlanSubtasksMemoryModule(
|
||||
vlm=vlm,
|
||||
config=PlanConfig(n_task_rephrasings=0),
|
||||
vocabulary=vocab,
|
||||
)
|
||||
record = next(iter_episodes(fixture_dataset_root))
|
||||
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
||||
module.run_episode(record, staging)
|
||||
rows = staging.read("plan")
|
||||
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
|
||||
# Retry fired exactly once; bad span dropped, good span kept.
|
||||
assert call_count["n"] == 2
|
||||
assert subtask_texts == ["grasp blue cube"]
|
||||
|
||||
|
||||
def test_plan_module_bumps_collocated_subtasks_to_distinct_frames(
|
||||
fixture_dataset_root: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""Two subtasks whose starts snap to the same frame get split onto two frames.
|
||||
|
||||
Without this guard, both spans would emit ``style=subtask`` rows at the
|
||||
identical persistent timestamp; the training-time renderer's
|
||||
``active_at(t, style=subtask)`` then raises an ambiguity error.
|
||||
"""
|
||||
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
||||
|
||||
def responder(_messages):
|
||||
# Two canonical labels with starts within one frame of each other —
|
||||
# both snap to the same source frame, so the dedupe pass must bump
|
||||
# the later one to the next frame.
|
||||
return {
|
||||
"subtasks": [
|
||||
{"text": "grasp blue cube", "start": 0.40, "end": 0.42},
|
||||
{"text": "place blue cube in box", "start": 0.41, "end": 0.50},
|
||||
]
|
||||
}
|
||||
|
||||
vlm = StubVlmClient(responder=responder)
|
||||
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
|
||||
module = PlanSubtasksMemoryModule(
|
||||
vlm=vlm,
|
||||
config=PlanConfig(n_task_rephrasings=0),
|
||||
vocabulary=vocab,
|
||||
)
|
||||
record = next(iter_episodes(fixture_dataset_root))
|
||||
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
||||
module.run_episode(record, staging)
|
||||
rows = staging.read("plan")
|
||||
subtask_rows = [r for r in rows if r["style"] == "subtask"]
|
||||
# Both subtasks present, both on distinct timestamps.
|
||||
assert len(subtask_rows) == 2
|
||||
timestamps = [r["timestamp"] for r in subtask_rows]
|
||||
assert len(set(timestamps)) == 2, f"subtask timestamps collide: {timestamps}"
|
||||
# Order preserved: the chronologically earlier span keeps the earlier
|
||||
# frame, the later one was bumped onto the next available frame.
|
||||
assert subtask_rows[0]["content"] == "grasp blue cube"
|
||||
assert subtask_rows[1]["content"] == "place blue cube in box"
|
||||
assert subtask_rows[1]["timestamp"] > subtask_rows[0]["timestamp"]
|
||||
|
||||
|
||||
def test_plan_module_empty_when_all_off_vocab_after_retry(
|
||||
fixture_dataset_root: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""All-off-vocab spans → episode comes out empty (no silent fuzzy snap)."""
|
||||
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
||||
|
||||
def responder(_messages):
|
||||
# Returns the same off-vocab spans on both attempts.
|
||||
return {
|
||||
"subtasks": [
|
||||
{"text": "make a smoothie", "start": 0.0, "end": 0.4},
|
||||
{"text": "consult the wizard", "start": 0.4, "end": 0.9},
|
||||
]
|
||||
}
|
||||
|
||||
vlm = StubVlmClient(responder=responder)
|
||||
vocab = Vocabulary(subtasks=_CANONICAL_SUBTASKS, memory_milestones=_CANONICAL_MEMORY)
|
||||
module = PlanSubtasksMemoryModule(
|
||||
vlm=vlm,
|
||||
config=PlanConfig(n_task_rephrasings=0),
|
||||
vocabulary=vocab,
|
||||
)
|
||||
record = next(iter_episodes(fixture_dataset_root))
|
||||
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
||||
module.run_episode(record, staging)
|
||||
rows = staging.read("plan")
|
||||
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
|
||||
# No subtask gets fabricated — better to leave the episode empty
|
||||
# so the operator notices the vocabulary gap than to silently
|
||||
# warp the labels.
|
||||
assert subtask_texts == []
|
||||
|
||||
|
||||
def test_plan_module_without_vocab_passes_through(
|
||||
fixture_dataset_root: Path, tmp_path: Path
|
||||
) -> None:
|
||||
"""No vocabulary configured → original free-form behavior is preserved."""
|
||||
from lerobot.annotations.steerable_pipeline.vlm_client import StubVlmClient
|
||||
|
||||
def responder(_messages):
|
||||
return {
|
||||
"subtasks": [
|
||||
{"text": "any free-form text the VLM wants", "start": 0.0, "end": 1.0},
|
||||
]
|
||||
}
|
||||
|
||||
vlm = StubVlmClient(responder=responder)
|
||||
module = PlanSubtasksMemoryModule(
|
||||
vlm=vlm, config=PlanConfig(n_task_rephrasings=0)
|
||||
)
|
||||
record = next(iter_episodes(fixture_dataset_root))
|
||||
staging = EpisodeStaging(tmp_path / "stage", record.episode_index)
|
||||
module.run_episode(record, staging)
|
||||
rows = staging.read("plan")
|
||||
subtask_texts = [r["content"] for r in rows if r["style"] == "subtask"]
|
||||
assert subtask_texts == ["any free-form text the VLM wants"]
|
||||
51
tests/scripts/test_lerobot_annotate.py
Normal file
51
tests/scripts/test_lerobot_annotate.py
Normal file
@@ -0,0 +1,51 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
|
||||
|
||||
def test_push_to_hub_tags_uploaded_dataset_revision(tmp_path, monkeypatch):
|
||||
from lerobot.scripts.lerobot_annotate import _push_to_hub
|
||||
|
||||
root = tmp_path / "dataset"
|
||||
(root / "meta").mkdir(parents=True)
|
||||
(root / "meta" / "info.json").write_text(json.dumps({"codebase_version": "v3.0"}))
|
||||
|
||||
calls = {}
|
||||
|
||||
class FakeHfApi:
|
||||
def create_repo(self, **kwargs):
|
||||
calls["create_repo"] = kwargs
|
||||
|
||||
def upload_folder(self, **kwargs):
|
||||
calls["upload_folder"] = kwargs
|
||||
return SimpleNamespace(oid="abc123")
|
||||
|
||||
def create_tag(self, **kwargs):
|
||||
calls["create_tag"] = kwargs
|
||||
|
||||
monkeypatch.setattr("huggingface_hub.HfApi", FakeHfApi)
|
||||
|
||||
cfg = SimpleNamespace(
|
||||
repo_id="source/dataset",
|
||||
dest_repo_id="annotated/dataset",
|
||||
push_private=True,
|
||||
push_commit_message=None,
|
||||
)
|
||||
|
||||
_push_to_hub(root, cfg)
|
||||
|
||||
assert calls["create_repo"] == {
|
||||
"repo_id": "annotated/dataset",
|
||||
"repo_type": "dataset",
|
||||
"private": True,
|
||||
"exist_ok": True,
|
||||
}
|
||||
assert calls["upload_folder"]["repo_id"] == "annotated/dataset"
|
||||
assert calls["create_tag"] == {
|
||||
"repo_id": "annotated/dataset",
|
||||
"tag": "v3.0",
|
||||
"repo_type": "dataset",
|
||||
"exist_ok": True,
|
||||
"revision": "abc123",
|
||||
}
|
||||
Reference in New Issue
Block a user