annotate: multi-call subtask quality chain (describe -> segment -> verify)

The single-call 'watch video -> emit subtask JSON' pattern makes the
VLM commit to structured output before reasoning about what it saw, so
it pattern-matches the task text and hallucinates steps. Split it into
an opt-in multi-call chain that grounds first and prunes last.

New PlanConfig flags (both default False -> single-call unchanged):
  * subtask_describe_first: a grounding pass narrates ONLY what is
    visible in the video (no subtask JSON yet). That description is
    injected into the segmentation prompt via a new {observation_block}
    placeholder, so the model segments its own grounded observations
    instead of the instruction text. +1 VLM call/episode.
  * subtask_verify: after segmentation, an adversarial pass re-watches
    the video and drops any candidate subtask it cannot see. Can only
    PRUNE (never add/rewrite/move) and fails open (keeps un-verified
    spans if the call returns nothing). +1 VLM call/episode.

Implementation:
  * _generate_subtasks now orchestrates describe -> segment -> verify.
  * Factored span cleaning into _clean_spans (shared by segment + verify
    outputs); added _describe_episode and _verify_subtasks helpers.
  * New prompts module_1_subtask_describe.txt (returns {description})
    and module_1_subtask_verify.txt (returns pruned {subtasks}).
  * module_1_subtasks.txt gains a {observation_block} slot at the top.

run_hf_job.py enables both for the RoboCasa run (3 VLM calls/episode
for subtasks). Combined with single-camera grounding + the embedded-
frame path, this is the high-quality configuration.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-06-02 15:12:46 +02:00
parent ba5d4c5cd8
commit dcd368e1f8
6 changed files with 183 additions and 7 deletions

View File

@@ -51,6 +51,22 @@ class PlanConfig:
min_subtask_seconds: float = 1.5
plan_max_steps: int = 8
# Multi-call subtask quality chain (opt-in, more VLM calls, higher
# quality). Both off by default → single-call behaviour unchanged.
#
# ``subtask_describe_first``: run a grounding pass that narrates ONLY
# what is visible in the video (no subtask JSON yet), then inject that
# description into the segmentation prompt. Forces the model to
# observe before committing to structured output — the strongest
# lever against subtasks invented from the task text. +1 VLM call/ep.
subtask_describe_first: bool = False
# ``subtask_verify``: after segmentation, re-watch the video and drop
# any proposed subtask that can't be verified as visible. Prunes
# hallucinations; can only remove subtasks, never add/rewrite them.
# Fail-open (keeps un-verified spans if the verify call returns
# nothing). +1 VLM call/ep.
subtask_verify: bool = False
# When True (and backend supports it, e.g. ``openai``), the ``plan``
# module sends a ``video_url`` block pointing at a per-episode mp4
# subclip and lets the server sample frames at ``use_video_url_fps``.

View File

@@ -521,20 +521,65 @@ class PlanSubtasksMemoryModule:
staging.write("plan", new_rows)
def _generate_subtasks(self, record: EpisodeRecord, *, task: str | None = None) -> list[dict[str, Any]]:
"""Generate subtask spans, optionally via a multi-call quality chain.
Single call (default): watch video → emit subtask JSON.
Multi-call (opt-in, higher quality, more VLM calls):
1. ``subtask_describe_first`` — a grounding pass that narrates
ONLY what is visible (no JSON commitment to subtasks yet);
its description is injected into the segmentation prompt so
the model segments its own grounded observations instead of
pattern-matching the task text.
2. segmentation — emit subtask JSON (as before).
3. ``subtask_verify`` — an adversarial pass that re-watches the
video and drops any proposed subtask it cannot actually see,
pruning hallucinations.
"""
if record.row_count == 0 or not record.frame_timestamps:
return []
episode_duration = record.frame_timestamps[-1] - record.frame_timestamps[0]
effective_task = task if task is not None else record.episode_task
# ---- Pass 1 (optional): grounding description ----------------
observation_block = ""
if getattr(self.config, "subtask_describe_first", False):
description = self._describe_episode(record, effective_task)
if description:
observation_block = (
"You watched this video and described, chronologically, "
"ONLY what the robot actually does:\n"
f'"""{description}"""\n\n'
"Segment THAT grounded description (cross-checked against "
"the video) into atomic subtasks. Do not introduce any "
"action that is not in your description above.\n\n"
)
# ---- Pass 2: segmentation ------------------------------------
prompt = load_prompt("module_1_subtasks").format(
episode_task=(task if task is not None else record.episode_task),
episode_task=effective_task,
min_subtask_seconds=self.config.min_subtask_seconds,
max_steps=self.config.plan_max_steps,
episode_duration=f"{episode_duration:.3f}",
observation_block=observation_block,
)
messages = self._video_message(record, prompt)
spans = self._vlm_field(messages, "subtasks")
spans = self._vlm_field(self._video_message(record, prompt), "subtasks")
cleaned = self._clean_spans(spans, record)
if not cleaned:
return []
# ---- Pass 3 (optional): verification / pruning ---------------
if getattr(self.config, "subtask_verify", False):
cleaned = self._verify_subtasks(record, effective_task, cleaned)
return cleaned
def _clean_spans(
self, spans: Any, record: EpisodeRecord
) -> list[dict[str, Any]]:
"""Clamp / sort / dedupe raw VLM subtask spans into valid rows."""
if not spans:
return []
# clamp to [t0, t_last] and sort
t0 = record.frame_timestamps[0]
t_last = record.frame_timestamps[-1]
cleaned: list[dict[str, Any]] = []
@@ -553,8 +598,56 @@ class PlanSubtasksMemoryModule:
continue
cleaned.append({"text": text, "start": start, "end": end})
cleaned.sort(key=lambda s: s["start"])
cleaned = self._dedupe_starts_to_distinct_frames(cleaned, record)
return cleaned
return self._dedupe_starts_to_distinct_frames(cleaned, record)
def _describe_episode(self, record: EpisodeRecord, task: str) -> str:
"""Grounding pass: free-form chronological description of the video."""
prompt = load_prompt("module_1_subtask_describe").format(episode_task=task)
text = self._vlm_field(self._video_message(record, prompt), "description")
return text.strip() if isinstance(text, str) and text.strip() else ""
def _verify_subtasks(
self,
record: EpisodeRecord,
task: str,
spans: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Adversarial pass: drop proposed subtasks not visible in the video.
Keeps the original span on a verified ``text`` match (the verify
prompt is told not to rewrite text), so verification can only
PRUNE — never invent or mutate. If the verify call fails or
returns nothing parseable, the un-verified spans are kept (fail
open: better to keep a possibly-good label than silently drop
everything on a transient VLM hiccup).
"""
import json # noqa: PLC0415
subtasks_json = json.dumps(
{"subtasks": [{"text": s["text"], "start": round(s["start"], 3), "end": round(s["end"], 3)} for s in spans]},
indent=2,
)
prompt = load_prompt("module_1_subtask_verify").format(
episode_task=task, subtasks_json=subtasks_json
)
kept_raw = self._vlm_field(self._video_message(record, prompt), "subtasks")
kept = self._clean_spans(kept_raw, record)
if not kept:
logger.info(
"episode %d: verify pass returned nothing — keeping the %d "
"un-verified subtask(s) (fail-open)",
record.episode_index,
len(spans),
)
return spans
if len(kept) < len(spans):
logger.info(
"episode %d: verify pass pruned %d -> %d subtask(s)",
record.episode_index,
len(spans),
len(kept),
)
return kept
@staticmethod
def _dedupe_starts_to_distinct_frames(

View File

@@ -0,0 +1,27 @@
You are watching a teleoperated robot demonstration from a single
camera. The user asked the robot to: "{episode_task}"
This is an OBSERVATION pass. Watch the entire clip and describe, in
chronological order, ONLY what the robot physically does — the concrete
motions, approaches, contacts, grasps, releases, and relocations you can
actually SEE in the frames.
Hard rules:
- Describe only motion visible in the video. Do NOT use the task
instruction to guess steps that aren't shown. The instruction is the
goal; the video is ground truth.
- Do NOT segment into named subtasks yet and do NOT output JSON beyond
the single field below. Just narrate what happens.
- Give an approximate timestamp (in seconds) for each distinct event,
e.g. "0.0-1.4s: the base drives forward toward the stove".
- Do NOT invent objects, grasps, destinations, or steps. If the robot
only does one thing (e.g. it just navigates and the clip ends), say
exactly that and nothing more.
- Be concrete and literal. "the gripper closes on the mug" — not "the
robot prepares to make coffee".
Output strictly valid JSON:
{{
"description": "<chronological, timestamped description of ONLY what is visible>"
}}

View File

@@ -0,0 +1,33 @@
You previously segmented a teleoperated robot demonstration into these
candidate subtasks (JSON):
{subtasks_json}
The user's task was: "{episode_task}"
This is a VERIFICATION pass. Re-watch the video. For EACH candidate
subtask, decide whether the robot can ACTUALLY be seen performing that
action within its [start, end] time window.
Rules:
- KEEP a subtask only if its action is clearly visible in the video in
roughly that time window.
- DROP any subtask whose action you cannot see, that describes
something not actually present in the video, that was inferred from
the task instruction rather than observed, or that duplicates another
kept subtask.
- Do NOT add new subtasks. Do NOT rewrite the text of kept subtasks.
Do NOT change the start/end timestamps of kept subtasks.
- It is correct and expected to return FEWER subtasks than you were
given — even just one — if that is all the video supports. Returning
zero is allowed if none can be verified.
Output strictly valid JSON of the SAME shape, containing only the kept
subtasks in chronological order:
{{
"subtasks": [
{{"text": "<kept verbatim>", "start": <float>, "end": <float>}},
...
]
}}

View File

@@ -6,7 +6,7 @@ You are shown the entire demonstration as a single video. Watch the
whole clip, then segment it into a list of consecutive atomic subtasks
the robot performs.
GROUNDING — read this first, it overrides everything below:
{observation_block}GROUNDING — read this first, it overrides everything below:
- Label ONLY what the robot actually does in the video. Every subtask
you emit must correspond to motion you can SEE in specific frames.
- Do NOT invent, anticipate, or pad. If the robot only does one thing