From 20c7a12dd56aa75016c8c96e7cc101620d701633 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Thu, 4 Jun 2026 14:05:46 +0200 Subject: [PATCH] annotate: remove dead code, document CLI options, compact config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dead code (defined but never referenced anywhere in src/tests/examples): * reader.py: keyframe_indices, episode_frame_timestamps, lookup_data_path, and the now-orphaned gather_data_paths + episode_offsets_per_path (lookup_data_path was their only caller). * staging.py: iter_staged_episodes. * writer.py: normalize_rows_for_writer. * config.py VlmConfig: json_mode, batch_size, tensor_parallel_size, gpu_memory_utilization, trust_remote_code — consumed only by the in-process vllm/transformers backends that were removed; the openai auto-serve path carries those vLLM flags via serve_command instead. Kept max_model_len (still used as the serve-command default). * config.py TaskAugAxesConfig.total property. Docs: new 'Key options' section in annotation_pipeline.mdx — grouped tables (dataset in/out, module toggles, --vlm.*, --plan.*, interjections + vqa) describing the flags users actually reach for, with defaults. config.py: compact the verbose field comments + ActionRecordsConfig / TaskAugAxesConfig docstrings; fix two stale 'verify' references (the verify pass was removed — it's describe -> segment now) and the stale 'renders record back to subtask text' note (that path was removed). vlm_client docstring no longer mentions the removed json_mode field. Verified: tests/annotations + tests/datasets/test_language + tests/scripts/test_lerobot_annotate (40 passed); pre-commit clean. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/source/annotation_pipeline.mdx | 59 +++++++ .../annotations/steerable_pipeline/config.py | 156 ++++++------------ .../annotations/steerable_pipeline/reader.py | 58 ------- .../annotations/steerable_pipeline/staging.py | 14 +- .../steerable_pipeline/vlm_client.py | 3 +- .../annotations/steerable_pipeline/writer.py | 18 +- 6 files changed, 111 insertions(+), 197 deletions(-) diff --git a/docs/source/annotation_pipeline.mdx b/docs/source/annotation_pipeline.mdx index c45617ee6..c9eefeb3e 100644 --- a/docs/source/annotation_pipeline.mdx +++ b/docs/source/annotation_pipeline.mdx @@ -117,6 +117,65 @@ To use a different dataset, model, or hub repo, edit the `CMD` block in the script. Every flag there maps directly to a `lerobot-annotate` flag (run `lerobot-annotate --help` for the full list). +## Key options + +These are the flags you'll reach for most often. Run +`lerobot-annotate --help` for everything else; the defaults are tuned for +short manipulation episodes. + +### Dataset in / out + +| Flag | Default | What it does | +| ----------------- | ------- | ----------------------------------------------------------------------- | +| `--repo_id` | — | Hub dataset to annotate (downloaded if `--root` unset). | +| `--root` | — | Annotate a local dataset directory instead. | +| `--new_repo_id` | — | Push the result to a new repo (leaves the source repo untouched). | +| `--push_to_hub` | `false` | Upload after annotating (to `--new_repo_id`, else back to `--repo_id`). | +| `--only_episodes` | all | Annotate just these episode indices (handy for a test run). | +| `--seed` | `1729` | Seeds the RNGs that pick interjection timestamps + VQA question types. | + +### Which modules run + +Each module can be turned off independently to iterate on one at a time: +`--plan.enabled`, `--interjections.enabled`, `--vqa.enabled` (all +`true` by default). 
+ +### The VLM (`--vlm.*`) + +| Flag | Default | What it does | +| -------------------------- | ------------------ | ----------------------------------------------------------------------------------- | +| `--vlm.model_id` | `Qwen/Qwen3.6-27B` | The model to serve and prompt. | +| `--vlm.camera_key` | first `images.*` | Which camera every prompt is grounded on. | +| `--vlm.serve_command` | auto | The exact `vllm serve …` command (set TP size, GPU memory, `--max-model-len` here). | +| `--vlm.parallel_servers` | `1` | Independent servers for round-robin routing (one per GPU). | +| `--vlm.num_gpus` | `0` | GPUs per server (`0` = one each). | +| `--vlm.client_concurrency` | `16` | In-flight requests across all servers. | +| `--vlm.max_new_tokens` | `512` | Generation cap per call. | +| `--vlm.temperature` | `0.2` | Sampling temperature. | + +### Subtasks / plan / memory (`--plan.*`) + +| Flag | Default | What it does | +| ------------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------- | +| `--plan.frames_per_second` | `1.0` | How densely the episode video is sampled. | +| `--plan.max_video_frames` | `32` | Hard cap on frames per call (context-budget guard — don't exceed ~32 for a 32k context). | +| `--plan.subtask_window_seconds` | `0` | Split long episodes into fixed windows for constant frame density (`0` = whole episode). | +| `--plan.plan_max_steps` | `8` | Upper bound on subtasks per episode. | +| `--plan.subtask_describe_first` | `true` | Run the describe→segment grounding pass (best subtask quality; +1 call/episode). | +| `--plan.emit_plan` | `true` | Emit the numbered `plan` rows (`false` = subtasks + memory only). | +| `--plan.n_task_rephrasings` | `10` | How many `task_aug` rephrasings to emit (`0` disables). 
| +| `--plan.derive_task_from_video` | `if_short` | Keep the dataset task as-is (`off`), re-derive it from video only when it's missing/short (`if_short`), or always re-derive it (`always`). | +| `--plan.use_video_url` | `false` | Send a server-side video clip instead of embedded frames. | + +### Interjections + VQA + +| Flag | Default | What it does | +| ----------------------------------------------- | ------- | ---------------------------------------------------------- | +| `--interjections.max_interjections_per_episode` | `3` | Cap on interjection/speech pairs per episode. | +| `--vqa.vqa_emission_hz` | `1.0` | How often VQA pairs are emitted. | +| `--vqa.restrict_to_default_camera` | `false` | Ground VQA only on `--vlm.camera_key` (else every camera). | +| `--executor.episode_parallelism` | `16` | Episodes processed concurrently within each phase. | + ## Contributing new modules The pipeline is built to grow, and **contributions are very welcome** — diff --git a/src/lerobot/annotations/steerable_pipeline/config.py b/src/lerobot/annotations/steerable_pipeline/config.py index 10484fd3a..744de9a29 100644 --- a/src/lerobot/annotations/steerable_pipeline/config.py +++ b/src/lerobot/annotations/steerable_pipeline/config.py @@ -44,78 +44,56 @@ class PlanConfig: derive_task_from_video: str = "if_short" derive_task_min_words: int = 3 - # Frame sampling for the subtask-decomposition prompt. Frames are - # sampled uniformly across the whole episode up to ``max_video_frames`` - # (so longer episodes are subsampled, not truncated). - # - # ``max_video_frames`` is a HARD context-budget cap. With the embedded- - # frame path (use_video_url=false), every frame becomes ~250-320 vision - # tokens, so 128 frames ≈ 33-39k tokens — over a 32k-context VLM. 32 - # frames (~8-10k tokens) leaves ample room for the prompt + the - # describe / verify passes. 
Raise only if your serving context is - # larger AND your episodes need finer temporal resolution; if you hit - # "Input length exceeds maximum context length", lower this. + # Frames are sampled uniformly across the episode, capped at + # ``max_video_frames`` (a HARD context-budget cap, not an annotation + # knob). Each embedded frame is ~250-320 vision tokens, so 32 frames + # (~8-10k tokens) fit a 32k-context VLM; 128 would overflow it. Lower + # this if you hit "Input length exceeds maximum context length". frames_per_second: float = 1.0 max_video_frames: int = 32 # Windowed subtask generation for CONSTANT temporal density. When > 0 - # and an episode is longer than this many seconds, the plan module - # processes the episode in consecutive windows of this length, each - # sampled at ``frames_per_second``, instead of subsampling the whole - # episode to ``max_video_frames`` (which makes long episodes sparse). - # The describe -> segment -> verify chain runs per window; results are - # offset to absolute time, merged, and stitched into a contiguous - # whole-episode cover. Cost scales with episode length (≈ chain calls - # × ceil(duration / window)). Set to ~max_video_frames / frames_per_ - # second (e.g. 32s at 1 fps) so each window fills — but never exceeds — - # the per-call frame budget. 0 disables (single whole-episode call). + # and the episode is longer than this, the plan module processes it in + # consecutive windows of this length (each sampled at + # ``frames_per_second``) instead of subsampling the whole episode to a + # sparse ``max_video_frames``. The describe -> segment chain runs per + # window; spans are merged + stitched. Set to ~max_video_frames / + # frames_per_second (e.g. 32s at 1 fps). 0 disables. 
subtask_window_seconds: float = 0.0 min_subtask_seconds: float = 1.5 plan_max_steps: int = 8 - # ``subtask_describe_first``: run a grounding pass that narrates ONLY - # what is visible in the video (no subtask JSON yet), then inject that - # description into the segmentation prompt. Forces the model to observe - # before committing to structured output — the strongest lever against - # subtasks invented from the task text. ON by default; +1 VLM call/ep. - # Set False to trade quality for fewer calls on easy datasets. + # Run a grounding pass that narrates ONLY what's visible (no subtask + # JSON yet), then feed that into the segmentation prompt — the strongest + # lever against subtasks invented from the task text. ON by default; + # +1 VLM call/episode. False trades quality for fewer calls. subtask_describe_first: bool = True - # Emit ``style="plan"`` rows (the numbered still-todo list re-emitted at - # every subtask boundary). Set False to keep only subtasks + memory and - # skip the plan rows entirely — saves one ``_generate_plan`` VLM call per - # subtask boundary. Subtask and memory generation are unaffected. + # Emit ``style="plan"`` rows (the numbered still-todo list, re-emitted at + # every subtask boundary). False keeps only subtasks + memory and skips + # the per-boundary ``_generate_plan`` call. emit_plan: bool = True - # NOTE: subtask spans are ALWAYS stitched into a contiguous - # full-episode cover (first subtask pulled back to t0, gaps closed, - # last span extended to t_last) as a deterministic post-step in - # ``_generate_subtasks._stitch_full_coverage``. This is not - # configurable — a sparse / gap-ridden subtask timeline is never - # desirable for conditioning, so it is unconditional. + # NOTE: subtask spans are ALWAYS stitched into a contiguous full-episode + # cover (see ``_stitch_full_coverage``) — not configurable, since a + # sparse / gap-ridden timeline is never useful for conditioning. - # When True (and backend supports it, e.g. 
``openai``), the ``plan`` - # module sends a ``video_url`` block pointing at a per-episode mp4 - # subclip and lets the server sample frames at ``use_video_url_fps``. + # When True (with a backend that supports it, e.g. ``openai``), send a + # ``video_url`` block pointing at a per-episode mp4 subclip and let the + # server sample frames at ``use_video_url_fps``. use_video_url: bool = False use_video_url_fps: float = 1.0 - # Structured per-subtask action records (Phase 1a + 1b, inspired by - # EgoMimic's annotator form). For each generated subtask span, the - # VLM extracts a typed record (verb / object / arm / grasp_type / - # destination / mistake). A deterministic Python template renders - # that record back to canonical subtask text — reducing the VLM's - # "creative" surface to just the perception step. See - # ``ActionRecordsConfig`` for details. Off by default (back-compat). + # Optional structured per-subtask action records (EgoMimic-style). When + # enabled, the VLM extracts a typed record per subtask span; see + # ``ActionRecordsConfig``. Purely additive — off by default. action_records: ActionRecordsConfig = field(default_factory=lambda: ActionRecordsConfig()) - # Structured 5-axis augmentation taxonomy for the t=0 task variants - # (replaces the free-form ``n_task_rephrasings`` flow when enabled). - # Mirrors EgoMimic's ``augment_prompt.txt`` taxonomy: instead of N - # free-form rephrasings, the VLM produces variants along named - # axes (synonym / omit_arm / omit_orientation / omit_grasp_method / - # combined). Off by default (back-compat). + # Optional 5-axis task-augmentation taxonomy for the t=0 variants + # (EgoMimic-style: synonym / omit_arm / omit_orientation / + # omit_grasp_method / combined). Replaces the free-form + # ``n_task_rephrasings`` flow when enabled; see ``TaskAugAxesConfig``. 
task_aug_axes: TaskAugAxesConfig = field(default_factory=lambda: TaskAugAxesConfig()) @@ -123,9 +101,8 @@ class PlanConfig: class ActionRecordsConfig: """Structured per-subtask action record extraction. - When ``enabled=True``, after the existing subtask-span generation in - ``plan_subtasks_memory.py``, the module makes one extra VLM call per - subtask to extract a typed record:: + When ``enabled=True``, after subtask-span generation the module makes + one extra VLM call per subtask to extract a typed record:: { "verb": "pick" | "place" | "press" | ..., # closed vocabulary @@ -136,20 +113,13 @@ class ActionRecordsConfig: "mistake": "" | null, } - The record is emitted as a separate row with ``style="action_record"`` - (``content=json.dumps(record)``) at the subtask's start timestamp. - It is PURELY ADDITIVE — it never touches the VLM's subtask text. - Downstream training can consume the typed schema directly (e.g. - auxiliary supervision on verb / arm / grasp classification heads) - while the subtask string the policy conditions on stays exactly what - the subtask module produced. (Reconstructing subtask text from these - fields was too easy for the VLM to hallucinate on tasks that don't - fit the manipulation schema — navigation tasks yielded nonsense like - ``move stove to stove`` — so that path was removed.) + Emitted as a separate ``style="action_record"`` row at the subtask's + start timestamp. PURELY ADDITIVE — it never touches the subtask text, + so downstream training can use the typed schema (e.g. auxiliary + verb/arm/grasp heads) while the conditioning string stays unchanged. - Cost: one extra VLM call per subtask. For an 8-subtask episode this - means ~8x more VLM calls in the plan module — still cheap relative - to the action-expert training cost, but worth knowing. + Cost: one extra VLM call per subtask (~8x plan-module calls on an + 8-subtask episode). 
""" enabled: bool = False @@ -204,26 +174,14 @@ class TaskAugAxesConfig: """Structured 5-axis augmentation taxonomy for t=0 task variants. When ``enabled=True``, replaces the free-form ``n_task_rephrasings`` - flow with a structured prompt that produces variants along five - named axes (mirroring EgoMimic's ``augment_prompt.txt``): + flow with variants along five named axes (EgoMimic-style): + ``synonym_paraphrase`` (reword, keep all info), ``omit_arm``, + ``omit_orientation``, ``omit_grasp_method``, and ``combined_omissions`` + (drop two at once). - * ``synonym_paraphrase`` — different wording / verbs, all - information preserved. - * ``omit_arm`` — drop the left/right/both arm specification. - * ``omit_orientation`` — drop orientation cues (upright, - sideways, ...). - * ``omit_grasp_method`` — drop grip / grasp method specification. - * ``combined_omissions`` — combine two of the above - simultaneously. - - Default counts (3+3+2+2+2 = 12 variants per task) match EgoMimic. - Axes that have nothing to omit in the source task (e.g. ``omit_arm`` - when the task doesn't mention an arm) emit fewer entries rather - than pad — the prompt instructs the VLM accordingly. - - Each variant is emitted as a ``task_aug`` row at ``t=0`` (same - style as the free-form variants), so the rest of the pipeline / - training recipe doesn't need to know about the taxonomy. + Default counts (3+3+2+2+2 = 12) match EgoMimic. Axes with nothing to + omit emit fewer entries rather than pad. Each variant becomes a + ``task_aug`` row at ``t=0``, identical in style to the free-form ones. 
""" enabled: bool = False @@ -234,17 +192,6 @@ class TaskAugAxesConfig: omit_grasp_method: int = 2 combined_omissions: int = 2 - @property - def total(self) -> int: - """Sum of requested variants across all axes (upper bound).""" - return ( - self.synonym_paraphrase - + self.omit_arm - + self.omit_orientation - + self.omit_grasp_method - + self.combined_omissions - ) - @dataclass class InterjectionsConfig: @@ -326,16 +273,11 @@ class VlmConfig: max_new_tokens: int = 512 temperature: float = 0.2 - json_mode: bool = True - batch_size: int = 4 - tensor_parallel_size: int = 1 - # Fraction of GPU memory vllm allocates for weights + KV cache. - gpu_memory_utilization: float = 0.9 - # Cap context length (None = model default). On 80 GB H100 a 30B BF16 - # model often needs <= 8192 to leave KV-cache headroom. + # Context length for the auto-spawned vLLM server (None → 32768). vLLM + # tuning flags (tensor-parallel size, GPU memory fraction, ...) go in + # ``serve_command`` directly, not here. max_model_len: int | None = None - trust_remote_code: bool = False # Override the camera stream used for keyframe attachment. None picks # the first ``observation.images.*`` key the dataset declares. 
diff --git a/src/lerobot/annotations/steerable_pipeline/reader.py b/src/lerobot/annotations/steerable_pipeline/reader.py index 6310a5b5e..22fe4ac26 100644 --- a/src/lerobot/annotations/steerable_pipeline/reader.py +++ b/src/lerobot/annotations/steerable_pipeline/reader.py @@ -214,61 +214,3 @@ def _iter_one_path(path: Path, tasks: dict[int, str], only_set: set[int] | None) rec = _build(cur_ep, start_offset, len(episode_col), cur_task_idx, ts_buf, fi_buf) if rec is not None: yield rec - - -def gather_data_paths(root: Path) -> list[Path]: - """Return every ``data/chunk-*/file-*.parquet`` path under ``root``.""" - return sorted((root / "data").rglob("*.parquet")) - - -def episode_offsets_per_path(path: Path) -> dict[int, tuple[int, int]]: - """Return ``{episode_index: (row_offset, row_count)}`` for one parquet.""" - table = pq.read_table(path, columns=["episode_index"]) - episode_col = table.column("episode_index").to_pylist() - out: dict[int, tuple[int, int]] = {} - cur_ep: int | None = None - start = 0 - for i, ep in enumerate(episode_col): - if cur_ep is None: - cur_ep = ep - start = i - continue - if ep != cur_ep: - out[cur_ep] = (start, i - start) - cur_ep = ep - start = i - if cur_ep is not None: - out[cur_ep] = (start, len(episode_col) - start) - return out - - -def keyframe_indices(record: EpisodeRecord, k: int) -> list[int]: - """Return ``k`` evenly spaced row indices into the episode (relative).""" - n = record.row_count - if k <= 0 or n == 0: - return [] - if k >= n: - return list(range(n)) - step = (n - 1) / (k - 1) if k > 1 else 0.0 - return [int(round(i * step)) for i in range(k)] if k > 1 else [n // 2] - - -def lookup_data_path(root: Path, episode_index: int) -> tuple[Path, int, int] | None: - """Find the parquet file containing ``episode_index`` and its slice bounds.""" - for path in gather_data_paths(root): - offsets = episode_offsets_per_path(path) - if episode_index in offsets: - start, count = offsets[episode_index] - return path, start, count - 
return None - - -def episode_frame_timestamps(root: Path, episode_index: int) -> tuple[Any, list[float]]: - """Return the parquet path and per-frame timestamps for ``episode_index``.""" - found = lookup_data_path(root, episode_index) - if found is None: - raise ValueError(f"Episode {episode_index} not found under {root}/data/") - path, start, count = found - table = pq.read_table(path, columns=["timestamp"]) - timestamps = table.column("timestamp").to_pylist()[start : start + count] - return path, [float(t) for t in timestamps] diff --git a/src/lerobot/annotations/steerable_pipeline/staging.py b/src/lerobot/annotations/steerable_pipeline/staging.py index da8f82097..0b47c4dd6 100644 --- a/src/lerobot/annotations/steerable_pipeline/staging.py +++ b/src/lerobot/annotations/steerable_pipeline/staging.py @@ -28,7 +28,7 @@ intermediate. from __future__ import annotations import json -from collections.abc import Iterable, Iterator +from collections.abc import Iterable from dataclasses import dataclass from pathlib import Path from typing import Any @@ -90,15 +90,3 @@ class EpisodeStaging: def has(self, module: ModuleName) -> bool: return self.path_for(module).exists() - - -def iter_staged_episodes(root: Path) -> Iterator[int]: - """Yield episode indices for which any staging artifact exists.""" - if not root.exists(): - return - for child in sorted(root.iterdir()): - if child.is_dir() and child.name.startswith("episode_"): - try: - yield int(child.name.removeprefix("episode_")) - except ValueError: - continue diff --git a/src/lerobot/annotations/steerable_pipeline/vlm_client.py b/src/lerobot/annotations/steerable_pipeline/vlm_client.py index d0ee10ad8..7f5e9da3c 100644 --- a/src/lerobot/annotations/steerable_pipeline/vlm_client.py +++ b/src/lerobot/annotations/steerable_pipeline/vlm_client.py @@ -23,8 +23,7 @@ into a real model. 
The client speaks one method, :meth:`VlmClient.generate_json`, which: - accepts a list of OpenAI/HF-style multimodal messages, -- requests JSON output (``json_mode=True`` enables guided decoding when the - backend supports it), +- requests JSON output from the server, - batches requests transparently, - and reprompts once on a JSON parse failure with an inline correction message before raising. diff --git a/src/lerobot/annotations/steerable_pipeline/writer.py b/src/lerobot/annotations/steerable_pipeline/writer.py index 52dd7f850..6710d08bd 100644 --- a/src/lerobot/annotations/steerable_pipeline/writer.py +++ b/src/lerobot/annotations/steerable_pipeline/writer.py @@ -46,7 +46,7 @@ from __future__ import annotations import logging from collections import defaultdict -from collections.abc import Iterable, Sequence +from collections.abc import Sequence from dataclasses import dataclass from pathlib import Path from typing import Any @@ -338,19 +338,3 @@ def speech_atom(timestamp: float, text: str) -> dict[str, Any]: } ], } - - -def normalize_rows_for_writer( - rows: Iterable[dict[str, Any]], -) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: - """Helper used by tests/validators to partition a flat row list into - (persistent_rows, event_rows) using ``column_for_style``. - """ - persistent: list[dict[str, Any]] = [] - events: list[dict[str, Any]] = [] - for row in rows: - if column_for_style(row.get("style")) == LANGUAGE_PERSISTENT: - persistent.append(row) - else: - events.append(row) - return persistent, events