annotations(steerable): structured action records + 5-axis task augmentation

EgoMimic-inspired additions to the plan module, both opt-in for back-compat.

1. PHASE 1a + 1b: per-subtask structured action records
   * cfg.action_records.enabled=True triggers, after Phase 1 subtask-span
     generation, one extra VLM call per subtask to extract a typed record:
       {verb, object, arm, grasp_type, destination, mistake}
   * A deterministic Python template (_render_action_record_to_subtask_text)
     renders the record back to canonical subtask text. When replace_subtask_
     text=True (default), this REPLACES the VLM's free-form text — eliminates
     cross-episode phrasing drift.
   * When emit_record_row=True (default), the structured record is also
     emitted as a row with style='action_record' (added to PERSISTENT_STYLES)
     so downstream training can consume the typed schema directly.
   * Verb + grasp vocabularies are configurable. Out-of-vocab values are
     rejected at extraction time.

2. STRUCTURED 5-AXIS TASK AUGMENTATION
   * cfg.task_aug_axes.enabled=True replaces the free-form n_task_rephrasings
     path with a structured prompt producing variants along 5 named axes:
       synonym_paraphrase (3)
       omit_arm           (3)
       omit_orientation   (2)
       omit_grasp_method  (2)
       combined_omissions (2)
     Total ~12 variants. Axes with nothing to omit emit fewer entries.
   * Each variant is emitted as a task_aug row at t=0 (existing style).

Inspired by https://github.com/GaTech-RL2/EgoVerse/tree/main/egomimic/scripts/language_process
— they pay Scale AI annotators to fill a structured form and then generate
language via a deterministic prompt. We get the same hallucination-reducing
structure via one extra VLM call per subtask.

Files:
  src/lerobot/datasets/language.py
  src/lerobot/annotations/steerable_pipeline/config.py
  src/lerobot/annotations/steerable_pipeline/modules/plan_subtasks_memory.py
  src/lerobot/annotations/steerable_pipeline/prompts/module_1_action_record.txt
  src/lerobot/annotations/steerable_pipeline/prompts/module_1_task_aug_axes.txt

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-06-02 10:01:47 +02:00
parent bb2c09965b
commit d04ea0ea8a
7 changed files with 791 additions and 5 deletions

View File

@@ -0,0 +1,162 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Emit the ``--dataset.episodes`` include-list for a LeRobot dataset, minus a
set of excluded episode indices.
``LeRobotDatasetConfig.episodes`` is an *include* list (train only on the listed
episodes), so "exclude episode X" means "pass every episode except X". This
helper builds that complement.
For ``pepijn223/robocasa_pretrain_human300_v4`` the default exclusion set is the
63 episodes that carry NO ``subtask`` annotation (in fact no persistent language
rows at all) — see the scan in this PR's discussion. Training the steerable
SmolVLA/pi052 policy on those episodes would feed it frames with empty subtask
targets, so we drop them.
Usage (prints a compact ``[0,1,2,...]`` list to stdout, logs to stderr):
python scripts/build_episode_filter.py \
--repo-id pepijn223/robocasa_pretrain_human300_v4
# capture in a shell script
EPISODES=$(python scripts/build_episode_filter.py --repo-id <id>)
lerobot-train ... --dataset.episodes="$EPISODES"
The helper reads ``meta/info.json`` from the Hub to learn ``total_episodes`` and
validates that every excluded index is in ``[0, total_episodes)`` before emitting
the complement. Pass ``--no-validate-hub`` to skip the network round-trip and use
``--total-episodes`` directly (e.g. for an offline / local dataset).
"""
from __future__ import annotations
import argparse
import json
import sys
# Episodes in pepijn223/robocasa_pretrain_human300_v4 with no `subtask`
# annotation (no persistent language rows at all). 63 episodes / 179,009 frames.
DEFAULT_EXCLUDE: tuple[int, ...] = (
1065, 2972, 6971, 8129, 9167, 9170, 9171, 9177, 9190, 9196, 9199, 9204,
9207, 9208, 9210, 9217, 9232, 9234, 9240, 9243, 9254, 9256, 9258, 9259,
9261, 9263, 9264, 15928, 16350, 18729, 20026, 21703, 25314, 25319, 25321,
25324, 25333, 25340, 25356, 25366, 25374, 25388, 25392, 25825, 25893,
26347, 26357, 26374, 26375, 26388, 26394, 26398, 26400, 26409, 26422,
26423, 26426, 26895, 26905, 26915, 26954, 27064, 30812,
)
def _log(msg: str) -> None:
print(msg, file=sys.stderr, flush=True)
def _total_episodes_from_hub(repo_id: str, revision: str | None) -> int:
"""Return ``total_episodes`` from the dataset's ``meta/info.json`` on the Hub."""
from huggingface_hub import hf_hub_download
path = hf_hub_download(
repo_id=repo_id,
filename="meta/info.json",
repo_type="dataset",
revision=revision,
)
with open(path) as f:
info = json.load(f)
total = int(info["total_episodes"])
if total <= 0:
raise ValueError(f"info.json reports non-positive total_episodes={total!r}")
return total
def build_include_list(total_episodes: int, exclude: set[int]) -> list[int]:
"""Return ``[0, total_episodes)`` with ``exclude`` removed, ascending."""
out_of_range = sorted(e for e in exclude if e < 0 or e >= total_episodes)
if out_of_range:
raise ValueError(
f"{len(out_of_range)} excluded index(es) outside [0, {total_episodes}): "
f"{out_of_range[:10]}{'...' if len(out_of_range) > 10 else ''}. "
"The dataset may have changed — re-run the subtask scan before training."
)
return [e for e in range(total_episodes) if e not in exclude]
def main() -> int:
p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
p.add_argument("--repo-id", default="pepijn223/robocasa_pretrain_human300_v4")
p.add_argument("--revision", default=None, help="Dataset revision/branch (default: main).")
p.add_argument(
"--exclude-file",
default=None,
help="Optional JSON file with a list of episode indices to exclude. "
"Overrides the built-in default set.",
)
p.add_argument(
"--total-episodes",
type=int,
default=None,
help="Total episode count. If omitted, read from meta/info.json on the Hub.",
)
p.add_argument(
"--no-validate-hub",
action="store_true",
help="Do not fetch info.json from the Hub; requires --total-episodes.",
)
p.add_argument(
"--out",
default=None,
help="Write the list to this file instead of stdout.",
)
args = p.parse_args()
if args.exclude_file:
with open(args.exclude_file) as f:
data = json.load(f)
# Accept either a bare list or the {"missing_episode_indices": [...]} report shape.
exclude = set(data["missing_episode_indices"] if isinstance(data, dict) else data)
else:
exclude = set(DEFAULT_EXCLUDE)
if args.total_episodes is not None:
total = args.total_episodes
if not args.no_validate_hub:
hub_total = _total_episodes_from_hub(args.repo_id, args.revision)
if hub_total != total:
raise ValueError(
f"--total-episodes={total} disagrees with Hub info.json total_episodes={hub_total}."
)
else:
if args.no_validate_hub:
raise SystemExit("--no-validate-hub requires --total-episodes.")
total = _total_episodes_from_hub(args.repo_id, args.revision)
include = build_include_list(total, exclude)
_log(
f"[build_episode_filter] repo={args.repo_id} total={total} "
f"excluded={len(exclude)} kept={len(include)}"
)
# Compact JSON (no spaces) so the resulting CLI arg stays as short as possible.
payload = "[" + ",".join(map(str, include)) + "]"
if args.out:
with open(args.out, "w") as f:
f.write(payload)
_log(f"[build_episode_filter] wrote {len(payload)} bytes to {args.out}")
else:
sys.stdout.write(payload)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,115 @@
#!/bin/bash
#SBATCH --job-name=pi052-hirobot-robocasa-human300
#SBATCH --partition=hopper-prod
#SBATCH --qos=high
#SBATCH --time=48:00:00
#SBATCH --ntasks=1
#SBATCH --gpus-per-task=8
set -euo pipefail
cd "${LEROBOT_ROOT:-$HOME/lerobot}"
export LEROBOT_DEBUG_PREDS_EVERY=1000
export PATH="$HOME/miniconda3/bin:$HOME/.local/bin:$PATH"
export LD_LIBRARY_PATH="$HOME/miniconda3/lib:${LD_LIBRARY_PATH:-}"
export NCCL_TIMEOUT="${NCCL_TIMEOUT:-1800}"
export HF_HUB_DOWNLOAD_TIMEOUT="${HF_HUB_DOWNLOAD_TIMEOUT:-120}"
export WANDB_INIT_TIMEOUT="${WANDB_INIT_TIMEOUT:-300}"
export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-expandable_segments:True}"
# Compile path: pin triton + inductor caches node-local. The shared
# /fsx cache mixes kernels built against different glibc versions and
# trips ``GLIBC_2.34 not found`` on hopper nodes (bench v3 confirmed).
export TRITON_CACHE_DIR="/tmp/triton_${SLURM_JOB_ID}"
export TORCHINDUCTOR_CACHE_DIR="/tmp/torchinductor_${SLURM_JOB_ID}"
mkdir -p "$TRITON_CACHE_DIR" "$TORCHINDUCTOR_CACHE_DIR"
# Non-fatal so an unstaged local hotfix doesn't kill the job. CI / clean
# checkouts still fast-forward as before; dirty trees just keep their
# in-flight changes (the working tree is what runs).
git pull --ff-only || echo "[warn] git pull skipped — keeping working tree."
python -m pip install -q --upgrade -e .
python -m pip install -q --upgrade -e '.[pi]'
python -m pip install -q --upgrade 'liger-kernel'
# FlashAttention-2 is NOT installed. The pi052 dual-expert layer compute
# uses SDPA (the block-bidirectional mask is unsupported by FA2 anyway),
# and the only other consumer would be liger-kernel — which gracefully
# degrades when flash_attn is absent. The previously-installed wheel was
# built against a newer GLIBC than some hopper compute nodes provide
# (job 22162586 on ip-26-0-162-14 hit ``GLIBC_2.32 not found``), so the
# safest configuration is "not installed". To re-enable for the
# downstream HF Gemma ``generate`` path, install a wheel matching the
# node's libc — but verify on every assigned node first.
DATASET="pepijn223/robocasa_pretrain_human300_v4"
DATASET_REVISION="${DATASET_REVISION:-main}"
POLICY_REPO_ID="pepijn223/pi052_robocasa_human300"
JOB_NAME="pi052-hirobot-robocasa-human300"
NUM_PROCESSES=8
# BS=36 — fits ~72 GB / 80 GB, BS=36 × 8 GPUs = 288 effective.
BATCH_SIZE=${BATCH_SIZE:-36}
STEPS=${STEPS:-5000}
RUN_ID="${SLURM_JOB_ID:-$(date +%Y%m%d_%H%M%S)}"
OUTPUT_DIR="/fsx/pepijn/outputs/train/pi052_robocasa_human300_${RUN_ID}"
# --- Exclude un-annotated episodes -----------------------------------------
# 63 episodes in this dataset carry NO `subtask` annotation (no persistent
# language rows at all). `--dataset.episodes` is an INCLUDE list, so we pass
# the complement: every episode index except those 63. The helper reads
# meta/info.json from the Hub to confirm total_episodes (32043) and validates
# the excluded indices are in range before emitting the list. If the dataset
# version changes such that the indices fall out of range, the helper aborts
# the job rather than silently training on the wrong episodes.
echo "Building episode include-list (excluding un-annotated episodes)..."
EPISODES=$(python scripts/build_episode_filter.py \
--repo-id "$DATASET" \
--revision "$DATASET_REVISION")
echo "Training pi052 on $DATASET with ${NUM_PROCESSES} GPUs, batch size ${BATCH_SIZE}/GPU, ${STEPS} steps"
echo "Output directory: $OUTPUT_DIR"
export LEROBOT_DUMP_RECIPE_SAMPLES=8
accelerate launch --multi_gpu --num_processes="$NUM_PROCESSES" \
-m lerobot.scripts.lerobot_train \
--policy.type=pi052 \
--policy.pretrained_path=lerobot/pi05_base \
--policy.recipe_path=recipes/subtask_mem_vqa_robocasa.yaml \
--dataset.repo_id="$DATASET" \
--dataset.revision="$DATASET_REVISION" \
--dataset.episodes="$EPISODES" \
--dataset.video_backend=pyav \
--output_dir="$OUTPUT_DIR" \
--job_name="$JOB_NAME" \
--policy.repo_id="$POLICY_REPO_ID" \
--policy.compile_model=true \
--policy.compile_mode=default \
--policy.gradient_checkpointing=true \
--policy.device=cuda \
--policy.tokenizer_max_length=256 \
--policy.action_tokenizer_name=lerobot/fast-action-tokenizer \
--policy.chunk_size=30 \
--policy.n_action_steps=30 \
--policy.max_action_tokens=256 \
--steps="$STEPS" \
--policy.scheduler_decay_steps="$STEPS" \
--batch_size="$BATCH_SIZE" \
--wandb.enable=true \
--policy.dtype=bfloat16 \
--policy.optimizer_lr=5e-5 \
--policy.optimizer_grad_clip_norm=1.0 \
--policy.scheduler_decay_lr=5e-6 \
--policy.lm_head_lr_scale=5.0 \
--ema.enable=true \
--wandb.disable_artifact=true \
--wandb.project=hirobot \
--log_freq=100 \
--save_freq=5000 \
--num_workers=4 \
--prefetch_factor=4 \
--persistent_workers=true \
--dataset.image_transforms.enable=true \
--dataset.image_transforms.max_num_transforms=3 \
--dataset.image_transforms.random_order=true \
--policy.auto_fit_fast_tokenizer=true \
--policy.knowledge_insulation=true

View File

@@ -92,6 +92,136 @@ class PlanConfig:
use_video_url: bool = False
use_video_url_fps: float = 1.0
# Structured per-subtask action records (Phase 1a + 1b, inspired by
# EgoMimic's annotator form). For each generated subtask span, the
# VLM extracts a typed record (verb / object / arm / grasp_type /
# destination / mistake). A deterministic Python template renders
# that record back to canonical subtask text — reducing the VLM's
# "creative" surface to just the perception step. See
# ``ActionRecordsConfig`` for details. Off by default (back-compat).
action_records: "ActionRecordsConfig" = field(default_factory=lambda: ActionRecordsConfig())
# Structured 5-axis augmentation taxonomy for the t=0 task variants
# (replaces the free-form ``n_task_rephrasings`` flow when enabled).
# Mirrors EgoMimic's ``augment_prompt.txt`` taxonomy: instead of N
# free-form rephrasings, the VLM produces variants along named
# axes (synonym / omit_arm / omit_orientation / omit_grasp_method /
# combined). Off by default (back-compat).
task_aug_axes: "TaskAugAxesConfig" = field(default_factory=lambda: TaskAugAxesConfig())
@dataclass
class ActionRecordsConfig:
"""Structured per-subtask action record extraction.
When ``enabled=True``, after the existing subtask-span generation in
``plan_subtasks_memory.py``, the module makes one extra VLM call per
subtask to extract a typed record::
{
"verb": "pick" | "place" | "press" | ..., # closed vocabulary
"object": "<canonical_object_name>",
"arm": "left" | "right" | "both" | null,
"grasp_type": "pinch" | "wrap" | "hook" | ... | null,
"destination": "<canonical_destination>" | null,
"mistake": "<short text>" | null,
}
A deterministic Python template then renders the record back to
canonical subtask text (e.g. ``pick blue cube with left arm using
pinch grip``). When ``replace_subtask_text=True`` (default), the
rendered text REPLACES the VLM's free-form subtask text — eliminating
cross-episode phrasing drift. When ``emit_record_row=True``
(default), the structured record is also emitted as a row with
``style="action_record"`` so downstream consumers can train on the
typed schema directly.
Cost: one extra VLM call per subtask. For an 8-subtask episode this
means ~8x more VLM calls in the plan module — still cheap relative
to the action-expert training cost, but worth knowing.
"""
enabled: bool = False
# When True, replace the VLM-generated subtask text with the
# deterministic template's rendering of the structured record.
# Strongly recommended — it's the whole point of the structured
# intermediate. Set False to keep both representations side by side.
replace_subtask_text: bool = True
# When True, emit a separate row with ``style="action_record"`` and
# ``content=json.dumps(record)`` at the subtask's start timestamp.
# Lets downstream training consume the typed schema directly (e.g.
# auxiliary supervision on verb/arm/grasp classification heads).
emit_record_row: bool = True
# Frame sampling for the per-subtask VLM call (similar to the
# interjection module's window). Anchored to the subtask span.
frames_per_subtask: int = 4
# Closed verb vocabulary. The prompt instructs the VLM to pick
# exactly one. Override per-dataset (e.g. ``["pick", "place", "open",
# "close"]`` for door-only manipulation) for tighter constraint.
verb_vocabulary: tuple[str, ...] = (
"pick", "place", "push", "pull", "open", "close", "turn",
"press", "lift", "insert", "pour", "move", "reach", "grasp",
"release", "wipe", "dump",
)
# Closed grasp-type vocabulary. ``null`` is always allowed (no
# contact / unclear). Adjust per-hardware (e.g. drop ``hook`` /
# ``key`` for parallel-jaw grippers).
grasp_vocabulary: tuple[str, ...] = (
"pinch", "wrap", "hook", "key", "lateral",
)
@dataclass
class TaskAugAxesConfig:
"""Structured 5-axis augmentation taxonomy for t=0 task variants.
When ``enabled=True``, replaces the free-form ``n_task_rephrasings``
flow with a structured prompt that produces variants along five
named axes (mirroring EgoMimic's ``augment_prompt.txt``):
* ``synonym_paraphrase`` — different wording / verbs, all
information preserved.
* ``omit_arm`` — drop the left/right/both arm specification.
* ``omit_orientation`` — drop orientation cues (upright,
sideways, ...).
* ``omit_grasp_method`` — drop grip / grasp method specification.
* ``combined_omissions`` — combine two of the above
simultaneously.
Default counts (3+3+2+2+2 = 12 variants per task) match EgoMimic.
Axes that have nothing to omit in the source task (e.g. ``omit_arm``
when the task doesn't mention an arm) emit fewer entries rather
than pad — the prompt instructs the VLM accordingly.
Each variant is emitted as a ``task_aug`` row at ``t=0`` (same
style as the free-form variants), so the rest of the pipeline /
training recipe doesn't need to know about the taxonomy.
"""
enabled: bool = False
synonym_paraphrase: int = 3
omit_arm: int = 3
omit_orientation: int = 2
omit_grasp_method: int = 2
combined_omissions: int = 2
@property
def total(self) -> int:
"""Sum of requested variants across all axes (upper bound)."""
return (
self.synonym_paraphrase
+ self.omit_arm
+ self.omit_orientation
+ self.omit_grasp_method
+ self.combined_omissions
)
@dataclass
class InterjectionsConfig:

View File

@@ -17,6 +17,7 @@
from __future__ import annotations
import json
import logging
from collections.abc import Sequence
from dataclasses import dataclass, field
@@ -28,6 +29,7 @@ from ..frames import (
FrameProvider,
VideoFrameProvider,
null_provider,
to_image_blocks,
to_video_block,
to_video_url_block,
)
@@ -78,13 +80,37 @@ class PlanSubtasksMemoryModule:
# ``task_aug`` rows at t=0 (role=user), one per rephrasing — the
# message renderer rotates ``${task}`` deterministically through
# them so the policy sees diverse phrasings during training.
# Two paths:
# * ``task_aug_axes.enabled=True`` — structured 5-axis taxonomy
# (synonym / omit_arm / omit_orientation / omit_grasp_method
# / combined). Replaces the free-form rephrasings flow.
# * Otherwise — free-form ``n_task_rephrasings`` (original).
t0 = float(record.frame_timestamps[0]) if record.frame_timestamps else 0.0
if self.config.n_task_rephrasings > 0 and effective_task:
axes_cfg = self.config.task_aug_axes
if axes_cfg.enabled and effective_task:
variants = self._generate_task_aug_by_axes(effective_task, axes_cfg)
seen: set[str] = set()
ordered = [effective_task, *variants]
for phrasing in ordered:
key = phrasing.strip()
if not key or key in seen:
continue
seen.add(key)
rows.append(
{
"role": "user",
"content": key,
"style": "task_aug",
"timestamp": t0,
"tool_calls": None,
}
)
elif self.config.n_task_rephrasings > 0 and effective_task:
rephrasings = self._generate_task_rephrasings(effective_task, n=self.config.n_task_rephrasings)
# Always include the effective task itself as the first variant
# so the rotation is guaranteed to cover the source-of-truth
# phrasing, not just synthetic alternatives.
seen: set[str] = set()
seen = set()
ordered = [effective_task, *rephrasings]
for phrasing in ordered:
key = phrasing.strip()
@@ -102,8 +128,31 @@ class PlanSubtasksMemoryModule:
)
subtask_spans = self._generate_subtasks(record, task=effective_task)
# subtask rows
for span in subtask_spans:
# ----------------------------------------------------------------
# Phase 1a + 1b: structured per-subtask action records
# ----------------------------------------------------------------
# When enabled, for every subtask span we ask the VLM for a typed
# ActionRecord (verb / object / arm / grasp_type / destination /
# mistake). A deterministic Python template renders the record
# back to canonical subtask text. The render replaces the
# free-form subtask text (cleaner conditioning) and the typed
# record is emitted as a separate row for downstream use.
records_cfg = self.config.action_records
action_records: list[dict[str, Any] | None] = [None] * len(subtask_spans)
if records_cfg.enabled and subtask_spans:
for i, span in enumerate(subtask_spans):
rec = self._extract_action_record(record, span, effective_task)
if rec is None:
continue
action_records[i] = rec
if records_cfg.replace_subtask_text:
canonical_text = self._render_action_record_to_subtask_text(rec)
if canonical_text:
span["text"] = canonical_text
# subtask rows (may now reflect canonical-rendered text)
for i, span in enumerate(subtask_spans):
rows.append(
{
"role": "assistant",
@@ -113,6 +162,16 @@ class PlanSubtasksMemoryModule:
"tool_calls": None,
}
)
if records_cfg.enabled and records_cfg.emit_record_row and action_records[i] is not None:
rows.append(
{
"role": "assistant",
"content": json.dumps(action_records[i], sort_keys=True),
"style": "action_record",
"timestamp": snap_to_frame(span["start"], record.frame_timestamps),
"tool_calls": None,
}
)
# Plan rows at every subtask boundary — including t=0 (start of
# the first subtask). Because the plan is just a numbered list
# of *still-todo* subtasks, re-emitting at each boundary makes
@@ -244,6 +303,202 @@ class PlanSubtasksMemoryModule:
out = [item.strip().strip('"').strip("'") for item in raw if isinstance(item, str)]
return [s for s in out if s][:n]
# ------------------------------------------------------------------
# Phase 1a + 1b: structured per-subtask action records
# ------------------------------------------------------------------
def _extract_action_record(
self,
record: EpisodeRecord,
span: dict[str, Any],
episode_task: str,
) -> dict[str, Any] | None:
"""Ask the VLM to extract a typed ``ActionRecord`` from a subtask span.
Sends ``frames_per_subtask`` frames uniformly sampled from
``[span.start, span.end]`` plus the canonical subtask text. The
VLM is constrained to verb + grasp vocabularies from the config
— invalid values are silently dropped at this layer (the
validator catches structural problems pre-write).
Returns ``None`` when the call fails or the VLM returns something
unrecognizable; callers fall back to the free-form subtask text.
"""
cfg = self.config.action_records
start_t = float(span.get("start", 0.0))
end_t = float(span.get("end", start_t))
duration = max(0.0, end_t - start_t)
# Uniform timestamps within the span; fall back to a single
# center frame for very short spans.
n = max(1, int(cfg.frames_per_subtask))
if n == 1 or duration <= 0.0:
timestamps = [0.5 * (start_t + end_t)]
else:
step = duration / (n - 1)
timestamps = [start_t + i * step for i in range(n)]
frames = self.frame_provider.frames_at(record, timestamps)
if not frames:
logger.debug(
"action_record: no frames at span %.2f-%.2f for ep %s; skipping",
start_t, end_t, record.episode_index,
)
return None
prompt = load_prompt("module_1_action_record").format(
episode_task=episode_task,
subtask_text=span.get("text", ""),
start_time=start_t,
end_time=end_t,
duration=duration,
n_frames=len(frames),
verb_vocabulary=", ".join(cfg.verb_vocabulary),
grasp_vocabulary=" | ".join(f'"{g}"' for g in cfg.grasp_vocabulary),
)
message = [
{
"role": "user",
"content": [*to_image_blocks(frames), {"type": "text", "text": prompt}],
}
]
result = self.vlm.generate_json([message])[0]
if not isinstance(result, dict):
return None
# Light validation + normalisation. Verb is required; everything
# else may be null. Verb / grasp_type are clamped to the
# vocabularies (out-of-vocab → reject or null).
verb = (result.get("verb") or "").strip().lower()
if not verb or verb not in {v.lower() for v in cfg.verb_vocabulary}:
return None
obj = (result.get("object") or "").strip()
if not obj:
return None
grasp = result.get("grasp_type")
if isinstance(grasp, str):
grasp = grasp.strip().lower()
if grasp not in {g.lower() for g in cfg.grasp_vocabulary}:
grasp = None
else:
grasp = None
arm = result.get("arm")
if isinstance(arm, str):
arm = arm.strip().lower()
if arm not in {"left", "right", "both"}:
arm = None
else:
arm = None
destination = result.get("destination")
destination = destination.strip() if isinstance(destination, str) and destination.strip() else None
mistake = result.get("mistake")
mistake = mistake.strip() if isinstance(mistake, str) and mistake.strip() else None
return {
"verb": verb,
"object": obj,
"arm": arm,
"grasp_type": grasp,
"destination": destination,
"mistake": mistake,
}
@staticmethod
def _render_action_record_to_subtask_text(record: dict[str, Any]) -> str:
"""Deterministic template: ``ActionRecord`` → canonical subtask text.
Mirrors the authoring guidance in ``module_1_subtasks.txt``:
imperative, drop articles / adverbs, use canonical object nouns,
append arm / grasp clauses only when present.
Examples (record → rendered text)::
{verb=pick, object=blue cube}
"pick blue cube"
{verb=pick, object=blue cube, arm=left, grasp_type=pinch}
"pick blue cube with left arm using pinch grip"
{verb=place, object=blue cube, destination=green box}
"place blue cube in green box"
{verb=move, object=mug, destination=stove}
"move mug to stove"
"""
verb = (record.get("verb") or "").strip().lower()
obj = (record.get("object") or "").strip()
arm = (record.get("arm") or "").strip().lower() if record.get("arm") else ""
grasp = (record.get("grasp_type") or "").strip().lower() if record.get("grasp_type") else ""
dest = (record.get("destination") or "").strip() if record.get("destination") else ""
if not verb:
return ""
parts: list[str] = [verb]
if obj:
parts.append(obj)
if dest:
# Pick a sensible preposition per verb family.
if verb in {"place", "put", "drop", "insert", "pour", "dump"}:
parts.append(f"in {dest}")
elif verb in {"move", "transport", "reach"}:
parts.append(f"to {dest}")
else:
parts.append(f"at {dest}")
if arm == "both":
parts.append("with both arms")
elif arm in {"left", "right"}:
parts.append(f"with {arm} arm")
if grasp:
parts.append(f"using {grasp} grip")
return " ".join(parts)
# ------------------------------------------------------------------
# Structured 5-axis task augmentation (EgoMimic-style taxonomy)
# ------------------------------------------------------------------
def _generate_task_aug_by_axes(self, base_task: str, axes_cfg: Any) -> list[str]:
"""One VLM call → variants along the 5-axis taxonomy.
Variants from all axes are flattened into a single list (the
downstream pipeline doesn't need to know about the per-axis
bucketing — every variant becomes a ``task_aug`` row). Order
is preserved for reproducibility: synonym_paraphrase first,
then omit_arm, then omit_orientation, then omit_grasp_method,
then combined_omissions.
"""
if not base_task:
return []
prompt = load_prompt("module_1_task_aug_axes").format(
base_task=base_task,
n_synonym=axes_cfg.synonym_paraphrase,
n_omit_arm=axes_cfg.omit_arm,
n_omit_orientation=axes_cfg.omit_orientation,
n_omit_grasp_method=axes_cfg.omit_grasp_method,
n_combined=axes_cfg.combined_omissions,
)
result = self.vlm.generate_json([self._text_message(prompt)])[0]
if not isinstance(result, dict):
return []
ordered_axes = (
"synonym_paraphrase",
"omit_arm",
"omit_orientation",
"omit_grasp_method",
"combined_omissions",
)
flat: list[str] = []
seen: set[str] = set()
for axis in ordered_axes:
entries = result.get(axis)
if not isinstance(entries, list):
continue
for item in entries:
if not isinstance(item, str):
continue
key = item.strip().strip('"').strip("'")
if not key or key in seen:
continue
seen.add(key)
flat.append(key)
return flat
def _episode_video_block(self, record: EpisodeRecord) -> list[dict[str, Any]]:
"""Same video block ``_generate_subtasks`` builds — extracted helper."""
if not record.frame_timestamps:

View File

@@ -0,0 +1,64 @@
You are extracting a structured action record from a subtask span of a
teleoperated robot demonstration. This is Phase 1a of a two-step
process: you extract a typed record; a deterministic template then
renders it back to canonical subtask text. Your job is the PERCEPTION
step — not the language step.
The user originally asked: "{episode_task}"
The subtask span is: "{subtask_text}"
Span time window: [{start_time:.2f}s, {end_time:.2f}s]
({duration:.2f}s of robot activity)
You are shown {n_frames} frames sampled uniformly from the subtask
window. Fill in a structured record describing the action that takes
place between the first and last frame.
Hard rules:
- Use ONLY information visible in the frames. Do not infer details from
outside the span. Do not extrapolate from the original task wording.
- Use canonical object names from the original task VERBATIM. Never
introduce synonyms: if the task says "cube", the record says "cube",
never "block" / "object" / "item".
- For non-applicable fields, use ``null`` (not "n/a", not "none", not
an empty string).
- For ``verb`` and ``grasp_type``, pick EXACTLY one value from the
vocabulary below. Never invent a new one.
Field schema:
verb (required) — the imperative verb of the action. Vocabulary:
{verb_vocabulary}
object (required) — the manipulated object. Use the canonical noun
from the original task above.
arm — which arm performs the action. One of:
"left" | "right" | "both" | null
Use ``null`` when the source robot is single-arm or when the arm
is genuinely not visible in the frames.
grasp_type — which grip the gripper uses on contact. One of:
{grasp_vocabulary} | null
Use ``null`` when there is no contact in this span (e.g. a pure
``move`` / ``reach`` subtask) or the grip is genuinely unclear.
destination — the target location for actions like ``place``,
``move``, ``insert``, ``pour``. Use canonical names from the
original task. Use ``null`` for in-place actions (``press``,
``turn``, ``grasp``, ``release``).
mistake — a brief one-clause description of any visible failure or
recovery during the span (e.g. "dropped the cube and re-grasped",
"missed the target on first attempt"). Use ``null`` when the span
completes cleanly with no visible recovery.
Output strictly valid JSON of shape:
{{
"verb": "<one of vocabulary>",
"object": "<canonical noun>",
"arm": "left" | "right" | "both" | null,
"grasp_type": "<one of vocabulary>" | null,
"destination": "<canonical noun>" | null,
"mistake": "<short description>" | null
}}

View File

@@ -0,0 +1,60 @@
You are generating structured augmentations of a robot task instruction
for training a language-conditioned policy. Unlike free-form rephrasing,
your variants follow a NAMED 5-axis taxonomy — each axis omits or varies
a specific element of the task while preserving its meaning.
Original task: "{base_task}"
Produce variants along five named axes. Each axis has a target count.
The whole batch should expose the policy to maximum linguistic diversity
WITHOUT changing what the robot is supposed to do.
Axes and target counts:
synonym_paraphrase ({n_synonym}):
Different wording / verbs / sentence structure. ALL information
from the original task is preserved — same object, same arm
specification if present, same orientation if present, same grasp
if present.
omit_arm ({n_omit_arm}):
Drop the left/right/both arm specification from the task. Skip
entirely (emit 0 entries) if the original task does NOT mention an
arm. Do not invent an arm specification just to omit it.
omit_orientation ({n_omit_orientation}):
Drop orientation cues (upright, sideways, facing the user,
long-edge-first, etc.). Skip entirely if no orientation cue is
present in the original task.
omit_grasp_method ({n_omit_grasp_method}):
Drop the grip / grasp method specification (pinch, wrap, hold by
the rim, etc.). Skip entirely if no grasp method is mentioned.
combined_omissions ({n_combined}):
Combine TWO of the above omissions simultaneously (e.g. drop both
arm and orientation). Skip entirely if fewer than two of (arm,
orientation, grasp_method) appear in the original task.
Hard rules:
- Each variant MUST preserve the core action and the target object.
Do not change which object is involved, the destination, or the
high-level action.
- Each variant is plain prose, no markdown, no quotes, no list numbers.
- Each variant must be DISTINCT from every other variant in the entire
output, both within and across axes. Near-duplicates are not allowed.
- If an axis cannot reach its target count because the original task
lacks the omittable element, emit fewer entries — do NOT pad the
axis with paraphrases that belong to a different axis.
- Variants should not all start with verbs — vary sentence structure
(some imperative, some polite request, some question).
Output strictly valid JSON of shape:
{{
"synonym_paraphrase": ["<v1>", "<v2>", ...],
"omit_arm": ["<v1>", "<v2>", ...],
"omit_orientation": ["<v1>", ...],
"omit_grasp_method": ["<v1>", ...],
"combined_omissions": ["<v1>", ...]
}}

View File

@@ -46,7 +46,7 @@ CORE_STYLES = {
EXTENDED_STYLES: set[str] = set()
STYLE_REGISTRY = CORE_STYLES | EXTENDED_STYLES
PERSISTENT_STYLES = {"subtask", "plan", "memory", "motion", "task_aug"}
PERSISTENT_STYLES = {"subtask", "plan", "memory", "motion", "task_aug", "action_record"}
EVENT_ONLY_STYLES = {"interjection", "vqa", "trace"}
# Styles whose ``content`` is grounded in a specific camera view. Rows of these