From a515eadc967f2f0ae093ee1f129e8a5d41ead93f Mon Sep 17 00:00:00 2001 From: Pepijn Date: Mon, 20 Apr 2026 21:31:17 +0200 Subject: [PATCH] refactor(profiling): consolidate into single module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Unify the profiling subsystem into one file per reviewer request. Before (4 files): src/lerobot/utils/profiling_utils.py 399 LOC scripts/ci/run_model_profiling.py 337 LOC profiling/model_profiling_specs.json 181 LOC tests/scripts/test_model_profiling.py 423 LOC After (2 files): src/lerobot/utils/model_profiling.py 758 LOC — TrainingProfiler + CI orchestrator + POLICY_SPECS (inline) tests/test_model_profiling.py 315 LOC Net: -267 LOC and 4 files → 2. All functionality preserved: per-step forward/backward/optimizer timings, torch profiler tables + chrome traces, deterministic-forward fingerprint, HF Hub result upload, and the same CLI surface. Changes: - Collapse `_StepTimingCollector` into inline attributes on `TrainingProfiler` (no separate class). - Drop `ProfilingSpec` dataclass; specs are plain dicts. - Inline the JSON matrix as a module-level `POLICY_SPECS` dict — one less file to keep in sync with the training args. - CI workflow invokes `python -m lerobot.utils.model_profiling` in place of the standalone script. - Tests import `lerobot.utils.model_profiling` directly instead of loading a script-by-path. Removed JSON schema tests that no longer apply. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/model_profiling.yml | 8 +- profiling/model_profiling_specs.json | 181 ------ scripts/ci/run_model_profiling.py | 337 ------------ src/lerobot/scripts/lerobot_train.py | 2 +- src/lerobot/utils/model_profiling.py | 763 ++++++++++++++++++++++++++ src/lerobot/utils/profiling_utils.py | 399 -------------- tests/scripts/test_model_profiling.py | 423 -------------- tests/test_model_profiling.py | 312 +++++++++++ 8 files changed, 1079 insertions(+), 1346 deletions(-) delete mode 100644 profiling/model_profiling_specs.json delete mode 100644 scripts/ci/run_model_profiling.py create mode 100644 src/lerobot/utils/model_profiling.py delete mode 100644 src/lerobot/utils/profiling_utils.py delete mode 100644 tests/scripts/test_model_profiling.py create mode 100644 tests/test_model_profiling.py diff --git a/.github/workflows/model_profiling.yml b/.github/workflows/model_profiling.yml index 80a6746d0..f148e02d7 100644 --- a/.github/workflows/model_profiling.yml +++ b/.github/workflows/model_profiling.yml @@ -23,12 +23,10 @@ on: - feat/libero-benchmark paths: - .github/workflows/model_profiling.yml - - profiling/model_profiling_specs.json - - scripts/ci/run_model_profiling.py - src/lerobot/configs/train.py - src/lerobot/scripts/lerobot_train.py - - src/lerobot/utils/profiling_utils.py - - tests/scripts/test_model_profiling.py + - src/lerobot/utils/model_profiling.py + - tests/test_model_profiling.py workflow_dispatch: inputs: git_ref: @@ -206,7 +204,7 @@ jobs: fi cmd=( - uv run python scripts/ci/run_model_profiling.py + uv run python -m lerobot.utils.model_profiling --output_dir=/workspace/profiling-results --hub_org=lerobot --results_repo="${RESULTS_REPO}" diff --git a/profiling/model_profiling_specs.json b/profiling/model_profiling_specs.json deleted file mode 100644 index 9353995df..000000000 --- a/profiling/model_profiling_specs.json +++ /dev/null @@ -1,181 +0,0 @@ -{ - "act": { - "steps": 12, - "train_args": [ - "--dataset.repo_id=lerobot/pusht", - "--dataset.episodes=[0]", - "--policy.type=act", - "--policy.device=cuda", - "--batch_size=4", - "--cudnn_deterministic=true" - ] - }, - "diffusion": { - "steps": 12, - "train_args": [ - "--dataset.repo_id=lerobot/pusht", - "--dataset.episodes=[0]", - "--policy.type=diffusion", - "--policy.device=cuda", - "--batch_size=4", - "--cudnn_deterministic=true" - ] - }, - "groot": { - "steps": 12, - "train_args": [ - "--dataset.repo_id=lerobot/libero_plus", - "--dataset.episodes=[0]", - "--policy.type=groot", - "--policy.base_model_path=nvidia/GR00T-N1.5-3B", - "--policy.tune_diffusion_model=true", - "--policy.tune_projector=true", - "--policy.tune_llm=false", - "--policy.tune_visual=false", - "--policy.use_bf16=true", - "--policy.device=cuda", - "--batch_size=1", - "--rename_map={\"observation.images.image\": \"observation.images.camera1\", \"observation.images.image2\": \"observation.images.camera2\"}" - ] - }, - "multi_task_dit": { - "steps": 12, - "train_args": [ - "--dataset.repo_id=lerobot/pusht", - "--dataset.episodes=[0]", - "--policy.type=multi_task_dit", - "--policy.device=cuda", - "--policy.horizon=32", - "--policy.n_action_steps=30", - "--batch_size=4", - "--cudnn_deterministic=true" - ] - }, - "pi0": { - "steps": 12, - "train_args": [ - "--dataset.repo_id=lerobot/libero_plus", - "--dataset.episodes=[0]", - "--policy.path=lerobot/pi0_base", - "--policy.device=cuda", - "--policy.dtype=bfloat16", - "--policy.n_action_steps=30", - "--policy.use_amp=true", - "--policy.gradient_checkpointing=true", - "--batch_size=1", - "--use_policy_training_preset=false", - "--optimizer.type=sgd", - "--optimizer.lr=1e-5", - "--optimizer.weight_decay=0", - "--optimizer.grad_clip_norm=1.0", - "--scheduler.type=cosine_decay_with_warmup", - "--scheduler.peak_lr=1e-5", - "--scheduler.decay_lr=1e-6", - "--scheduler.num_warmup_steps=0", - "--scheduler.num_decay_steps=12", - "--rename_map={\"observation.images.front\": \"observation.images.base_0_rgb\", \"observation.images.wrist\": \"observation.images.left_wrist_0_rgb\"}" - ] - }, - "pi0_fast": { - "steps": 12, - "train_args": [ - "--dataset.repo_id=lerobot/libero_plus", - "--dataset.episodes=[0]", - "--policy.path=lerobot/pi0fast-base", - "--policy.device=cuda", - "--policy.dtype=bfloat16", - "--policy.n_action_steps=30", - "--policy.use_amp=true", - "--policy.gradient_checkpointing=true", - "--batch_size=1", - "--use_policy_training_preset=false", - "--optimizer.type=sgd", - "--optimizer.lr=1e-5", - "--optimizer.weight_decay=0", - "--optimizer.grad_clip_norm=1.0", - "--scheduler.type=cosine_decay_with_warmup", - "--scheduler.peak_lr=1e-5", - "--scheduler.decay_lr=1e-6", - "--scheduler.num_warmup_steps=0", - "--scheduler.num_decay_steps=12", - "--rename_map={\"observation.images.front\": \"observation.images.base_0_rgb\", \"observation.images.wrist\": \"observation.images.left_wrist_0_rgb\"}" - ] - }, - "pi05": { - "steps": 12, - "train_args": [ - "--dataset.repo_id=lerobot/libero_plus", - "--dataset.episodes=[0]", - "--policy.path=lerobot/pi05_base", - "--policy.device=cuda", - "--policy.dtype=bfloat16", - "--policy.n_action_steps=30", - "--policy.use_amp=true", - "--policy.gradient_checkpointing=true", - "--batch_size=1", - "--use_policy_training_preset=false", - "--optimizer.type=sgd", - "--optimizer.lr=1e-5", - "--optimizer.weight_decay=0", - "--optimizer.grad_clip_norm=1.0", - "--scheduler.type=cosine_decay_with_warmup", - "--scheduler.peak_lr=1e-5", - "--scheduler.decay_lr=1e-6", - "--scheduler.num_warmup_steps=0", - "--scheduler.num_decay_steps=12", - "--policy.normalization_mapping={\"ACTION\": \"MEAN_STD\", \"STATE\": \"MEAN_STD\", \"VISUAL\": \"IDENTITY\"}", - "--rename_map={\"observation.images.front\": \"observation.images.base_0_rgb\", \"observation.images.wrist\": \"observation.images.left_wrist_0_rgb\"}" - ] - }, - "smolvla": { - "steps": 12, - "train_args": [ - "--dataset.repo_id=lerobot/libero_plus", - "--dataset.episodes=[0]", - "--policy.path=lerobot/smolvla_base", - "--policy.load_vlm_weights=true", - "--policy.freeze_vision_encoder=false", - "--policy.train_expert_only=false", - "--policy.empty_cameras=1", - "--policy.device=cuda", - "--batch_size=1", - "--rename_map={\"observation.images.front\": \"observation.images.camera1\", \"observation.images.wrist\": \"observation.images.camera2\"}" - ] - }, - "wall_x": { - "steps": 12, - "train_args": [ - "--dataset.repo_id=lerobot/aloha_sim_insertion_human", - "--dataset.episodes=[0]", - "--policy.type=wall_x", - "--policy.pretrained_name_or_path=x-square-robot/wall-oss-flow", - "--policy.prediction_mode=diffusion", - "--policy.attn_implementation=eager", - "--policy.device=cuda", - "--batch_size=1", - "--use_policy_training_preset=false", - "--optimizer.type=sgd", - "--optimizer.lr=1e-5", - "--optimizer.weight_decay=0", - "--optimizer.grad_clip_norm=1.0", - "--scheduler.type=cosine_decay_with_warmup", - "--scheduler.peak_lr=1e-5", - "--scheduler.decay_lr=1e-6", - "--scheduler.num_warmup_steps=0", - "--scheduler.num_decay_steps=12" - ] - }, - "xvla": { - "steps": 12, - "train_args": [ - "--dataset.repo_id=lerobot/libero_plus", - "--dataset.episodes=[0]", - "--policy.path=lerobot/xvla-widowx", - "--policy.action_mode=auto", - "--policy.empty_cameras=1", - "--policy.device=cuda", - "--batch_size=1", - "--rename_map={\"observation.images.front\": \"observation.images.image\", \"observation.images.wrist\": \"observation.images.image2\"}" - ] - } -} diff --git a/scripts/ci/run_model_profiling.py b/scripts/ci/run_model_profiling.py deleted file mode 100644 index 0ae32db2a..000000000 --- a/scripts/ci/run_model_profiling.py +++ /dev/null @@ -1,337 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2026 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import argparse -import json -import re -import shutil -import subprocess -import time -from dataclasses import dataclass -from datetime import UTC, datetime -from pathlib import Path -from typing import Any - -from huggingface_hub import CommitOperationAdd, HfApi -from huggingface_hub.errors import HfHubHTTPError - - -@dataclass(frozen=True) -class ProfilingSpec: - name: str - steps: int - train_args: list[str] - - -@dataclass(frozen=True) -class UploadTarget: - local_path: Path - path_in_repo: str - - -@dataclass(frozen=True) -class UploadResult: - uploaded_paths: dict[str, str] - pr_url: str | None = None - - -def utc_timestamp_slug(now: datetime | None = None) -> str: - current = now or datetime.now(UTC) - return current.strftime("%Y%m%dT%H%M%SZ") - - -def make_hub_file_url( - repo_id: str, - path_in_repo: str, - repo_type: str = "dataset", - revision: str = "main", -) -> str: - prefix = "datasets/" if repo_type == "dataset" else "" - return f"https://huggingface.co/{prefix}{repo_id}/resolve/{revision}/{path_in_repo}" - - -def parse_discussion_num(pr_url: str | None) -> int | None: - if not pr_url: - return None - match = re.search(r"/discussions/(\d+)$", pr_url) - return int(match.group(1)) if match else None - - -def upload_targets( - repo_id: str, - targets: list[UploadTarget], - *, - repo_type: str = "dataset", - token: str | None = None, - commit_message: str | None = None, - create_pr: bool = False, -) -> UploadResult: - api = HfApi(token=token) - operations = [ - CommitOperationAdd(path_in_repo=target.path_in_repo, path_or_fileobj=str(target.local_path)) - for target in targets - ] - commit = api.create_commit( - repo_id=repo_id, - repo_type=repo_type, - operations=operations, - commit_message=commit_message or f"Upload {len(targets)} profiling artifacts", - revision="main", - create_pr=create_pr, - ) - revision = "main" - pr_num = parse_discussion_num(commit.pr_url) - if create_pr and pr_num is not None: - revision = f"refs/pr/{pr_num}" - uploaded = { - target.path_in_repo: make_hub_file_url( - repo_id, target.path_in_repo, repo_type=repo_type, revision=revision - ) - for target in targets - } - return UploadResult(uploaded_paths=uploaded, pr_url=commit.pr_url) - - -def normalize_repo_id(repo: str, hub_org: str) -> str: - return repo if "/" in repo else f"{hub_org}/{repo}" - - -def load_specs(path: Path) -> dict[str, ProfilingSpec]: - payload = json.loads(path.read_text()) - return { - name: ProfilingSpec(name=name, steps=spec["steps"], train_args=spec["train_args"]) - for name, spec in payload.items() - } - - -def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument("--spec-file", type=Path, default=Path("profiling/model_profiling_specs.json")) - parser.add_argument("--policies", nargs="*", default=None) - parser.add_argument("--output_dir", type=Path, required=True) - parser.add_argument("--hub_org", default="lerobot") - parser.add_argument("--results_repo", default="model-profiling-history") - parser.add_argument("--publish", action="store_true") - parser.add_argument("--profile_mode", choices=["summary", "trace"], default="trace") - parser.add_argument("--git_commit", default="") - parser.add_argument("--git_ref", default="") - parser.add_argument("--pr_number", default="") - return parser.parse_args() - - -def get_selected_names(requested: list[str] | None, specs: dict[str, ProfilingSpec]) -> list[str]: - if not requested: - return list(specs) - unknown = sorted(set(requested) - set(specs)) - if unknown: - raise ValueError(f"Unknown profiling policies: {', '.join(unknown)}") - return requested - - -def build_train_command(spec: ProfilingSpec, run_dir: Path, profile_mode: str) -> list[str]: - train_output_dir = run_dir / "train" - profile_output_dir = run_dir / "profiling" - return [ - "uv", - "run", - "lerobot-train", - *spec.train_args, - f"--output_dir={train_output_dir}", - f"--steps={spec.steps}", - "--eval_freq=0", - "--save_checkpoint=false", - f"--save_freq={spec.steps}", - "--wandb.enable=false", - "--policy.push_to_hub=false", - "--num_workers=0", - "--log_freq=1", - f"--profile_mode={profile_mode}", - f"--profile_output_dir={profile_output_dir}", - ] - - -def load_json_if_exists(path: Path) -> dict[str, Any] | None: - if not path.exists(): - return None - return json.loads(path.read_text()) - - -def build_artifact_index( - *, - repo_id: str, - run_dir: Path, - policy_name: str, - run_id: str, -) -> tuple[dict[str, Any], dict[str, Any], list[UploadTarget], str]: - row_path_in_repo = f"rows/{policy_name}/{run_id}.json" - artifact_root = f"artifacts/{policy_name}/{run_id}" - artifact_paths: dict[str, Any] = { - "row": row_path_in_repo, - "profiling_files": {}, - "torch_tables": {}, - "trace_files": {}, - } - artifact_urls: dict[str, Any] = { - "row": make_hub_file_url(repo_id, row_path_in_repo), - "profiling_files": {}, - "torch_tables": {}, - "trace_files": {}, - } - targets: list[UploadTarget] = [] - - for name in ("stdout.txt", "stderr.txt"): - path = run_dir / name - if not path.exists(): - continue - repo_path = f"{artifact_root}/{name}" - artifact_paths[name.removesuffix(".txt")] = repo_path - artifact_urls[name.removesuffix(".txt")] = make_hub_file_url(repo_id, repo_path) - targets.append(UploadTarget(local_path=path, path_in_repo=repo_path)) - - profiling_dir = run_dir / "profiling" - for path in sorted(profiling_dir.rglob("*")) if profiling_dir.exists() else []: - if not path.is_file(): - continue - relative_path = str(path.relative_to(run_dir)) - repo_path = f"{artifact_root}/{relative_path}" - artifact_paths["profiling_files"][relative_path] = repo_path - artifact_urls["profiling_files"][relative_path] = make_hub_file_url(repo_id, repo_path) - targets.append(UploadTarget(local_path=path, path_in_repo=repo_path)) - - if path.name == "step_timing_summary.json": - artifact_paths["step_timing_summary"] = repo_path - artifact_urls["step_timing_summary"] = make_hub_file_url(repo_id, repo_path) - elif "torch_tables" in path.parts: - artifact_paths["torch_tables"][path.name] = repo_path - artifact_urls["torch_tables"][path.name] = make_hub_file_url(repo_id, repo_path) - elif "torch_traces" in path.parts: - artifact_paths["trace_files"][path.name] = repo_path - artifact_urls["trace_files"][path.name] = make_hub_file_url(repo_id, repo_path) - - return artifact_paths, artifact_urls, targets, row_path_in_repo - - -def upload_profile_run( - *, - repo_id: str, - row_path: Path, - row_path_in_repo: str, - artifact_targets: list[UploadTarget], - create_pr: bool = False, -) -> UploadResult: - return upload_targets( - repo_id=repo_id, - targets=[*artifact_targets, UploadTarget(local_path=row_path, path_in_repo=row_path_in_repo)], - repo_type="dataset", - commit_message=f"Add model profiling row {row_path_in_repo}", - create_pr=create_pr, - ) - - -def main() -> int: - args = parse_args() - specs = load_specs(args.spec_file) - selected = get_selected_names(args.policies, specs) - args.output_dir.mkdir(parents=True, exist_ok=True) - repo_id = normalize_repo_id(args.results_repo, args.hub_org) - git_executable = shutil.which("git") - if not git_executable: - raise RuntimeError("git executable not found in PATH") - git_commit = ( - args.git_commit or subprocess.check_output([git_executable, "rev-parse", "HEAD"], text=True).strip() - ) - pr_number = int(args.pr_number) if str(args.pr_number).strip() else None - - for policy_name in selected: - spec = specs[policy_name] - run_id = f"{utc_timestamp_slug()}__{policy_name}" - run_dir = args.output_dir / policy_name / run_id - run_dir.mkdir(parents=True, exist_ok=True) - cmd = build_train_command(spec, run_dir, args.profile_mode) - - start = time.perf_counter() - result = subprocess.run(cmd, capture_output=True, text=True) - duration_s = time.perf_counter() - start - - stdout_path = run_dir / "stdout.txt" - stderr_path = run_dir / "stderr.txt" - stdout_path.write_text(result.stdout) - stderr_path.write_text(result.stderr) - - profile_summary = load_json_if_exists(run_dir / "profiling" / "step_timing_summary.json") or {} - deterministic_forward = ( - load_json_if_exists(run_dir / "profiling" / "deterministic_forward.json") or {} - ) - artifact_paths, artifact_urls, artifact_targets, row_path_in_repo = build_artifact_index( - repo_id=repo_id, - run_dir=run_dir, - policy_name=policy_name, - run_id=run_id, - ) - row = { - "schema_version": 1, - "created_at": datetime.now(UTC).isoformat(), - "run_id": run_id, - "policy": policy_name, - "git_commit": git_commit, - "git_ref": args.git_ref or None, - "pr_number": pr_number, - "status": "success" if result.returncode == 0 else "failed", - "return_code": result.returncode, - "profile_mode": args.profile_mode, - "wall_time_s": duration_s, - "spec": { - "steps": spec.steps, - "train_args": spec.train_args, - }, - "step_timing_summary": profile_summary, - "deterministic_forward": deterministic_forward, - "artifact_paths": artifact_paths, - "artifact_urls": artifact_urls, - "stderr_tail": result.stderr.splitlines()[-20:], - } - - row_path = run_dir / "profiling_row.json" - row_path.write_text(json.dumps(row, indent=2, sort_keys=True)) - - if args.publish: - try: - upload_result = upload_profile_run( - repo_id=repo_id, - row_path=row_path, - row_path_in_repo=row_path_in_repo, - artifact_targets=artifact_targets, - create_pr=pr_number is not None, - ) - except HfHubHTTPError as exc: - row["publish_status"] = "failed" - row["publish_error"] = str(exc) - else: - row["publish_status"] = "success" - row["uploaded_paths"] = upload_result.uploaded_paths - row["publish_pr_url"] = upload_result.pr_url - row["publish_pr_number"] = parse_discussion_num(upload_result.pr_url) - row_path.write_text(json.dumps(row, indent=2, sort_keys=True)) - - print(json.dumps(row, indent=2, sort_keys=True)) - - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/src/lerobot/scripts/lerobot_train.py b/src/lerobot/scripts/lerobot_train.py index d510f603b..8d1b10430 100644 --- a/src/lerobot/scripts/lerobot_train.py +++ b/src/lerobot/scripts/lerobot_train.py @@ -49,7 +49,7 @@ from lerobot.optim.factory import make_optimizer_and_scheduler from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors from lerobot.utils.import_utils import register_third_party_plugins from lerobot.utils.logging_utils import AverageMeter, MetricsTracker -from lerobot.utils.profiling_utils import TrainingProfiler +from lerobot.utils.model_profiling import TrainingProfiler from lerobot.utils.random_utils import set_seed from lerobot.utils.utils import ( cycle, diff --git a/src/lerobot/utils/model_profiling.py b/src/lerobot/utils/model_profiling.py new file mode 100644 index 000000000..885a2ecf0 --- /dev/null +++ b/src/lerobot/utils/model_profiling.py @@ -0,0 +1,763 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +"""Model profiling — single-file entry point. + +Contains three things that used to live in three separate files: + +* `TrainingProfiler` — hooks the training loop. Captures per-step + forward/backward/optimizer timings, the torch profiler output, and a + deterministic-forward fingerprint for regression detection. +* `POLICY_SPECS` — CI matrix of `policy_name → (steps, train_args)`. + Inline so there is no separate JSON to keep in sync. +* `main()` — CI orchestrator. For each selected policy, spawns a + `lerobot-train` subprocess with profiling enabled, collects the + artifacts, and (optionally) publishes a row to a HF Hub dataset. + +Usage (CI): + + python -m lerobot.utils.model_profiling \ + --output_dir=./profiling-results \ + --policies act diffusion \ + --profile_mode=trace \ + --publish +""" + +from __future__ import annotations + +import argparse +import contextlib +import hashlib +import json +import logging +import re +import shutil +import statistics +import subprocess +import time +from collections.abc import Iterator +from contextlib import contextmanager +from dataclasses import dataclass +from datetime import UTC, datetime +from numbers import Real +from pathlib import Path +from typing import Any + +import torch +from huggingface_hub import CommitOperationAdd, HfApi +from huggingface_hub.errors import HfHubHTTPError +from torch.utils.data._utils.collate import default_collate + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Policy matrix. Same shape as the former JSON file; inlined so the source +# tree has one less file to keep in sync with the training args. +# --------------------------------------------------------------------------- + +_LIBERO_RENAME_BASE_RGB = ( + '--rename_map={"observation.images.front": "observation.images.base_0_rgb", ' + '"observation.images.wrist": "observation.images.left_wrist_0_rgb"}' +) +_LIBERO_RENAME_CAMERAS = ( + '--rename_map={"observation.images.front": "observation.images.camera1", ' + '"observation.images.wrist": "observation.images.camera2"}' +) +_PI_SGD = [ + "--use_policy_training_preset=false", + "--optimizer.type=sgd", + "--optimizer.lr=1e-5", + "--optimizer.weight_decay=0", + "--optimizer.grad_clip_norm=1.0", + "--scheduler.type=cosine_decay_with_warmup", + "--scheduler.peak_lr=1e-5", + "--scheduler.decay_lr=1e-6", + "--scheduler.num_warmup_steps=0", + "--scheduler.num_decay_steps=12", +] + + +POLICY_SPECS: dict[str, dict[str, Any]] = { + "act": { + "steps": 12, + "train_args": [ + "--dataset.repo_id=lerobot/pusht", + "--dataset.episodes=[0]", + "--policy.type=act", + "--policy.device=cuda", + "--batch_size=4", + "--cudnn_deterministic=true", + ], + }, + "diffusion": { + "steps": 12, + "train_args": [ + "--dataset.repo_id=lerobot/pusht", + "--dataset.episodes=[0]", + "--policy.type=diffusion", + "--policy.device=cuda", + "--batch_size=4", + "--cudnn_deterministic=true", + ], + }, + "groot": { + "steps": 12, + "train_args": [ + "--dataset.repo_id=lerobot/libero_plus", + "--dataset.episodes=[0]", + "--policy.type=groot", + "--policy.base_model_path=nvidia/GR00T-N1.5-3B", + "--policy.tune_diffusion_model=true", + "--policy.tune_projector=true", + "--policy.tune_llm=false", + "--policy.tune_visual=false", + "--policy.use_bf16=true", + "--policy.device=cuda", + "--batch_size=1", + '--rename_map={"observation.images.image": "observation.images.camera1", ' + '"observation.images.image2": "observation.images.camera2"}', + ], + }, + "multi_task_dit": { + "steps": 12, + "train_args": [ + "--dataset.repo_id=lerobot/pusht", + "--dataset.episodes=[0]", + "--policy.type=multi_task_dit", + "--policy.device=cuda", + "--policy.horizon=32", + "--policy.n_action_steps=30", + "--batch_size=4", + "--cudnn_deterministic=true", + ], + }, + "pi0": { + "steps": 12, + "train_args": [ + "--dataset.repo_id=lerobot/libero_plus", + "--dataset.episodes=[0]", + "--policy.path=lerobot/pi0_base", + "--policy.device=cuda", + "--policy.dtype=bfloat16", + "--policy.n_action_steps=30", + "--policy.use_amp=true", + "--policy.gradient_checkpointing=true", + "--batch_size=1", + *_PI_SGD, + _LIBERO_RENAME_BASE_RGB, + ], + }, + "pi0_fast": { + "steps": 12, + "train_args": [ + "--dataset.repo_id=lerobot/libero_plus", + "--dataset.episodes=[0]", + "--policy.path=lerobot/pi0fast-base", + "--policy.device=cuda", + "--policy.dtype=bfloat16", + "--policy.n_action_steps=30", + "--policy.use_amp=true", + "--policy.gradient_checkpointing=true", + "--batch_size=1", + *_PI_SGD, + _LIBERO_RENAME_BASE_RGB, + ], + }, + "pi05": { + "steps": 12, + "train_args": [ + "--dataset.repo_id=lerobot/libero_plus", + "--dataset.episodes=[0]", + "--policy.path=lerobot/pi05_base", + "--policy.device=cuda", + "--policy.dtype=bfloat16", + "--policy.n_action_steps=30", + "--policy.use_amp=true", + "--policy.gradient_checkpointing=true", + "--batch_size=1", + *_PI_SGD, + '--policy.normalization_mapping={"ACTION": "MEAN_STD", ' + '"STATE": "MEAN_STD", "VISUAL": "IDENTITY"}', + _LIBERO_RENAME_BASE_RGB, + ], + }, + "smolvla": { + "steps": 12, + "train_args": [ + "--dataset.repo_id=lerobot/libero_plus", + "--dataset.episodes=[0]", + "--policy.path=lerobot/smolvla_base", + "--policy.load_vlm_weights=true", + "--policy.freeze_vision_encoder=false", + "--policy.train_expert_only=false", + "--policy.empty_cameras=1", + "--policy.device=cuda", + "--batch_size=1", + _LIBERO_RENAME_CAMERAS, + ], + }, + "wall_x": { + "steps": 12, + "train_args": [ + "--dataset.repo_id=lerobot/aloha_sim_insertion_human", + "--dataset.episodes=[0]", + "--policy.type=wall_x", + "--policy.pretrained_name_or_path=x-square-robot/wall-oss-flow", + "--policy.prediction_mode=diffusion", + "--policy.attn_implementation=eager", + "--policy.device=cuda", + "--batch_size=1", + *_PI_SGD, + ], + }, + "xvla": { + "steps": 12, + "train_args": [ + "--dataset.repo_id=lerobot/libero_plus", + "--dataset.episodes=[0]", + "--policy.path=lerobot/xvla-widowx", + "--policy.action_mode=auto", + "--policy.empty_cameras=1", + "--policy.device=cuda", + "--batch_size=1", + '--rename_map={"observation.images.front": "observation.images.image", ' + '"observation.images.wrist": "observation.images.image2"}', + ], + }, +} + + +# --------------------------------------------------------------------------- +# TrainingProfiler — hooks the training loop. +# --------------------------------------------------------------------------- + + +def _stable_float(value: float | int | None) -> float | None: + return None if value is None else round(float(value), 8) + + +def _as_float(value: Any) -> float: + if isinstance(value, Real): + return float(value) + if hasattr(value, "val"): + return float(value.val) + raise TypeError(f"Expected a real-valued metric, got {type(value).__name__}") + + +def _summary(values: list[float]) -> dict[str, float | int | None]: + if not values: + return {"count": 0, "mean": None, "median": None, "min": None, "max": None} + return { + "count": len(values), + "mean": statistics.fmean(values), + "median": statistics.median(values), + "min": min(values), + "max": max(values), + } + + +def _tensor_signature(tensor: torch.Tensor) -> dict[str, Any]: + """Small, stable summary of a tensor so forward-pass outputs can be + compared across runs without bloating the regression JSON.""" + cpu = tensor.detach().cpu() + hash_tensor = cpu.float() if cpu.dtype == torch.bfloat16 else cpu + sig: dict[str, Any] = { + "shape": list(cpu.shape), + "dtype": str(cpu.dtype), + "numel": cpu.numel(), + "sha256": hashlib.sha256(hash_tensor.contiguous().numpy().tobytes()).hexdigest(), + } + if cpu.numel(): + promoted = cpu.to(torch.float64) if cpu.is_floating_point() else cpu.to(torch.int64) + sig["sum"] = _stable_float(promoted.sum().item()) + sig["mean"] = _stable_float(promoted.float().mean().item()) + return sig + + +def _summarize_value(value: Any) -> Any: + if isinstance(value, torch.Tensor): + return _tensor_signature(value) + if isinstance(value, dict): + return {k: _summarize_value(v) for k, v in value.items()} + if isinstance(value, (list, tuple)): + return [_summarize_value(v) for v in value] + if isinstance(value, (str, int, float, bool)) or value is None: + return value + return repr(value) + + +def _hash_payload(payload: Any) -> str: + return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest() + + +def _get_profiler_device_time_us(event: Any) -> float | None: + return _stable_float( + getattr(event, "self_device_time_total", getattr(event, "self_cuda_time_total", None)) + ) + + +def _write_profiler_table(profiler: Any, path: Path, *, sort_by: str, row_limit: int = 40) -> None: + # The profiler may not have recorded any events for this sort key when the + # schedule window lands outside the active steps — skip silently rather + # than crashing the whole artifact-writer pass. + with contextlib.suppress(Exception): + path.write_text(profiler.key_averages().table(sort_by=sort_by, row_limit=row_limit)) + + +def write_deterministic_forward_artifacts( + *, + policy: Any, + dataset: Any, + batch_size: int, + preprocessor: Any, + output_dir: Path, + device_type: str, +) -> None: + """Run a seed-controlled single forward pass and dump a stable fingerprint + (loss/output tensor hashes + op counts) for regression detection. Keeps + the caller-selected module mode so ACT-with-VAE-style policies that only + materialize their full forward outputs in `train()` still match.""" + if len(dataset) == 0: + raise ValueError("Cannot build a reference batch from an empty dataset.") + indices = [i % len(dataset) for i in range(batch_size)] + reference_batch = preprocessor(default_collate([dataset[i] for i in indices])) + + activities = [torch.profiler.ProfilerActivity.CPU] + if device_type == "cuda": + activities.append(torch.profiler.ProfilerActivity.CUDA) + + with torch.random.fork_rng(devices=[] if device_type != "cuda" else None): + torch.manual_seed(0) + if device_type == "cuda": + torch.cuda.manual_seed_all(0) + with torch.no_grad(), torch.profiler.profile(activities=activities) as prof: + loss, output_dict = policy.forward(reference_batch) + + operators = sorted( + ( + { + "key": e.key, + "count": e.count, + "cpu_time_total_us": _stable_float(getattr(e, "cpu_time_total", None)), + **( + {"self_cuda_time_total_us": _get_profiler_device_time_us(e)} + if device_type == "cuda" + else {} + ), + } + for e in prof.key_averages() + ), + key=lambda e: e["key"], + ) + outputs = {"loss": _summarize_value(loss), "output_dict": _summarize_value(output_dict)} + payload = { + "seed": 0, + "reference_batch_size": batch_size, + "operator_fingerprint": _hash_payload([(o["key"], o["count"]) for o in operators]), + "output_fingerprint": _hash_payload(outputs), + "operators": operators, + "outputs": outputs, + } + output_dir.mkdir(parents=True, exist_ok=True) + (output_dir / "deterministic_forward.json").write_text(json.dumps(payload, indent=2, sort_keys=True)) + sort_by = "self_cuda_time_total" if device_type == "cuda" else "cpu_time_total" + _write_profiler_table(prof, output_dir / "deterministic_forward_ops.txt", sort_by=sort_by) + + +class TrainingProfiler: + """Self-contained profiling hooks for the training loop. + + The training script interacts via ``start()``, ``section()``, ``step()``, + ``finalize()``, and (optionally) ``record_deterministic_forward()`` — a + ~7-line surface. + """ + + _SCHEDULE_WAIT = 1 + _SCHEDULE_WARMUP = 2 + _SCHEDULE_ACTIVE = 6 + + def __init__(self, mode: str, output_dir: Path, device: torch.device) -> None: + self._mode = mode + self._output_dir = output_dir + self._output_dir.mkdir(parents=True, exist_ok=True) + self._device = device + # Inline timing state — no separate collector class. + self._total_update_s: list[float] = [] + self._dataloading_s: list[float] = [] + self._section_s: dict[str, list[float]] = {} + self._memory: list[dict[str, int]] = [] + self._torch = self._build_torch_profiler() + logger.info("Profiling enabled. Artifacts will be written to %s", output_dir) + + def _build_torch_profiler(self) -> Any: + activities = [torch.profiler.ProfilerActivity.CPU] + if self._device.type == "cuda": + activities.append(torch.profiler.ProfilerActivity.CUDA) + trace_dir = self._output_dir / "torch_traces" + trace_dir.mkdir(parents=True, exist_ok=True) + + def _on_trace_ready(p: Any) -> None: + if self._mode == "trace": + p.export_chrome_trace(str(trace_dir / f"trace_step_{p.step_num}.json")) + + return torch.profiler.profile( + activities=activities, + schedule=torch.profiler.schedule( + wait=self._SCHEDULE_WAIT, + warmup=self._SCHEDULE_WARMUP, + active=self._SCHEDULE_ACTIVE, + repeat=1, + ), + on_trace_ready=_on_trace_ready, + record_shapes=True, + profile_memory=True, + with_flops=True, + ) + + @classmethod + def from_cfg(cls, cfg: Any, device: torch.device) -> TrainingProfiler: + output = cfg.profile_output_dir or (Path(cfg.output_dir) / "profiling") + return cls(mode=cfg.profile_mode, output_dir=Path(output), device=device) + + def record_deterministic_forward( + self, *, policy: Any, dataset: Any, batch_size: int, preprocessor: Any + ) -> None: + logger.info("Recording deterministic forward-pass artifacts") + write_deterministic_forward_artifacts( + policy=policy, + dataset=dataset, + batch_size=batch_size, + preprocessor=preprocessor, + output_dir=self._output_dir, + device_type=self._device.type, + ) + if self._device.type == "cuda": + torch.cuda.empty_cache() + + def start(self) -> None: + if self._device.type == "cuda": + torch.cuda.reset_peak_memory_stats(self._device) + self._torch.__enter__() + + @contextmanager + def section(self, name: str) -> Iterator[None]: + """Time a region of the training step. Syncs on CUDA so the + duration reflects GPU work, not just kernel-launch latency.""" + if self._device.type == "cuda": + torch.cuda.synchronize(self._device) + t0 = time.perf_counter() + try: + yield + finally: + if self._device.type == "cuda": + torch.cuda.synchronize(self._device) + self._section_s.setdefault(name, []).append(time.perf_counter() - t0) + + def step(self, step_num: int, train_tracker: Any) -> None: + self._total_update_s.append(_as_float(train_tracker.update_s)) + self._dataloading_s.append(_as_float(train_tracker.dataloading_s)) + if self._device.type == "cuda": + self._memory.append( + { + "step": step_num, + "allocated_bytes": torch.cuda.memory_allocated(self._device), + "reserved_bytes": torch.cuda.memory_reserved(self._device), + } + ) + self._torch.step() + + def finalize(self) -> None: + self._torch.__exit__(None, None, None) + payload: dict[str, Any] = { + "profile_mode": self._mode, + "total_update_s": _summary(self._total_update_s), + "dataloading_s": _summary(self._dataloading_s), + "memory_timeline": self._memory, + } + for name, values in self._section_s.items(): + payload[f"{name}_s"] = _summary(values) + if self._device.type == "cuda": + payload["peak_memory_allocated_bytes"] = torch.cuda.max_memory_allocated(self._device) + payload["peak_memory_reserved_bytes"] = torch.cuda.max_memory_reserved(self._device) + (self._output_dir / "step_timing_summary.json").write_text( + json.dumps(payload, indent=2, sort_keys=True) + ) + + tables_dir = self._output_dir / "torch_tables" + tables_dir.mkdir(parents=True, exist_ok=True) + _write_profiler_table(self._torch, tables_dir / "cpu_time_total.txt", sort_by="cpu_time_total") + _write_profiler_table(self._torch, tables_dir / "cpu_memory.txt", sort_by="self_cpu_memory_usage") + _write_profiler_table(self._torch, tables_dir / "flops.txt", sort_by="flops") + if self._device.type == "cuda": + _write_profiler_table( + self._torch, tables_dir / "cuda_time_total.txt", sort_by="self_cuda_time_total" + ) + _write_profiler_table( + self._torch, tables_dir / "cuda_memory.txt", sort_by="self_cuda_memory_usage" + ) + + +# --------------------------------------------------------------------------- +# CI orchestrator. Spawns `lerobot-train` per policy, collects the +# artifacts, (optionally) uploads to the HF Hub results dataset. +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class UploadTarget: + local_path: Path + path_in_repo: str + + +@dataclass(frozen=True) +class UploadResult: + uploaded_paths: dict[str, str] + pr_url: str | None = None + + +def _utc_timestamp_slug(now: datetime | None = None) -> str: + return (now or datetime.now(UTC)).strftime("%Y%m%dT%H%M%SZ") + + +def _hub_file_url(repo_id: str, path_in_repo: str, *, revision: str = "main") -> str: + return f"https://huggingface.co/datasets/{repo_id}/resolve/{revision}/{path_in_repo}" + + +def parse_discussion_num(pr_url: str | None) -> int | None: + if not pr_url: + return None + m = re.search(r"/discussions/(\d+)$", pr_url) + return int(m.group(1)) if m else None + + +def upload_targets( + repo_id: str, + targets: list[UploadTarget], + *, + token: str | None = None, + commit_message: str | None = None, + create_pr: bool = False, +) -> UploadResult: + api = HfApi(token=token) + commit = api.create_commit( + repo_id=repo_id, + repo_type="dataset", + operations=[ + CommitOperationAdd(path_in_repo=t.path_in_repo, path_or_fileobj=str(t.local_path)) + for t in targets + ], + commit_message=commit_message or f"Upload {len(targets)} profiling artifacts", + revision="main", + create_pr=create_pr, + ) + pr_num = parse_discussion_num(commit.pr_url) + revision = f"refs/pr/{pr_num}" if (create_pr and pr_num) else "main" + return UploadResult( + uploaded_paths={ + t.path_in_repo: _hub_file_url(repo_id, t.path_in_repo, revision=revision) for t in targets + }, + pr_url=commit.pr_url, + ) + + +def build_train_command(policy: str, run_dir: Path, profile_mode: str) -> list[str]: + spec = POLICY_SPECS[policy] + return [ + "uv", + "run", + "lerobot-train", + *spec["train_args"], + f"--output_dir={run_dir / 'train'}", + f"--steps={spec['steps']}", + "--eval_freq=0", + "--save_checkpoint=false", + f"--save_freq={spec['steps']}", + "--wandb.enable=false", + "--policy.push_to_hub=false", + "--num_workers=0", + "--log_freq=1", + f"--profile_mode={profile_mode}", + f"--profile_output_dir={run_dir / 'profiling'}", + ] + + +def build_artifact_index( + *, repo_id: str, run_dir: Path, policy_name: str, run_id: str +) -> tuple[dict[str, Any], dict[str, Any], list[UploadTarget], str]: + """Scan the run directory and categorize files into + (stdout/stderr, torch_tables/*, torch_traces/*, everything else under profiling/). + Returns (paths, urls, upload targets, row path in repo).""" + row_path_in_repo = f"rows/{policy_name}/{run_id}.json" + root = f"artifacts/{policy_name}/{run_id}" + paths: dict[str, Any] = { + "row": row_path_in_repo, + "profiling_files": {}, + "torch_tables": {}, + "trace_files": {}, + } + urls: dict[str, Any] = { + "row": _hub_file_url(repo_id, row_path_in_repo), + "profiling_files": {}, + "torch_tables": {}, + "trace_files": {}, + } + targets: list[UploadTarget] = [] + + for name in ("stdout.txt", "stderr.txt"): + p = run_dir / name + if p.exists(): + key = name.removesuffix(".txt") + repo = f"{root}/{name}" + paths[key] = repo + urls[key] = _hub_file_url(repo_id, repo) + targets.append(UploadTarget(p, repo)) + + profiling_dir = run_dir / "profiling" + if profiling_dir.exists(): + for p in sorted(profiling_dir.rglob("*")): + if not p.is_file(): + continue + rel = str(p.relative_to(run_dir)) + repo = f"{root}/{rel}" + paths["profiling_files"][rel] = repo + urls["profiling_files"][rel] = _hub_file_url(repo_id, repo) + targets.append(UploadTarget(p, repo)) + if p.name == "step_timing_summary.json": + paths["step_timing_summary"] = repo + urls["step_timing_summary"] = _hub_file_url(repo_id, repo) + elif "torch_tables" in p.parts: + paths["torch_tables"][p.name] = repo + urls["torch_tables"][p.name] = _hub_file_url(repo_id, repo) + elif "torch_traces" in p.parts: + paths["trace_files"][p.name] = repo + urls["trace_files"][p.name] = _hub_file_url(repo_id, repo) + + return paths, urls, targets, row_path_in_repo + + +def upload_profile_run( + *, + repo_id: str, + row_path: Path, + row_path_in_repo: str, + artifact_targets: list[UploadTarget], + create_pr: bool = False, +) -> UploadResult: + return upload_targets( + repo_id=repo_id, + targets=[*artifact_targets, UploadTarget(row_path, row_path_in_repo)], + commit_message=f"Add model profiling row {row_path_in_repo}", + create_pr=create_pr, + ) + + +def _load_json(path: Path) -> dict[str, Any]: + return json.loads(path.read_text()) if path.exists() else {} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--policies", nargs="*", default=None) + parser.add_argument("--output_dir", type=Path, required=True) + parser.add_argument("--hub_org", default="lerobot") + parser.add_argument("--results_repo", default="model-profiling-history") + parser.add_argument("--publish", action="store_true") + parser.add_argument("--profile_mode", choices=["summary", "trace"], default="trace") + parser.add_argument("--git_commit", default="") + parser.add_argument("--git_ref", default="") + parser.add_argument("--pr_number", default="") + return parser.parse_args() + + +def main() -> int: + args = parse_args() + selected = args.policies or list(POLICY_SPECS) + unknown = sorted(set(selected) - set(POLICY_SPECS)) + if unknown: + raise ValueError(f"Unknown profiling policies: {', '.join(unknown)}") + + args.output_dir.mkdir(parents=True, exist_ok=True) + repo_id = args.results_repo if "/" in args.results_repo else f"{args.hub_org}/{args.results_repo}" + git_exe = shutil.which("git") or (_ for _ in ()).throw(RuntimeError("git not found in PATH")) + git_commit = args.git_commit or subprocess.check_output([git_exe, "rev-parse", "HEAD"], text=True).strip() + pr_number = int(args.pr_number) if str(args.pr_number).strip() else None + + for policy in selected: + run_id = f"{_utc_timestamp_slug()}__{policy}" + run_dir = args.output_dir / policy / run_id + run_dir.mkdir(parents=True, exist_ok=True) + cmd = build_train_command(policy, run_dir, args.profile_mode) + + t0 = time.perf_counter() + result = subprocess.run(cmd, capture_output=True, text=True) + wall_s = time.perf_counter() - t0 + + (run_dir / "stdout.txt").write_text(result.stdout) + (run_dir / "stderr.txt").write_text(result.stderr) + + paths, urls, upload_list, row_in_repo = build_artifact_index( + repo_id=repo_id, run_dir=run_dir, policy_name=policy, run_id=run_id + ) + row: dict[str, Any] = { + "schema_version": 1, + "created_at": datetime.now(UTC).isoformat(), + "run_id": run_id, + "policy": policy, + "git_commit": git_commit, + "git_ref": args.git_ref or None, + "pr_number": pr_number, + "status": "success" if result.returncode == 0 else "failed", + "return_code": result.returncode, + "profile_mode": args.profile_mode, + "wall_time_s": wall_s, + "spec": { + "steps": POLICY_SPECS[policy]["steps"], + "train_args": POLICY_SPECS[policy]["train_args"], + }, + "step_timing_summary": _load_json(run_dir / "profiling" / "step_timing_summary.json"), + "deterministic_forward": _load_json(run_dir / "profiling" / "deterministic_forward.json"), + "artifact_paths": paths, + "artifact_urls": urls, + "stderr_tail": result.stderr.splitlines()[-20:], + } + + row_path = run_dir / "profiling_row.json" + row_path.write_text(json.dumps(row, indent=2, sort_keys=True)) + + if args.publish: + try: + uploaded = upload_profile_run( + repo_id=repo_id, + row_path=row_path, + row_path_in_repo=row_in_repo, + artifact_targets=upload_list, + create_pr=pr_number is not None, + ) + except HfHubHTTPError as exc: + row.update({"publish_status": "failed", "publish_error": str(exc)}) + else: + row.update( + { + "publish_status": "success", + "uploaded_paths": uploaded.uploaded_paths, + "publish_pr_url": uploaded.pr_url, + "publish_pr_number": parse_discussion_num(uploaded.pr_url), + } + ) + row_path.write_text(json.dumps(row, indent=2, sort_keys=True)) + + print(json.dumps(row, indent=2, sort_keys=True)) + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/lerobot/utils/profiling_utils.py b/src/lerobot/utils/profiling_utils.py deleted file mode 100644 index 67e02b712..000000000 --- a/src/lerobot/utils/profiling_utils.py +++ /dev/null @@ -1,399 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2026 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import hashlib -import json -import logging -import statistics -import time -from collections.abc import Iterator -from contextlib import contextmanager -from dataclasses import dataclass, field -from numbers import Real -from pathlib import Path -from typing import Any - -import torch -from torch.utils.data._utils.collate import default_collate - - -def ensure_dir(path: Path) -> Path: - path.mkdir(parents=True, exist_ok=True) - return path - - -def write_profiler_table( - profiler: Any, - output_path: Path, - *, - sort_by: str, - row_limit: int = 40, -) -> None: - try: - table = profiler.key_averages().table(sort_by=sort_by, row_limit=row_limit) - except Exception: - return - output_path.write_text(table) - - -def _make_torch_profiler( - *, - mode: str, - output_dir: Path, - device_type: str, - wait_steps: int = 1, - warmup_steps: int = 2, - active_steps: int = 6, - repeat: int = 1, - record_shapes: bool = True, - with_memory: bool = True, - with_flops: bool = True, - with_stack: bool = False, -) -> Any: - activities = [torch.profiler.ProfilerActivity.CPU] - if device_type == "cuda": - activities.append(torch.profiler.ProfilerActivity.CUDA) - - trace_dir = ensure_dir(output_dir / "torch_traces") - - def _trace_ready(profiler: Any) -> None: - if mode != "trace": - return - profiler.export_chrome_trace(str(trace_dir / f"trace_step_{profiler.step_num}.json")) - - return torch.profiler.profile( - activities=activities, - schedule=torch.profiler.schedule( - wait=wait_steps, - warmup=warmup_steps, - active=active_steps, - repeat=repeat, - ), - on_trace_ready=_trace_ready, - record_shapes=record_shapes, - profile_memory=with_memory, - with_flops=with_flops, - with_stack=with_stack, - ) - - -def write_torch_profiler_outputs( - profiler: Any, - output_dir: Path, - *, - device_type: str, -) -> None: - tables_dir = ensure_dir(output_dir / "torch_tables") - write_profiler_table(profiler, tables_dir / "cpu_time_total.txt", sort_by="cpu_time_total") - if device_type == "cuda": - write_profiler_table(profiler, tables_dir / "cuda_time_total.txt", sort_by="self_cuda_time_total") - write_profiler_table(profiler, tables_dir / "cuda_memory.txt", sort_by="self_cuda_memory_usage") - write_profiler_table(profiler, tables_dir / "cpu_memory.txt", sort_by="self_cpu_memory_usage") - write_profiler_table(profiler, tables_dir / "flops.txt", sort_by="flops") - - -def _stable_float(value: float | int | None) -> float | None: - if value is None: - return None - return round(float(value), 8) - - -def _tensor_signature(tensor: torch.Tensor) -> dict[str, Any]: - cpu_tensor = tensor.detach().cpu() - if cpu_tensor.numel() == 0: - stats = {"sum": None, "mean": None, "std": None, "min": None, "max": None} - else: - stats_tensor = ( - cpu_tensor.to(torch.float64) if cpu_tensor.is_floating_point() else cpu_tensor.to(torch.int64) - ) - stats = { - "sum": _stable_float(stats_tensor.sum().item()), - "mean": _stable_float(stats_tensor.float().mean().item()), - "std": _stable_float(stats_tensor.float().std(unbiased=False).item()) - if cpu_tensor.numel() > 1 - else 0.0, - "min": _stable_float(stats_tensor.min().item()), - "max": _stable_float(stats_tensor.max().item()), - } - hash_tensor = cpu_tensor.float() if cpu_tensor.dtype == torch.bfloat16 else cpu_tensor - digest = hashlib.sha256(hash_tensor.contiguous().numpy().tobytes()).hexdigest() - return { - "shape": list(cpu_tensor.shape), - "dtype": str(cpu_tensor.dtype), - "numel": cpu_tensor.numel(), - "sha256": digest, - **stats, - } - - -def _summarize_forward_value(value: Any) -> Any: - if isinstance(value, torch.Tensor): - return _tensor_signature(value) - if isinstance(value, dict): - return {key: _summarize_forward_value(val) for key, val in value.items()} - if isinstance(value, (list, tuple)): - return [_summarize_forward_value(item) for item in value] - if isinstance(value, (str, int, float, bool)) or value is None: - return value - return repr(value) - - -def _hash_payload(payload: Any) -> str: - return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest() - - -def _get_profiler_device_time_us(event: Any) -> float | None: - return _stable_float( - getattr(event, "self_device_time_total", getattr(event, "self_cuda_time_total", None)) - ) - - -def _build_reference_batch(dataset: Any, batch_size: int) -> Any: - if len(dataset) == 0: - raise ValueError("Cannot build a reference batch from an empty dataset.") - indices = [idx % len(dataset) for idx in range(batch_size)] - samples = [dataset[idx] for idx in indices] - return default_collate(samples) - - -def write_deterministic_forward_artifacts( - *, - policy: Any, - dataset: Any, - batch_size: int, - preprocessor: Any, - output_dir: Path, - device_type: str, -) -> None: - reference_batch = preprocessor(_build_reference_batch(dataset, batch_size)) - activities = [torch.profiler.ProfilerActivity.CPU] - if device_type == "cuda": - activities.append(torch.profiler.ProfilerActivity.CUDA) - - # Keep the caller-selected module mode so the fingerprint matches the actual - # train-path forward used by the policy. Some policies, such as ACT with VAE, - # only materialize their full forward outputs while in training mode. - with torch.random.fork_rng(devices=[] if device_type != "cuda" else None): - torch.manual_seed(0) - if device_type == "cuda": - torch.cuda.manual_seed_all(0) - with torch.no_grad(), torch.profiler.profile(activities=activities) as profiler: - loss, output_dict = policy.forward(reference_batch) - - operator_entries = [] - for event in profiler.key_averages(): - entry = { - "key": event.key, - "count": event.count, - "cpu_time_total_us": _stable_float(getattr(event, "cpu_time_total", None)), - } - if device_type == "cuda": - entry["self_cuda_time_total_us"] = _get_profiler_device_time_us(event) - operator_entries.append(entry) - operator_entries = sorted(operator_entries, key=lambda item: item["key"]) - - output_summary = { - "loss": _summarize_forward_value(loss), - "output_dict": _summarize_forward_value(output_dict), - } - payload = { - "seed": 0, - "reference_batch_size": batch_size, - "operator_fingerprint": _hash_payload([(entry["key"], entry["count"]) for entry in operator_entries]), - "output_fingerprint": _hash_payload(output_summary), - "operators": operator_entries, - "outputs": output_summary, - } - (output_dir / "deterministic_forward.json").write_text(json.dumps(payload, indent=2, sort_keys=True)) - table_sort = "self_cuda_time_total" if device_type == "cuda" else "cpu_time_total" - write_profiler_table(profiler, output_dir / "deterministic_forward_ops.txt", sort_by=table_sort) - - -def _summary(values: list[float]) -> dict[str, float] | dict[str, None]: - if not values: - return {"count": 0, "mean": None, "median": None, "min": None, "max": None} - return { - "count": len(values), - "mean": statistics.fmean(values), - "median": statistics.median(values), - "min": min(values), - "max": max(values), - } - - -def _as_float(value: Any) -> float: - if isinstance(value, Real): - return float(value) - if hasattr(value, "val"): - return float(value.val) - raise TypeError(f"Expected a real-valued metric, got {type(value).__name__}") - - -@dataclass -class _StepTimingCollector: - total_update_s: list[float] = field(default_factory=list) - dataloading_s: list[float] = field(default_factory=list) - section_s: dict[str, list[float]] = field(default_factory=dict) - memory_timeline: list[dict[str, float | int]] = field(default_factory=list) - - def record_step(self, total_update_s: float) -> None: - self.total_update_s.append(_as_float(total_update_s)) - - def record_dataloading(self, dataloading_s: float) -> None: - self.dataloading_s.append(_as_float(dataloading_s)) - - def record_section(self, name: str, duration_s: float) -> None: - self.section_s.setdefault(name, []).append(_as_float(duration_s)) - - def record_memory(self, *, step: int, allocated_bytes: int, reserved_bytes: int) -> None: - self.memory_timeline.append( - { - "step": step, - "allocated_bytes": allocated_bytes, - "reserved_bytes": reserved_bytes, - } - ) - - def to_dict(self) -> dict[str, Any]: - payload: dict[str, Any] = { - "total_update_s": _summary(self.total_update_s), - "dataloading_s": _summary(self.dataloading_s), - "memory_timeline": self.memory_timeline, - } - for name, values in self.section_s.items(): - payload[f"{name}_s"] = _summary(values) - return payload - - def write_json(self, output_path: Path, extra: dict[str, Any] | None = None) -> None: - payload = self.to_dict() - if extra: - payload.update(extra) - output_path.parent.mkdir(parents=True, exist_ok=True) - output_path.write_text(json.dumps(payload, indent=2, sort_keys=True)) - - -class TrainingProfiler: - """Self-contained profiling orchestrator for the training loop. - - Encapsulates torch profiler setup, step-level timing collection, deterministic - forward-pass artifact recording, and all output writing. The training script - interacts with it through a thin interface (~7 lines). - """ - - def __init__( - self, - mode: str, - output_dir: Path, - device: torch.device, - *, - wait_steps: int = 1, - warmup_steps: int = 2, - active_steps: int = 6, - repeat: int = 1, - record_shapes: bool = True, - with_memory: bool = True, - with_flops: bool = True, - with_stack: bool = False, - ) -> None: - self._mode = mode - self._output_dir = ensure_dir(output_dir) - self._device = device - self._timing = _StepTimingCollector() - self._torch_profiler = _make_torch_profiler( - mode=mode, - output_dir=output_dir, - device_type=device.type, - wait_steps=wait_steps, - warmup_steps=warmup_steps, - active_steps=active_steps, - repeat=repeat, - record_shapes=record_shapes, - with_memory=with_memory, - with_flops=with_flops, - with_stack=with_stack, - ) - logging.info("Profiling enabled. Artifacts will be written to %s", output_dir) - - @classmethod - def from_cfg(cls, cfg: Any, device: torch.device) -> TrainingProfiler: - output_dir = cfg.profile_output_dir - if output_dir is None: - output_dir = Path(cfg.output_dir) / "profiling" - return cls(mode=cfg.profile_mode, output_dir=Path(output_dir), device=device) - - def record_deterministic_forward( - self, - *, - policy: Any, - dataset: Any, - batch_size: int, - preprocessor: Any, - ) -> None: - logging.info("Recording deterministic forward-pass artifacts") - write_deterministic_forward_artifacts( - policy=policy, - dataset=dataset, - batch_size=batch_size, - preprocessor=preprocessor, - output_dir=self._output_dir, - device_type=self._device.type, - ) - if self._device.type == "cuda": - torch.cuda.empty_cache() - - def start(self) -> None: - if self._device.type == "cuda": - torch.cuda.reset_peak_memory_stats(self._device) - self._torch_profiler.__enter__() - - @contextmanager - def section(self, name: str) -> Iterator[None]: - """Time a region of the training step (e.g. forward/backward/optimizer). - - On CUDA we synchronize before and after so the reported duration - reflects GPU work, not just the CPU-side kernel-launch latency. - """ - if self._device.type == "cuda": - torch.cuda.synchronize(self._device) - start = time.perf_counter() - try: - yield - finally: - if self._device.type == "cuda": - torch.cuda.synchronize(self._device) - self._timing.record_section(name, time.perf_counter() - start) - - def step(self, step_num: int, train_tracker: Any) -> None: - self._timing.record_step(_as_float(train_tracker.update_s)) - self._timing.record_dataloading(_as_float(train_tracker.dataloading_s)) - if self._device.type == "cuda": - self._timing.record_memory( - step=step_num, - allocated_bytes=torch.cuda.memory_allocated(self._device), - reserved_bytes=torch.cuda.memory_reserved(self._device), - ) - self._torch_profiler.step() - - def finalize(self) -> None: - self._torch_profiler.__exit__(None, None, None) - extra: dict[str, Any] = {"profile_mode": self._mode} - if self._device.type == "cuda": - extra["peak_memory_allocated_bytes"] = torch.cuda.max_memory_allocated(self._device) - extra["peak_memory_reserved_bytes"] = torch.cuda.max_memory_reserved(self._device) - self._timing.write_json(self._output_dir / "step_timing_summary.json", extra=extra) - write_torch_profiler_outputs(self._torch_profiler, self._output_dir, device_type=self._device.type) diff --git a/tests/scripts/test_model_profiling.py b/tests/scripts/test_model_profiling.py deleted file mode 100644 index d3e2de58b..000000000 --- a/tests/scripts/test_model_profiling.py +++ /dev/null @@ -1,423 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2026 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import annotations - -import argparse -import importlib.util -import json -import subprocess -import sys -from pathlib import Path - -import pytest -import torch -from huggingface_hub.errors import HfHubHTTPError - - -def _import_model_profiling_script(): - script_path = Path(__file__).resolve().parents[2] / "scripts" / "ci" / "run_model_profiling.py" - module_name = "tests.scripts.run_model_profiling" - spec = importlib.util.spec_from_file_location(module_name, script_path) - module = importlib.util.module_from_spec(spec) - sys.modules[module_name] = module - assert spec.loader is not None - spec.loader.exec_module(module) - return module - - -def test_profiling_specs_cover_expected_policies(): - module = _import_model_profiling_script() - spec_path = Path(__file__).resolve().parents[2] / "profiling" / "model_profiling_specs.json" - specs = module.load_specs(spec_path) - - assert set(specs) == { - "act", - "diffusion", - "groot", - "multi_task_dit", - "pi0", - "pi0_fast", - "pi05", - "smolvla", - "wall_x", - "xvla", - } - for excluded in ("sac", "sarm", "tdmpc", "vqbet", "reward_classifier"): - assert excluded not in specs - - -def test_pretrained_libero_specs_match_expected_camera_keys_and_normalization(): - module = _import_model_profiling_script() - spec_path = Path(__file__).resolve().parents[2] / "profiling" / "model_profiling_specs.json" - specs = module.load_specs(spec_path) - - assert ( - '--rename_map={"observation.images.front": "observation.images.base_0_rgb", ' - '"observation.images.wrist": "observation.images.left_wrist_0_rgb"}' in specs["pi0"].train_args - ) - assert ( - '--rename_map={"observation.images.front": "observation.images.base_0_rgb", ' - '"observation.images.wrist": "observation.images.left_wrist_0_rgb"}' in specs["pi0_fast"].train_args - ) - assert ( - '--rename_map={"observation.images.front": "observation.images.base_0_rgb", ' - '"observation.images.wrist": "observation.images.left_wrist_0_rgb"}' in specs["pi05"].train_args - ) - assert ( - '--policy.normalization_mapping={"ACTION": "MEAN_STD", ' - '"STATE": "MEAN_STD", "VISUAL": "IDENTITY"}' in specs["pi05"].train_args - ) - assert ( - '--rename_map={"observation.images.front": "observation.images.camera1", ' - '"observation.images.wrist": "observation.images.camera2"}' in specs["smolvla"].train_args - ) - - -def test_build_train_command_includes_profiling_outputs(tmp_path): - module = _import_model_profiling_script() - spec_path = Path(__file__).resolve().parents[2] / "profiling" / "model_profiling_specs.json" - spec = module.load_specs(spec_path)["act"] - - cmd = module.build_train_command(spec, tmp_path / "run", "trace") - - assert cmd[:3] == ["uv", "run", "lerobot-train"] - assert any(arg.startswith("--output_dir=") for arg in cmd) - assert any(arg.startswith("--profile_output_dir=") for arg in cmd) - assert "--profile_mode=trace" in cmd - assert "--eval_freq=0" in cmd - - -def test_build_artifact_index_collects_tables_and_traces(tmp_path): - module = _import_model_profiling_script() - run_dir = tmp_path / "act" / "20260415T000000Z__act" - profiling_dir = run_dir / "profiling" - (profiling_dir / "torch_tables").mkdir(parents=True, exist_ok=True) - (profiling_dir / "torch_traces").mkdir(parents=True, exist_ok=True) - (profiling_dir / "step_timing_summary.json").write_text("{}") - (profiling_dir / "deterministic_forward.json").write_text( - json.dumps({"operator_fingerprint": "ops123", "output_fingerprint": "out123"}) - ) - (profiling_dir / "torch_tables" / "cpu_time_total.txt").write_text("cpu table") - (profiling_dir / "torch_traces" / "trace_step_9.json").write_text("{}") - (run_dir / "stdout.txt").write_text("stdout") - (run_dir / "stderr.txt").write_text("stderr") - - artifact_paths, artifact_urls, targets, row_path_in_repo = module.build_artifact_index( - repo_id="lerobot/model-profiling-history", - run_dir=run_dir, - policy_name="act", - run_id="20260415T000000Z__act", - ) - - assert row_path_in_repo == "rows/act/20260415T000000Z__act.json" - assert artifact_paths["stdout"].endswith("/stdout.txt") - assert artifact_paths["step_timing_summary"].endswith("/profiling/step_timing_summary.json") - assert "cpu_time_total.txt" in artifact_paths["torch_tables"] - assert "trace_step_9.json" in artifact_paths["trace_files"] - assert artifact_paths["profiling_files"]["profiling/deterministic_forward.json"].endswith( - "/profiling/deterministic_forward.json" - ) - assert artifact_urls["row"].startswith("https://huggingface.co/datasets/lerobot/model-profiling-history/") - assert len(targets) == 6 - - -def test_upload_targets_batches_preview_publish_into_single_hf_pr(monkeypatch, tmp_path): - module = _import_model_profiling_script() - local_path = tmp_path / "profiling_row.json" - local_path.write_text("{}") - captured: dict[str, object] = {} - - class _FakeCommit: - pr_url = "https://huggingface.co/datasets/lerobot/model-profiling-history/discussions/42" - - class _FakeApi: - def __init__(self, token=None): - captured["token"] = token - - def create_commit(self, **kwargs): - captured.update(kwargs) - return _FakeCommit() - - monkeypatch.setattr(module, "HfApi", _FakeApi) - - result = module.upload_targets( - repo_id="lerobot/model-profiling-history", - targets=[module.UploadTarget(local_path=local_path, path_in_repo="rows/act/run.json")], - create_pr=True, - token="hf_test_token", - ) - - assert captured["repo_id"] == "lerobot/model-profiling-history" - assert captured["repo_type"] == "dataset" - assert captured["revision"] == "main" - assert captured["create_pr"] is True - operations = captured["operations"] - assert len(operations) == 1 - assert operations[0].path_in_repo == "rows/act/run.json" - assert result.pr_url == _FakeCommit.pr_url - assert result.uploaded_paths["rows/act/run.json"].endswith("/resolve/refs/pr/42/rows/act/run.json") - - -def test_model_profiling_main_smoke_writes_row(monkeypatch, tmp_path): - module = _import_model_profiling_script() - - spec_file = tmp_path / "specs.json" - spec_file.write_text( - json.dumps( - { - "act": { - "steps": 4, - "train_args": [ - "--dataset.repo_id=lerobot/pusht", - "--dataset.episodes=[0]", - "--policy.type=act", - "--policy.device=cuda", - "--batch_size=4", - ], - } - } - ) - ) - args = argparse.Namespace( - spec_file=spec_file, - policies=["act"], - output_dir=tmp_path / "results", - hub_org="lerobot", - results_repo="model-profiling-history", - publish=False, - profile_mode="summary", - git_commit="", - git_ref="codex/model-profiling", - pr_number="3389", - ) - - monkeypatch.setattr(module, "parse_args", lambda: args) - monkeypatch.setattr(module.subprocess, "check_output", lambda *a, **k: "deadbeef\n") - - def _fake_run(cmd, capture_output, text): - assert capture_output is True - assert text is True - profile_dir = Path( - next(arg.split("=", 1)[1] for arg in cmd if arg.startswith("--profile_output_dir=")) - ) - (profile_dir / "torch_tables").mkdir(parents=True, exist_ok=True) - (profile_dir / "step_timing_summary.json").write_text( - json.dumps( - { - "total_update_s": {"count": 1, "mean": 0.3, "median": 0.3, "min": 0.3, "max": 0.3}, - "peak_memory_allocated_bytes": 1024, - } - ) - ) - (profile_dir / "deterministic_forward.json").write_text( - json.dumps( - { - "operator_fingerprint": "ops-fingerprint", - "output_fingerprint": "output-fingerprint", - } - ) - ) - (profile_dir / "torch_tables" / "cpu_time_total.txt").write_text("cpu time table") - return subprocess.CompletedProcess(cmd, 0, "stdout ok", "") - - monkeypatch.setattr(module.subprocess, "run", _fake_run) - - assert module.main() == 0 - - row_paths = list((tmp_path / "results").rglob("profiling_row.json")) - assert len(row_paths) == 1 - row = json.loads(row_paths[0].read_text()) - assert row["policy"] == "act" - assert row["status"] == "success" - assert row["git_commit"] == "deadbeef" - assert row["git_ref"] == "codex/model-profiling" - assert row["pr_number"] == 3389 - assert row["step_timing_summary"]["total_update_s"]["mean"] == 0.3 - assert row["deterministic_forward"]["operator_fingerprint"] == "ops-fingerprint" - - -def test_model_profiling_publish_failure_is_recorded_without_failing(monkeypatch, tmp_path): - module = _import_model_profiling_script() - - spec_file = tmp_path / "specs.json" - spec_file.write_text( - json.dumps( - { - "act": { - "steps": 1, - "train_args": [ - "--dataset.repo_id=lerobot/pusht", - "--dataset.episodes=[0]", - "--policy.type=act", - "--policy.device=cuda", - "--batch_size=4", - ], - } - } - ) - ) - args = argparse.Namespace( - spec_file=spec_file, - policies=["act"], - output_dir=tmp_path / "results", - hub_org="lerobot", - results_repo="model-profiling-history", - publish=True, - profile_mode="summary", - git_commit="deadbeef", - git_ref="codex/model-profiling", - pr_number="3389", - ) - - monkeypatch.setattr(module, "parse_args", lambda: args) - - def _fake_run(cmd, capture_output, text): - profile_dir = Path( - next(arg.split("=", 1)[1] for arg in cmd if arg.startswith("--profile_output_dir=")) - ) - profile_dir.mkdir(parents=True, exist_ok=True) - return subprocess.CompletedProcess(cmd, 0, "stdout ok", "") - - monkeypatch.setattr(module.subprocess, "run", _fake_run) - - def _fake_upload_profile_run(**kwargs): - response = type("Response", (), {"status_code": 403, "headers": {}, "request": None})() - raise HfHubHTTPError("403 Forbidden: Authorization error.", response=response) - - monkeypatch.setattr(module, "upload_profile_run", _fake_upload_profile_run) - - assert module.main() == 0 - - row_paths = list((tmp_path / "results").rglob("profiling_row.json")) - assert len(row_paths) == 1 - row = json.loads(row_paths[0].read_text()) - assert row["status"] == "success" - assert row["publish_status"] == "failed" - assert "Authorization error" in row["publish_error"] - - -def test_parse_discussion_num_handles_hf_discussion_urls(): - module = _import_model_profiling_script() - - assert ( - module.parse_discussion_num( - "https://huggingface.co/datasets/lerobot/model-profiling-history/discussions/42" - ) - == 42 - ) - assert ( - module.parse_discussion_num("https://huggingface.co/datasets/lerobot/model-profiling-history") is None - ) - - -def test_deterministic_forward_artifacts_preserve_policy_mode(tmp_path): - from lerobot.utils.profiling_utils import write_deterministic_forward_artifacts - - class _TrainingOnlyPolicy(torch.nn.Module): - def __init__(self): - super().__init__() - self.forward_calls = 0 - - def forward(self, batch): - self.forward_calls += 1 - assert self.training - return batch["value"].sum(), {"value": batch["value"]} - - dataset = [{"value": torch.tensor([1.0, 2.0])}] - policy = _TrainingOnlyPolicy() - policy.train() - - write_deterministic_forward_artifacts( - policy=policy, - dataset=dataset, - batch_size=2, - preprocessor=lambda batch: batch, - output_dir=tmp_path, - device_type="cpu", - ) - - payload = json.loads((tmp_path / "deterministic_forward.json").read_text()) - assert policy.training is True - assert policy.forward_calls == 1 - assert payload["reference_batch_size"] == 2 - assert "operator_fingerprint" in payload - assert payload["outputs"]["loss"]["numel"] == 1 - - -def test_step_timing_collector_accepts_metric_like_values(tmp_path): - from lerobot.utils.profiling_utils import _StepTimingCollector - - class _MetricLike: - def __init__(self, val): - self.val = val - - collector = _StepTimingCollector() - collector.record_step(_MetricLike(0.6)) - collector.record_dataloading(_MetricLike(0.05)) - collector.write_json(tmp_path / "step_timing_summary.json") - - payload = json.loads((tmp_path / "step_timing_summary.json").read_text()) - assert payload["total_update_s"]["mean"] == 0.6 - assert payload["dataloading_s"]["mean"] == 0.05 - - -def test_step_timing_collector_records_forward_backward_optimizer(tmp_path): - from lerobot.utils.profiling_utils import _StepTimingCollector - - collector = _StepTimingCollector() - for _ in range(3): - collector.record_section("forward", 0.10) - collector.record_section("backward", 0.20) - collector.record_section("optimizer", 0.05) - collector.write_json(tmp_path / "step_timing_summary.json") - - payload = json.loads((tmp_path / "step_timing_summary.json").read_text()) - assert payload["forward_s"]["mean"] == pytest.approx(0.10) - assert payload["backward_s"]["mean"] == pytest.approx(0.20) - assert payload["optimizer_s"]["mean"] == pytest.approx(0.05) - assert payload["forward_s"]["count"] == 3 - - -def test_training_profiler_section_records_duration(tmp_path): - from lerobot.utils.profiling_utils import TrainingProfiler - - profiler = TrainingProfiler( - mode="summary", - output_dir=tmp_path, - device=torch.device("cpu"), - ) - profiler.start() - with profiler.section("forward"): - pass - with profiler.section("backward"): - pass - profiler.step(1, argparse.Namespace(update_s=0.5, dataloading_s=0.01)) - profiler.finalize() - - payload = json.loads((tmp_path / "step_timing_summary.json").read_text()) - assert payload["forward_s"]["count"] == 1 - assert payload["backward_s"]["count"] == 1 - assert payload["forward_s"]["mean"] >= 0.0 - - -def test_profiler_device_time_uses_generic_attr_first(): - from lerobot.utils.profiling_utils import _get_profiler_device_time_us - - class _Event: - self_device_time_total = 12.3456 - - assert _get_profiler_device_time_us(_Event()) == 12.3456 diff --git a/tests/test_model_profiling.py b/tests/test_model_profiling.py new file mode 100644 index 000000000..9952bc64f --- /dev/null +++ b/tests/test_model_profiling.py @@ -0,0 +1,312 @@ +#!/usr/bin/env python + +# Copyright 2026 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 + +from __future__ import annotations + +import argparse +import json +import subprocess +from pathlib import Path + +import pytest +import torch +from huggingface_hub.errors import HfHubHTTPError + +from lerobot.utils import model_profiling as mp + +# --------------------------------------------------------------------------- +# Policy spec matrix +# --------------------------------------------------------------------------- + + +def test_policy_specs_cover_expected_policies(): + assert set(mp.POLICY_SPECS) == { + "act", + "diffusion", + "groot", + "multi_task_dit", + "pi0", + "pi0_fast", + "pi05", + "smolvla", + "wall_x", + "xvla", + } + # Sanity: excluded policies should stay out of the matrix. + for excluded in ("sac", "sarm", "tdmpc", "vqbet", "reward_classifier"): + assert excluded not in mp.POLICY_SPECS + + +def test_pretrained_libero_specs_match_expected_camera_keys_and_normalization(): + base_rgb_rename = ( + '--rename_map={"observation.images.front": "observation.images.base_0_rgb", ' + '"observation.images.wrist": "observation.images.left_wrist_0_rgb"}' + ) + for name in ("pi0", "pi0_fast", "pi05"): + assert base_rgb_rename in mp.POLICY_SPECS[name]["train_args"] + assert any( + arg.startswith('--policy.normalization_mapping={"ACTION": "MEAN_STD"') + for arg in mp.POLICY_SPECS["pi05"]["train_args"] + ) + assert ( + '--rename_map={"observation.images.front": "observation.images.camera1", ' + '"observation.images.wrist": "observation.images.camera2"}' + in mp.POLICY_SPECS["smolvla"]["train_args"] + ) + + +# --------------------------------------------------------------------------- +# CI orchestrator helpers +# --------------------------------------------------------------------------- + + +def test_build_train_command_includes_profiling_outputs(tmp_path): + cmd = mp.build_train_command("act", tmp_path / "run", "trace") + assert cmd[:3] == ["uv", "run", "lerobot-train"] + assert any(a.startswith("--output_dir=") for a in cmd) + assert any(a.startswith("--profile_output_dir=") for a in cmd) + assert "--profile_mode=trace" in cmd + assert "--eval_freq=0" in cmd + + +def test_build_artifact_index_collects_tables_and_traces(tmp_path): + run_dir = tmp_path / "act" / "20260415T000000Z__act" + profiling = run_dir / "profiling" + (profiling / "torch_tables").mkdir(parents=True) + (profiling / "torch_traces").mkdir(parents=True) + (profiling / "step_timing_summary.json").write_text("{}") + (profiling / "deterministic_forward.json").write_text( + json.dumps({"operator_fingerprint": "ops", "output_fingerprint": "out"}) + ) + (profiling / "torch_tables" / "cpu_time_total.txt").write_text("cpu table") + (profiling / "torch_traces" / "trace_step_9.json").write_text("{}") + (run_dir / "stdout.txt").write_text("stdout") + (run_dir / "stderr.txt").write_text("stderr") + + paths, urls, targets, row_in_repo = mp.build_artifact_index( + repo_id="lerobot/model-profiling-history", + run_dir=run_dir, + policy_name="act", + run_id="20260415T000000Z__act", + ) + + assert row_in_repo == "rows/act/20260415T000000Z__act.json" + assert paths["stdout"].endswith("/stdout.txt") + assert paths["step_timing_summary"].endswith("/profiling/step_timing_summary.json") + assert "cpu_time_total.txt" in paths["torch_tables"] + assert "trace_step_9.json" in paths["trace_files"] + assert urls["row"].startswith("https://huggingface.co/datasets/lerobot/model-profiling-history/") + # stdout + stderr + 4 profiling files + assert len(targets) == 6 + + +def test_upload_targets_batches_preview_publish_into_single_hf_pr(monkeypatch, tmp_path): + local_path = tmp_path / "profiling_row.json" + local_path.write_text("{}") + captured: dict[str, object] = {} + + class _FakeCommit: + pr_url = "https://huggingface.co/datasets/lerobot/model-profiling-history/discussions/42" + + class _FakeApi: + def __init__(self, token=None): + captured["token"] = token + + def create_commit(self, **kwargs): + captured.update(kwargs) + return _FakeCommit() + + monkeypatch.setattr(mp, "HfApi", _FakeApi) + + result = mp.upload_targets( + repo_id="lerobot/model-profiling-history", + targets=[mp.UploadTarget(local_path, "rows/act/run.json")], + create_pr=True, + token="hf_test_token", + ) + + assert captured["repo_id"] == "lerobot/model-profiling-history" + assert captured["repo_type"] == "dataset" + assert captured["create_pr"] is True + assert result.pr_url == _FakeCommit.pr_url + assert result.uploaded_paths["rows/act/run.json"].endswith("/resolve/refs/pr/42/rows/act/run.json") + + +def test_parse_discussion_num_handles_hf_discussion_urls(): + assert ( + mp.parse_discussion_num( + "https://huggingface.co/datasets/lerobot/model-profiling-history/discussions/42" + ) + == 42 + ) + assert mp.parse_discussion_num("https://huggingface.co/datasets/lerobot/model-profiling-history") is None + assert mp.parse_discussion_num(None) is None + + +# --------------------------------------------------------------------------- +# main() smoke tests +# --------------------------------------------------------------------------- + + +@pytest.fixture +def _fake_args(tmp_path): + """Shared argparse namespace for main() smoke tests — overridden per-test.""" + return argparse.Namespace( + policies=["act"], + output_dir=tmp_path / "results", + hub_org="lerobot", + results_repo="model-profiling-history", + publish=False, + profile_mode="summary", + git_commit="", + git_ref="codex/model-profiling", + pr_number="3389", + ) + + +def _stub_train_subprocess(mp_module, *, returncode: int = 0, write_artifacts: bool = True): + """Build a fake subprocess.run that writes the profiling artifacts main() expects.""" + + def _fake_run(cmd, capture_output, text): + assert capture_output is True + assert text is True + profile_dir = Path(next(a.split("=", 1)[1] for a in cmd if a.startswith("--profile_output_dir="))) + profile_dir.mkdir(parents=True, exist_ok=True) + if write_artifacts: + (profile_dir / "torch_tables").mkdir(parents=True, exist_ok=True) + (profile_dir / "step_timing_summary.json").write_text( + json.dumps({"total_update_s": {"count": 1, "mean": 0.3}, "peak_memory_allocated_bytes": 1024}) + ) + (profile_dir / "deterministic_forward.json").write_text( + json.dumps( + {"operator_fingerprint": "ops-fingerprint", "output_fingerprint": "output-fingerprint"} + ) + ) + (profile_dir / "torch_tables" / "cpu_time_total.txt").write_text("cpu time table") + return subprocess.CompletedProcess(cmd, returncode, "stdout ok", "") + + return _fake_run + + +def test_main_smoke_writes_row(monkeypatch, _fake_args): + monkeypatch.setattr(mp, "parse_args", lambda: _fake_args) + monkeypatch.setattr(mp.subprocess, "check_output", lambda *a, **k: "deadbeef\n") + monkeypatch.setattr(mp.subprocess, "run", _stub_train_subprocess(mp)) + + assert mp.main() == 0 + + row_paths = list(_fake_args.output_dir.rglob("profiling_row.json")) + assert len(row_paths) == 1 + row = json.loads(row_paths[0].read_text()) + assert row["policy"] == "act" + assert row["status"] == "success" + assert row["git_commit"] == "deadbeef" + assert row["git_ref"] == "codex/model-profiling" + assert row["pr_number"] == 3389 + assert row["step_timing_summary"]["total_update_s"]["mean"] == 0.3 + assert row["deterministic_forward"]["operator_fingerprint"] == "ops-fingerprint" + + +def test_main_records_publish_failure_without_failing(monkeypatch, _fake_args): + _fake_args.publish = True + _fake_args.git_commit = "deadbeef" + monkeypatch.setattr(mp, "parse_args", lambda: _fake_args) + monkeypatch.setattr(mp.subprocess, "run", _stub_train_subprocess(mp, write_artifacts=False)) + + def _fail_upload(**kwargs): + resp = type("Resp", (), {"status_code": 403, "headers": {}, "request": None})() + raise HfHubHTTPError("403 Forbidden: Authorization error.", response=resp) + + monkeypatch.setattr(mp, "upload_profile_run", _fail_upload) + + assert mp.main() == 0 + row = json.loads(next(_fake_args.output_dir.rglob("profiling_row.json")).read_text()) + assert row["status"] == "success" + assert row["publish_status"] == "failed" + assert "Authorization error" in row["publish_error"] + + +# --------------------------------------------------------------------------- +# TrainingProfiler behavior +# --------------------------------------------------------------------------- + + +def test_deterministic_forward_artifacts_preserve_policy_mode(tmp_path): + class _TrainingOnlyPolicy(torch.nn.Module): + def __init__(self): + super().__init__() + self.forward_calls = 0 + + def forward(self, batch): + self.forward_calls += 1 + assert self.training + return batch["value"].sum(), {"value": batch["value"]} + + dataset = [{"value": torch.tensor([1.0, 2.0])}] + policy = _TrainingOnlyPolicy() + policy.train() + + mp.write_deterministic_forward_artifacts( + policy=policy, + dataset=dataset, + batch_size=2, + preprocessor=lambda b: b, + output_dir=tmp_path, + device_type="cpu", + ) + + payload = json.loads((tmp_path / "deterministic_forward.json").read_text()) + assert policy.training is True + assert policy.forward_calls == 1 + assert payload["reference_batch_size"] == 2 + assert "operator_fingerprint" in payload + assert payload["outputs"]["loss"]["numel"] == 1 + + +def test_training_profiler_section_records_forward_backward_optimizer(tmp_path): + profiler = mp.TrainingProfiler(mode="summary", output_dir=tmp_path, device=torch.device("cpu")) + profiler.start() + for _ in range(3): + with profiler.section("forward"): + pass + with profiler.section("backward"): + pass + with profiler.section("optimizer"): + pass + profiler.step(1, argparse.Namespace(update_s=0.5, dataloading_s=0.01)) + profiler.finalize() + + payload = json.loads((tmp_path / "step_timing_summary.json").read_text()) + assert payload["forward_s"]["count"] == 3 + assert payload["backward_s"]["count"] == 3 + assert payload["optimizer_s"]["count"] == 3 + assert payload["total_update_s"]["mean"] == 0.5 + + +def test_training_profiler_accepts_metric_like_values(tmp_path): + class _MetricLike: + def __init__(self, v): + self.val = v + + profiler = mp.TrainingProfiler(mode="summary", output_dir=tmp_path, device=torch.device("cpu")) + profiler.start() + profiler.step(1, argparse.Namespace(update_s=_MetricLike(0.6), dataloading_s=_MetricLike(0.05))) + profiler.finalize() + + payload = json.loads((tmp_path / "step_timing_summary.json").read_text()) + assert payload["total_update_s"]["mean"] == 0.6 + assert payload["dataloading_s"]["mean"] == 0.05 + + +def test_profiler_device_time_uses_generic_attr_first(): + class _Event: + self_device_time_total = 12.3456 + + assert mp._get_profiler_device_time_us(_Event()) == 12.3456