mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-02 20:01:25 +00:00
review: fix dead-code bug, add thread safety, atomic writes, smaller cleanups
**Critical: video_for_episode was unreachable dead code.**
``video_for_episode`` was indented inside ``_decode_pyav_direct``, after
its ``return`` statement — Python parsed it as a nested function that
never executed. Module 1's ``_episode_video_block`` calls
``self.frame_provider.video_for_episode(record, target_count)`` on the
``use_video_url=False`` path, which would have AttributeError'd on any
real dataset. Tests passed only because they used ``_StubFrameProvider``
/ ``_NullProvider`` which have the method. Moved it to be a proper
method of ``VideoFrameProvider`` (right after ``frames_at``).
**Thread safety on VideoFrameProvider.**
The executor runs Module 1/2/3 phases under a ``ThreadPoolExecutor``, so
the per-instance ``_cache`` dict and the one-shot ``_warned_decode_fail``
flag were exposed to concurrent reads/writes. Added a ``threading.Lock``
field, wrapped cache reads/writes and the warn-flag check-and-set in
``with self._lock:``. Stub fixtures unaffected.
**episode_clip_path is now a method of VideoFrameProvider.**
Used to be a free function reaching into ``provider._meta.episodes`` and
``provider._meta.get_video_file_path`` from outside the class. As a
method it just uses ``self._meta``. The only caller (Module 1) updated;
no external callers.
**Atomic write in LanguageColumnsWriter.**
``pq.write_table(new_table, path)`` was overwriting the parquet shard
in place — a crash mid-write would corrupt the file. Now writes to a
sibling ``.tmp`` and ``Path.replace`` atomically.
**Smaller items:**
* ``executor.py`` docstring opened with "four phases" but listed six.
Now says "six phases" to match.
* ``[annotations]`` extra in ``pyproject.toml`` now includes
``openai>=1.40,<2.0``. Default ``VlmConfig.backend`` is ``"openai"``,
so without it ``_make_openai_client`` would ImportError on a fresh
``uv sync --extra annotations``.
* ``_snap_to_frame`` was duplicated identically in
``plan_subtasks_memory.py`` and ``interjections_and_speech.py``.
Promoted to ``snap_to_frame`` in ``reader.py`` (next to
``EpisodeRecord``); both modules now import it. Backwards-compat alias
not needed — no external callers.
* ``EpisodeRecord.frames_df()`` was re-reading the full parquet on every
call. Now memoizes via a private dataclass field so repeat calls from
different modules pay the cost once. Method signature unchanged.
* ``_extract_first_json_object`` had a redundant ``and not escape`` guard
that was dead because the prior block already handled and reset
``escape``. Replaced with a comment explaining the invariant.
**Pre-existing lint cleanups surfaced once these files entered
pre-commit's scope:**
* dead local ``client = clients[0]`` in ``_make_openai_client`` (the
real round-robin uses ``clients[rr_counter[...]]``).
* ``cmd = ... if "{port}" in cmd else f"...{port}"`` ternary collapse in
``_spawn_parallel_inference_servers``.
* ``seek_pts = 0 if stream.time_base is None else int(...)`` ternary
collapse in ``_decode_pyav_direct``.
* ``# nosec B310`` on the localhost ``urllib.request.urlopen`` probe in
``_server_is_up`` — the URL is the user-configured local-server endpoint
the CLI itself spawned, not arbitrary user input.
**Test added.**
``tests/annotations/test_frames.py`` pins the regression on
``VideoFrameProvider``: asserts ``video_for_episode`` and
``episode_clip_path`` are callable methods (not nested dead code or
free functions), and that the ``_lock`` field is a real
``threading.Lock``.
Sweep: 64 passed, 2 failed (same pre-existing module-impl bugs as
before this commit). Pre-commit clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -24,6 +24,7 @@ querying the same timestamp pay decode cost once.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Protocol
|
||||
@@ -121,6 +122,10 @@ class VideoFrameProvider:
|
||||
_meta: Any = field(default=None, init=False, repr=False)
|
||||
_cache: dict = field(default_factory=dict, init=False, repr=False)
|
||||
_camera_keys: list[str] = field(default_factory=list, init=False, repr=False)
|
||||
# Pipeline runs Module 1/2/3 phases under a ThreadPoolExecutor (see
|
||||
# ``ExecutorConfig.episode_parallelism``); guard the dict cache and the
|
||||
# one-shot warn flag against concurrent updates from worker threads.
|
||||
_lock: threading.Lock = field(default_factory=threading.Lock, init=False, repr=False)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
from lerobot.datasets.dataset_metadata import LeRobotDatasetMetadata # noqa: PLC0415
|
||||
@@ -158,33 +163,110 @@ class VideoFrameProvider:
|
||||
out: list[Any] = []
|
||||
misses: list[float] = []
|
||||
miss_indices: list[int] = []
|
||||
for i, ts in enumerate(timestamps):
|
||||
key = (record.episode_index, target, round(float(ts), 6))
|
||||
cached = self._cache.get(key)
|
||||
if cached is not None:
|
||||
out.append(cached)
|
||||
else:
|
||||
out.append(None)
|
||||
misses.append(float(ts))
|
||||
miss_indices.append(i)
|
||||
with self._lock:
|
||||
for i, ts in enumerate(timestamps):
|
||||
key = (record.episode_index, target, round(float(ts), 6))
|
||||
cached = self._cache.get(key)
|
||||
if cached is not None:
|
||||
out.append(cached)
|
||||
else:
|
||||
out.append(None)
|
||||
misses.append(float(ts))
|
||||
miss_indices.append(i)
|
||||
|
||||
if misses:
|
||||
decoded = self._decode(record.episode_index, misses, target)
|
||||
# decoder may return fewer frames than requested when some
|
||||
# timestamps fall outside the video; pair what we have and
|
||||
# leave the rest as None to be filtered below.
|
||||
for i, img in zip(miss_indices, decoded):
|
||||
out[i] = img
|
||||
key = (record.episode_index, target, round(float(timestamps[i]), 6))
|
||||
if len(self._cache) >= self.cache_size:
|
||||
self._cache.pop(next(iter(self._cache)))
|
||||
self._cache[key] = img
|
||||
with self._lock:
|
||||
for i, img in zip(miss_indices, decoded, strict=False):
|
||||
out[i] = img
|
||||
key = (record.episode_index, target, round(float(timestamps[i]), 6))
|
||||
if len(self._cache) >= self.cache_size:
|
||||
self._cache.pop(next(iter(self._cache)))
|
||||
self._cache[key] = img
|
||||
# filter out any None left over from decode failures
|
||||
return [img for img in out if img is not None]
|
||||
|
||||
def _decode(
|
||||
self, episode_index: int, timestamps: list[float], camera_key: str
|
||||
def video_for_episode(
|
||||
self,
|
||||
record: EpisodeRecord,
|
||||
max_frames: int,
|
||||
camera_key: str | None = None,
|
||||
) -> list[Any]:
|
||||
"""Return up to ``max_frames`` images uniformly sampled across the episode.
|
||||
|
||||
The whole episode duration is covered; the model picks subtask
|
||||
boundaries from the temporal pooling it does internally.
|
||||
"""
|
||||
target = camera_key if camera_key is not None else self.camera_key
|
||||
if max_frames <= 0 or target is None or not record.frame_timestamps:
|
||||
return []
|
||||
n_frames = min(max_frames, len(record.frame_timestamps))
|
||||
if n_frames == len(record.frame_timestamps):
|
||||
timestamps = list(record.frame_timestamps)
|
||||
else:
|
||||
t0 = record.frame_timestamps[0]
|
||||
t_last = record.frame_timestamps[-1]
|
||||
if t_last <= t0:
|
||||
timestamps = [float(t0)] * n_frames
|
||||
else:
|
||||
step = (t_last - t0) / (n_frames - 1) if n_frames > 1 else 0.0
|
||||
timestamps = [float(t0 + i * step) for i in range(n_frames)]
|
||||
return self.frames_at(record, timestamps, camera_key=target)
|
||||
|
||||
def episode_clip_path(self, record: EpisodeRecord, cache_dir: Path) -> Path | None:
|
||||
"""Extract the episode's subclip to ``cache_dir/ep_{idx:06d}.mp4``.
|
||||
|
||||
Returns ``None`` if the dataset has no video tracks. Skips
|
||||
re-extract when the cached clip already exists. Re-encodes to
|
||||
H.264 (libx264) so the resulting mp4 is decodable by every
|
||||
downstream video processor — stream-copy would inherit the
|
||||
source codec (often AV1 in modern LeRobot datasets), which
|
||||
vllm's libav build cannot decode.
|
||||
"""
|
||||
import subprocess # noqa: PLC0415
|
||||
|
||||
if self.camera_key is None:
|
||||
return None
|
||||
cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
out_path = cache_dir / f"ep_{record.episode_index:06d}.mp4"
|
||||
if out_path.exists() and out_path.stat().st_size > 0:
|
||||
return out_path
|
||||
ep = self._meta.episodes[record.episode_index]
|
||||
from_timestamp = float(ep[f"videos/{self.camera_key}/from_timestamp"])
|
||||
to_timestamp = float(ep[f"videos/{self.camera_key}/to_timestamp"])
|
||||
src = self.root / self._meta.get_video_file_path(record.episode_index, self.camera_key)
|
||||
cmd = [
|
||||
"ffmpeg",
|
||||
"-y",
|
||||
"-loglevel",
|
||||
"error",
|
||||
"-ss",
|
||||
f"{from_timestamp:.3f}",
|
||||
"-to",
|
||||
f"{to_timestamp:.3f}",
|
||||
"-i",
|
||||
str(src),
|
||||
"-c:v",
|
||||
"libx264",
|
||||
"-preset",
|
||||
"ultrafast",
|
||||
"-crf",
|
||||
"23",
|
||||
"-pix_fmt",
|
||||
"yuv420p",
|
||||
"-an",
|
||||
str(out_path),
|
||||
]
|
||||
try:
|
||||
subprocess.run(cmd, check=True, timeout=300)
|
||||
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
|
||||
return None
|
||||
return out_path if out_path.exists() and out_path.stat().st_size > 0 else None
|
||||
|
||||
def _decode(self, episode_index: int, timestamps: list[float], camera_key: str) -> list[Any]:
|
||||
ep = self._meta.episodes[episode_index]
|
||||
from_timestamp = ep[f"videos/{camera_key}/from_timestamp"]
|
||||
shifted = [from_timestamp + ts for ts in timestamps]
|
||||
@@ -197,25 +279,25 @@ class VideoFrameProvider:
|
||||
# Module-3-no-op (every prompt skipped because frames_at returned
|
||||
# []) is debuggable from the job log instead of post-hoc parquet
|
||||
# inspection. Subsequent failures stay quiet.
|
||||
if not getattr(self, "_warned_decode_fail", False):
|
||||
with self._lock:
|
||||
already_warned = getattr(self, "_warned_decode_fail", False)
|
||||
if not already_warned:
|
||||
self._warned_decode_fail = True
|
||||
if not already_warned:
|
||||
import logging # noqa: PLC0415
|
||||
|
||||
logging.getLogger(__name__).warning(
|
||||
"VideoFrameProvider._decode failed for episode=%s camera=%s "
|
||||
"video_path=%s: %s",
|
||||
"VideoFrameProvider._decode failed for episode=%s camera=%s video_path=%s: %s",
|
||||
episode_index,
|
||||
camera_key,
|
||||
video_path,
|
||||
exc,
|
||||
exc_info=True,
|
||||
)
|
||||
self._warned_decode_fail = True
|
||||
return []
|
||||
|
||||
|
||||
def _decode_pyav_direct(
|
||||
video_path: Any, timestamps: list[float], tolerance_s: float
|
||||
) -> list[Any]:
|
||||
def _decode_pyav_direct(video_path: Any, timestamps: list[float], tolerance_s: float) -> list[Any]:
|
||||
"""Decode the requested timestamps from ``video_path`` using PyAV directly.
|
||||
|
||||
Bypasses ``lerobot.datasets.video_utils.decode_video_frames`` entirely
|
||||
@@ -231,7 +313,6 @@ def _decode_pyav_direct(
|
||||
the previous behaviour); callers filter ``None``/missing entries.
|
||||
"""
|
||||
import av # noqa: PLC0415
|
||||
from PIL import Image # noqa: PLC0415
|
||||
|
||||
if not timestamps:
|
||||
return []
|
||||
@@ -243,10 +324,7 @@ def _decode_pyav_direct(
|
||||
try:
|
||||
stream = container.streams.video[0]
|
||||
# PyAV needs the seek target in stream timebase ticks.
|
||||
if stream.time_base is None:
|
||||
seek_pts = 0
|
||||
else:
|
||||
seek_pts = int(seek_to / float(stream.time_base))
|
||||
seek_pts = 0 if stream.time_base is None else int(seek_to / float(stream.time_base))
|
||||
try:
|
||||
container.seek(seek_pts, any_frame=False, backward=True, stream=stream)
|
||||
except av.AVError:
|
||||
@@ -276,33 +354,6 @@ def _decode_pyav_direct(
|
||||
|
||||
return [results[ts] for ts in timestamps if ts in results]
|
||||
|
||||
def video_for_episode(
|
||||
self,
|
||||
record: EpisodeRecord,
|
||||
max_frames: int,
|
||||
camera_key: str | None = None,
|
||||
) -> list[Any]:
|
||||
"""Return up to ``max_frames`` images uniformly sampled across the episode.
|
||||
|
||||
The whole episode duration is covered; the model picks subtask
|
||||
boundaries from the temporal pooling it does internally.
|
||||
"""
|
||||
target = camera_key if camera_key is not None else self.camera_key
|
||||
if max_frames <= 0 or target is None or not record.frame_timestamps:
|
||||
return []
|
||||
n_frames = min(max_frames, len(record.frame_timestamps))
|
||||
if n_frames == len(record.frame_timestamps):
|
||||
timestamps = list(record.frame_timestamps)
|
||||
else:
|
||||
t0 = record.frame_timestamps[0]
|
||||
t_last = record.frame_timestamps[-1]
|
||||
if t_last <= t0:
|
||||
timestamps = [float(t0)] * n_frames
|
||||
else:
|
||||
step = (t_last - t0) / (n_frames - 1) if n_frames > 1 else 0.0
|
||||
timestamps = [float(t0 + i * step) for i in range(n_frames)]
|
||||
return self.frames_at(record, timestamps, camera_key=target)
|
||||
|
||||
|
||||
def make_frame_provider(root: Path, camera_key: str | None = None) -> FrameProvider:
|
||||
"""Build a :class:`VideoFrameProvider` if videos are present, else null."""
|
||||
@@ -341,60 +392,3 @@ def to_video_url_block(url: str | None, fps: float = 2.0) -> list[dict[str, Any]
|
||||
if not url:
|
||||
return []
|
||||
return [{"type": "video_url", "video_url": {"url": url}, "fps": fps}]
|
||||
|
||||
|
||||
def episode_clip_path(
|
||||
record: EpisodeRecord,
|
||||
provider: "VideoFrameProvider",
|
||||
cache_dir: Path,
|
||||
) -> Path | None:
|
||||
"""Extract the episode's subclip to ``cache_dir/ep_{idx:06d}.mp4``.
|
||||
|
||||
Returns ``None`` if the dataset has no video tracks. Skips re-extract
|
||||
when the cached clip already exists. Re-encodes to H.264
|
||||
(libx264) so the resulting mp4 is decodable by every downstream
|
||||
video processor — stream-copy would inherit the source codec
|
||||
(often AV1 in modern LeRobot datasets), which vllm's libav build
|
||||
cannot decode.
|
||||
"""
|
||||
import subprocess # noqa: PLC0415
|
||||
|
||||
if provider.camera_key is None:
|
||||
return None
|
||||
cache_dir.mkdir(parents=True, exist_ok=True)
|
||||
out_path = cache_dir / f"ep_{record.episode_index:06d}.mp4"
|
||||
if out_path.exists() and out_path.stat().st_size > 0:
|
||||
return out_path
|
||||
ep = provider._meta.episodes[record.episode_index]
|
||||
from_timestamp = float(ep[f"videos/{provider.camera_key}/from_timestamp"])
|
||||
to_timestamp = float(ep[f"videos/{provider.camera_key}/to_timestamp"])
|
||||
src = provider.root / provider._meta.get_video_file_path(
|
||||
record.episode_index, provider.camera_key
|
||||
)
|
||||
cmd = [
|
||||
"ffmpeg",
|
||||
"-y",
|
||||
"-loglevel",
|
||||
"error",
|
||||
"-ss",
|
||||
f"{from_timestamp:.3f}",
|
||||
"-to",
|
||||
f"{to_timestamp:.3f}",
|
||||
"-i",
|
||||
str(src),
|
||||
"-c:v",
|
||||
"libx264",
|
||||
"-preset",
|
||||
"ultrafast",
|
||||
"-crf",
|
||||
"23",
|
||||
"-pix_fmt",
|
||||
"yuv420p",
|
||||
"-an",
|
||||
str(out_path),
|
||||
]
|
||||
try:
|
||||
subprocess.run(cmd, check=True, timeout=300)
|
||||
except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
|
||||
return None
|
||||
return out_path if out_path.exists() and out_path.stat().st_size > 0 else None
|
||||
|
||||
Reference in New Issue
Block a user