#!/usr/bin/env python

# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""PyAV-based compatibility checks for :class:`VideoEncoderConfig`.

Centralises all :mod:`av` introspection of the bundled FFmpeg build. Checks
degrade to a no-op when the target codec isn't available locally.
"""

from __future__ import annotations

import functools
import logging
from typing import TYPE_CHECKING, Any, Literal

import av
import numpy as np
import torch

from lerobot.datasets.depth_utils import (
    DEFAULT_DEPTH_MAX,
    DEFAULT_DEPTH_MIN,
    DEFAULT_DEPTH_SHIFT,
    DEFAULT_DEPTH_USE_LOG,
    dequantize_depth,
    quantize_depth,
)

if TYPE_CHECKING:
    from lerobot.datasets.video_utils import VideoEncoderConfig

logger = logging.getLogger(__name__)

# Pixel formats supported by the depth encode/decode helpers below. Both are
# 16-bit-word formats that carry 12 significant bits per sample, matching the
# ``DEPTH_QMAX = 4095`` quantization range.
DEPTH_PIX_FMTS: tuple[str, ...] = ("yuv420p12le", "gray12le")

# Neutral chroma for 12-bit YUV (the midpoint of [0, 4095]). Filling the U/V
# planes with this value keeps the encoder from spending bits on chroma noise
# when only the Y plane carries information.
_NEUTRAL_CHROMA_12BIT: int = 2048

# Option type names (compared against ``opt.type.name``) that
# ``_check_option_value`` validates as numbers, and the subset of those that
# must additionally hold an integral value.
FFMPEG_NUMERIC_OPTION_TYPES = ("INT", "INT64", "UINT64", "FLOAT", "DOUBLE")
FFMPEG_INTEGER_OPTION_TYPES = ("INT", "INT64", "UINT64")


def _write_u16_plane(
    plane: av.video.plane.VideoPlane,
    src: np.ndarray,
    fill_value: int | None = None,
) -> None:
    """Copy ``src`` into a uint16 plane respecting FFmpeg line padding.

    Args:
        plane: Destination plane, viewed as a flat buffer of uint16 words.
        src: 2D source array; its shape gives the number of rows and the
            number of leading columns written on each row.
        fill_value: When not ``None``, every word of the plane (padding
            included) is set to this value before ``src`` is copied in.
    """
    height, width = src.shape
    # ``line_size`` is divided by the uint16 item size, i.e. it is treated as a
    # byte count; the resulting row stride may be wider than ``width``.
    stride_u16 = plane.line_size // np.dtype(np.uint16).itemsize
    dst = np.frombuffer(plane, dtype=np.uint16).reshape(height, stride_u16)
    if fill_value is not None:
        dst.fill(fill_value)
    dst[:, :width] = src


def encode_depth_frame_pyav(
    depth: np.ndarray | torch.Tensor,
    *,
    pix_fmt: str = "yuv420p12le",
    depth_min: float = DEFAULT_DEPTH_MIN,
    depth_max: float = DEFAULT_DEPTH_MAX,
    shift: float = DEFAULT_DEPTH_SHIFT,
    use_log: bool = DEFAULT_DEPTH_USE_LOG,
    input_unit: Literal["auto", "m", "mm"] = "auto",
) -> av.VideoFrame:
    """Quantize depth and pack it into a 12-bit PyAV video frame.

    Args:
        depth: Depth frame to encode (H, W). Unit handling follows
            :func:`lerobot.datasets.depth_utils.quantize_depth`.
        pix_fmt: Target pixel format. Must be one of :data:`DEPTH_PIX_FMTS`.
        depth_min, depth_max, shift, use_log, input_unit: Forwarded to
            :func:`quantize_depth`.

    Returns:
        An :class:`av.VideoFrame` in ``pix_fmt`` with quantized depth in the
        luminance plane.

    Raises:
        ValueError: If ``pix_fmt`` is not in :data:`DEPTH_PIX_FMTS`, if the
            quantized depth is not 2D, or if ``pix_fmt`` is ``"yuv420p12le"``
            and either dimension is odd.
    """
    if pix_fmt not in DEPTH_PIX_FMTS:
        raise ValueError(f"Unsupported depth pix_fmt={pix_fmt!r}; expected one of {DEPTH_PIX_FMTS}")

    quantized_depth = quantize_depth(
        depth,
        depth_min=depth_min,
        depth_max=depth_max,
        shift=shift,
        use_log=use_log,
        input_unit=input_unit,
    )
    if quantized_depth.ndim != 2:
        raise ValueError(f"depth must be a 2D frame; got shape {quantized_depth.shape}")

    quantized_depth = np.ascontiguousarray(quantized_depth, dtype=np.uint16)
    height, width = quantized_depth.shape

    if pix_fmt == "gray12le":
        # Single-plane format: the quantized depth is the whole frame.
        frame = av.VideoFrame(width=width, height=height, format="gray12le")
        _write_u16_plane(frame.planes[0], quantized_depth)
        return frame

    # The chroma planes below are sized (H // 2, W // 2), so odd dimensions
    # are rejected up front.
    if height % 2 != 0 or width % 2 != 0:
        raise ValueError("yuv420p12le requires even H and W")

    frame = av.VideoFrame(width=width, height=height, format="yuv420p12le")
    _write_u16_plane(frame.planes[0], quantized_depth)
    neutral_chroma = np.full((height // 2, width // 2), _NEUTRAL_CHROMA_12BIT, dtype=np.uint16)
    # ``fill_value`` also sets the line padding of the chroma planes.
    _write_u16_plane(frame.planes[1], neutral_chroma, fill_value=_NEUTRAL_CHROMA_12BIT)
    _write_u16_plane(frame.planes[2], neutral_chroma, fill_value=_NEUTRAL_CHROMA_12BIT)
    return frame


def decode_depth_frame_pyav(
    frame: av.VideoFrame | list[av.VideoFrame],
    *,
    depth_min: float = DEFAULT_DEPTH_MIN,
    depth_max: float = DEFAULT_DEPTH_MAX,
    shift: float = DEFAULT_DEPTH_SHIFT,
    use_log: bool = DEFAULT_DEPTH_USE_LOG,
    return_quantized: bool = False,
    output_unit: Literal["m", "mm"] = "m",
) -> np.ndarray:
    """Decode one or many depth video frames to quantized or metric depth.

    Args:
        frame: A single depth frame or a list (or tuple) of depth frames.
        depth_min, depth_max, shift, use_log: Forwarded to
            :func:`dequantize_depth`.
        return_quantized: If ``True``, return raw 12-bit quanta as ``uint16``.
        output_unit: Unit for dequantized output (``"m"`` or ``"mm"``).

    Returns:
        ``(H, W)`` array for a single frame, or ``(N, H, W)`` for a list.
        A one-element list is returned as ``(H, W)`` as well.

    Raises:
        ValueError: If ``frame`` is an empty sequence.
    """
    frames = list(frame) if isinstance(frame, (list, tuple)) else [frame]
    if not frames:
        # ``np.stack`` would raise a ValueError here too, but with a message
        # that says nothing about the argument at fault.
        raise ValueError("frame must be a video frame or a non-empty list of video frames")

    # Every frame is converted to gray12le before being read out as an array.
    quantized = np.stack([f.reformat(format="gray12le").to_ndarray() for f in frames]).astype(
        np.uint16, copy=False
    )
    if return_quantized:
        return quantized[0] if len(frames) == 1 else quantized

    decoded = dequantize_depth(
        quantized,
        depth_min=depth_min,
        depth_max=depth_max,
        shift=shift,
        use_log=use_log,
        output_unit=output_unit,
    )
    return decoded[0] if len(frames) == 1 else decoded


@functools.cache
def get_codec(vcodec: str) -> av.codec.Codec | None:
    """PyAV write-mode ``Codec`` for *vcodec*, or ``None`` if unavailable.

    Results (including ``None``) are cached per codec name.
    """
    try:
        return av.codec.Codec(vcodec, "w")
    except Exception:
        # Deliberate best-effort probe: any failure to construct the codec is
        # reported to callers as "not available".
        return None


@functools.cache
def _get_codec_options_by_name(vcodec: str) -> dict[str, av.option.Option]:
    """Private-option name → PyAV ``Option`` for *vcodec* (empty if unavailable)."""
    codec = get_codec(vcodec)
    if codec is None:
        return {}
    return {opt.name: opt for opt in codec.descriptor.options}


@functools.cache
def _get_codec_video_formats(vcodec: str) -> tuple[str, ...]:
    """Pixel formats accepted by *vcodec* in PyAV's preferred order (empty if unknown)."""
    codec = get_codec(vcodec)
    if codec is None:
        return ()
    return tuple(fmt.name for fmt in (codec.video_formats or []))


def detect_available_encoders_pyav(encoders: list[str] | str) -> list[str]:
    """Return the subset of *encoders* available as video encoders in the local FFmpeg build.

    Each name is probed directly via :func:`get_codec`; input order is preserved.
    A single string is treated as a one-element list.
    """
    if isinstance(encoders, str):
        encoders = [encoders]

    available: list[str] = []
    for name in encoders:
        codec = get_codec(name)
        if codec is not None and codec.type == "video":
            available.append(name)
        else:
            logger.debug("encoder '%s' not available as video encoder", name)
    return available


def _check_option_value(vcodec: str, label: str, value: Any, opt: av.option.Option) -> None:
    """Range-check numeric *value* and choice-check string *value* against *opt*.

    Options whose type is neither numeric (:data:`FFMPEG_NUMERIC_OPTION_TYPES`)
    nor ``STRING`` are accepted without any check.

    Args:
        vcodec: Codec name, used only in error messages.
        label: How the option is referred to in error messages.
        value: Candidate option value.
        opt: PyAV option describing the expected type, range and choices.

    Raises:
        ValueError: If *value* has the wrong type, is a non-integral number for
            an integer option, falls outside the option's range, or is not one
            of the option's named choices.
    """
    type_name = opt.type.name

    if type_name in FFMPEG_NUMERIC_OPTION_TYPES:
        not_numeric = f"{label}={value!r} is not numeric; codec {vcodec!r} expects a number for this option."
        # ``bool`` is a subclass of ``int``; reject it before the numeric test.
        if isinstance(value, bool):
            raise ValueError(not_numeric)
        if isinstance(value, str):
            try:
                num_val = float(value)
            except ValueError as e:
                raise ValueError(not_numeric) from e
        elif isinstance(value, (float, int)):
            num_val = value
        else:
            raise ValueError(not_numeric)

        # Check integer type compatibility. Plain ints are integral by
        # definition; ``int.is_integer`` only exists on Python >= 3.12, so it
        # is only called on floats.
        is_integral = isinstance(num_val, int) or num_val.is_integer()
        if type_name in FFMPEG_INTEGER_OPTION_TYPES and not is_integral:
            raise ValueError(
                f"{label}={num_val!r} must be an integer for codec {vcodec!r} "
                f"(FFmpeg option {opt.name!r} is {type_name}); float values are not allowed."
            )

        # Check numeric range compatibility. The range is only enforced when
        # the option reports ``min < max``.
        lo, hi = float(opt.min), float(opt.max)
        if lo < hi and not (lo <= num_val <= hi):
            raise ValueError(
                f"{label}={num_val} is out of range for codec {vcodec!r}; must be in [{lo}, {hi}]"
            )
        return

    if type_name == "STRING":
        if isinstance(value, bool):
            raise ValueError(f"{label}={value!r} is not a valid string value for codec {vcodec!r}.")
        if isinstance(value, str):
            str_val = value
        elif isinstance(value, (int, float)):
            str_val = str(value)
        else:
            raise ValueError(f"{label}={value!r} has unsupported type for STRING option on codec {vcodec!r}")

        # Check string choice compatibility. Options without named choices
        # accept any string.
        choices = [c.name for c in (opt.choices or [])]
        if choices and str_val not in choices:
            raise ValueError(
                f"{label}={str_val!r} is not a supported choice for codec "
                f"{vcodec!r}; valid choices: {choices}"
            )


def _check_pixel_format(vcodec: str, pix_fmt: str) -> None:
    """Raise ``ValueError`` if *pix_fmt* is not among the formats *vcodec* reports.

    No check is made when the codec reports no pixel formats.
    """
    formats = _get_codec_video_formats(vcodec)
    if formats and pix_fmt not in formats:
        raise ValueError(
            f"pix_fmt={pix_fmt!r} is not supported by codec {vcodec!r}; "
            f"supported pixel formats: {list(formats)}"
        )


def _check_codec_options(vcodec: str, codec_options: dict[str, Any], config: VideoEncoderConfig) -> None:
    """Validate merged encoder options (typed) against the codec's published AVOptions.

    Keys that the codec does not publish as an option are skipped. Keys that
    also appear in ``config.extra_options`` are labelled as such in errors.

    Raises:
        ValueError: On the first option whose value is incompatible.
    """
    supported_options = _get_codec_options_by_name(vcodec)

    for key, value in codec_options.items():
        # GOP size is not a codec-specific option, it has to be validated separately.
        if key == "g":
            if isinstance(value, bool) or not isinstance(value, int) or value < 1:
                raise ValueError(f"g={value!r} must be a positive integer for codec {vcodec!r}")
            continue

        if key not in supported_options:
            continue

        opt = supported_options[key]
        label = f"extra_options[{key!r}]" if key in config.extra_options else key
        _check_option_value(vcodec, label, value, opt)


def check_video_encoder_config_pyav(config: VideoEncoderConfig) -> None:
    """Verify *config* is compatible with the bundled FFmpeg build.

    Checks pixel format, abstract tuning-field compatibility, and each merged
    encoder option from
    :meth:`~lerobot.datasets.video_utils.VideoEncoderConfig.get_codec_options`
    against PyAV (including numeric ``extra_options`` present in that dict).
    No-op when ``config.vcodec`` isn't in the local FFmpeg build; a warning is
    logged in that case.

    Raises:
        ValueError: on the first incompatibility encountered.
    """
    vcodec = config.vcodec
    # Probe the codec itself: an available codec with no private options must
    # still have its pixel format and GOP size checked.
    if get_codec(vcodec) is None:
        logger.warning(
            "Codec %r is not available in the bundled FFmpeg build; skipping compatibility checks.",
            vcodec,
        )
        return

    _check_pixel_format(vcodec, config.pix_fmt)
    _check_codec_options(vcodec, config.get_codec_options(), config)