refactor(annotate): plan = summary of still-todo subtasks, drop VLM call

The plan was being generated by a separate VLM call (one per
episode + one per interjection refresh) with a prompt that asked
the model to "compress the subtasks into a compact hierarchical
plan". In practice the plans came out longer than necessary and
sometimes drifted from the actual subtask sequence the runtime
would execute.

Replaced ``_generate_plan`` with a deterministic numbered list
of the upcoming subtasks. At a refresh time the list shrinks to
subtasks whose start ≥ refresh_t — the plan describes what's
*left* to do, so it gets shorter as work progresses.

Saves the per-episode + per-interjection VLM round-trip in the
annotation pipeline and keeps train-time plan text bit-aligned
with the subtask annotations the rest of Module 1 emits.

Removed the now-unused ``prompts/module_1_plan.txt``.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-05-13 15:55:02 +02:00
parent fa45ba631b
commit dd97c33814
2 changed files with 37 additions and 59 deletions

View File

@@ -372,54 +372,50 @@ class PlanSubtasksMemoryModule:
def _generate_plan(
self,
record: EpisodeRecord,
record: EpisodeRecord, # noqa: ARG002 (kept for signature stability)
subtask_spans: Sequence[dict[str, Any]],
*,
refresh_t: float | None = None,
interjection: str | None = None,
task: str | None = None,
interjection: str | None = None, # noqa: ARG002
task: str | None = None, # noqa: ARG002
) -> str | None:
"""Deterministic plan = numbered list of *still-todo* subtasks.
Previously this called the VLM with a prompt that asked it to
compress the subtasks into a "compact hierarchical plan". That
produced longer-than-necessary plans, cost an extra VLM round-trip
per episode (plus one per interjection on refresh), and could
diverge from the actual subtask sequence the model is going to
execute. Replacing it with a plain summarisation keeps the plan
tightly aligned with the upcoming subtasks and removes the VLM
call entirely.
Layout (matches the v2 plan style — short imperative fragments
prefixed by "N. "):
1. <subtask 1>
2. <subtask 2>
...
On a refresh at ``refresh_t`` (called from ``run_plan_updates``
on interjection events), only subtasks whose start is at or
after ``refresh_t`` are included — the plan shrinks as work
progresses, so it always describes what's left.
"""
if not subtask_spans:
return None
subtasks_text = "\n".join(f"- {s['text']}" for s in subtask_spans)
prompt = load_prompt("module_1_plan").format(
episode_task=(task if task is not None else record.episode_task),
subtasks_text=subtasks_text,
plan_max_steps=self.config.plan_max_steps,
remaining = [
s for s in subtask_spans
if refresh_t is None or float(s.get("start", 0.0)) >= float(refresh_t)
]
if not remaining:
# Past the last subtask boundary on a late refresh — nothing
# left to plan; emit None so the caller skips the row.
return None
return "\n".join(
f"{i}. {span.get('text', '').strip()}"
for i, span in enumerate(remaining, start=1)
)
if refresh_t is not None:
# ``current_subtask`` is the span the refresh time falls into,
# so the model knows where in the demonstration the planner is
# standing when it re-emits.
current_subtask = ""
for span in subtask_spans:
if float(span["start"]) <= refresh_t and (
"end" not in span or float(span["end"]) > refresh_t
):
current_subtask = span.get("text", "")
break
if interjection:
prompt += (
f"\n\n(Plan refresh at t={refresh_t:.2f}s after a user "
f"interjection: {interjection!r}. Current subtask just "
f"before the interjection: {current_subtask!r}. Update "
f"the plan so it reflects the interjection — drop or "
f"reorder steps as needed; do not just restate.)\n"
)
else:
# Refresh without an interjection text: still tell the model
# where in the episode the plan stands so the re-emission
# is grounded. Should be rare — plan refreshes are
# interjection-driven by design.
prompt += (
f"\n\n(Plan refresh at t={refresh_t:.2f}s. Current "
f"subtask: {current_subtask!r}.)\n"
)
messages = [{"role": "user", "content": [{"type": "text", "text": prompt}]}]
result = self.vlm.generate_json([messages])[0]
if isinstance(result, dict) and isinstance(result.get("plan"), str):
return result["plan"].strip()
return None
def _generate_memory(
self,

View File

@@ -1,18 +0,0 @@
You are the high-level planner for a robot demonstrating: "{episode_task}".
Given the subtask decomposition below, write a compact hierarchical PLAN.
Use short imperative fragments, like pi0.7 context prompts.
Subtasks for context:
{subtasks_text}
Authoring rules:
- 3 to {plan_max_steps} steps.
- Each step is one logical chunk, not one motion.
- Steps must be in execution order.
- Brief commands, not full sentences.
- Prefer: "open air fryer"; avoid: "The robot should open the air fryer."
- Plain text, no markdown headers.
Output strictly valid JSON:
{{ "plan": "1. ...\n2. ...\n3. ..." }}