refactor(profiling): consolidate into single module

Unify the profiling subsystem into one file per reviewer request.

Before (4 files):
  src/lerobot/utils/profiling_utils.py        399 LOC
  scripts/ci/run_model_profiling.py           337 LOC
  profiling/model_profiling_specs.json        181 LOC
  tests/scripts/test_model_profiling.py       423 LOC

After (2 files):
  src/lerobot/utils/model_profiling.py        758 LOC — TrainingProfiler +
                                                       CI orchestrator +
                                                       POLICY_SPECS (inline)
  tests/test_model_profiling.py               315 LOC

Net: -267 LOC and 4 files → 2. All functionality preserved: per-step
forward/backward/optimizer timings, torch profiler tables + chrome
traces, deterministic-forward fingerprint, HF Hub result upload, and
the same CLI surface.

Changes:
- Collapse `_StepTimingCollector` into inline attributes on
  `TrainingProfiler` (no separate class).
- Drop `ProfilingSpec` dataclass; specs are plain dicts.
- Inline the JSON matrix as a module-level `POLICY_SPECS` dict —
  one less file to keep in sync with the training args.
- CI workflow invokes `python -m lerobot.utils.model_profiling` in
  place of the standalone script.
- Tests import `lerobot.utils.model_profiling` directly instead of
  loading a script-by-path. Removed JSON schema tests that no
  longer apply.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pepijn
2026-04-20 21:31:17 +02:00
parent 8d982614a6
commit a515eadc96
8 changed files with 1079 additions and 1346 deletions

View File

@@ -23,12 +23,10 @@ on:
- feat/libero-benchmark
paths:
- .github/workflows/model_profiling.yml
- profiling/model_profiling_specs.json
- scripts/ci/run_model_profiling.py
- src/lerobot/configs/train.py
- src/lerobot/scripts/lerobot_train.py
- src/lerobot/utils/profiling_utils.py
- tests/scripts/test_model_profiling.py
- src/lerobot/utils/model_profiling.py
- tests/test_model_profiling.py
workflow_dispatch:
inputs:
git_ref:
@@ -206,7 +204,7 @@ jobs:
fi
cmd=(
uv run python scripts/ci/run_model_profiling.py
uv run python -m lerobot.utils.model_profiling
--output_dir=/workspace/profiling-results
--hub_org=lerobot
--results_repo="${RESULTS_REPO}"

View File

@@ -1,181 +0,0 @@
{
"act": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/pusht",
"--dataset.episodes=[0]",
"--policy.type=act",
"--policy.device=cuda",
"--batch_size=4",
"--cudnn_deterministic=true"
]
},
"diffusion": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/pusht",
"--dataset.episodes=[0]",
"--policy.type=diffusion",
"--policy.device=cuda",
"--batch_size=4",
"--cudnn_deterministic=true"
]
},
"groot": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.type=groot",
"--policy.base_model_path=nvidia/GR00T-N1.5-3B",
"--policy.tune_diffusion_model=true",
"--policy.tune_projector=true",
"--policy.tune_llm=false",
"--policy.tune_visual=false",
"--policy.use_bf16=true",
"--policy.device=cuda",
"--batch_size=1",
"--rename_map={\"observation.images.image\": \"observation.images.camera1\", \"observation.images.image2\": \"observation.images.camera2\"}"
]
},
"multi_task_dit": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/pusht",
"--dataset.episodes=[0]",
"--policy.type=multi_task_dit",
"--policy.device=cuda",
"--policy.horizon=32",
"--policy.n_action_steps=30",
"--batch_size=4",
"--cudnn_deterministic=true"
]
},
"pi0": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.path=lerobot/pi0_base",
"--policy.device=cuda",
"--policy.dtype=bfloat16",
"--policy.n_action_steps=30",
"--policy.use_amp=true",
"--policy.gradient_checkpointing=true",
"--batch_size=1",
"--use_policy_training_preset=false",
"--optimizer.type=sgd",
"--optimizer.lr=1e-5",
"--optimizer.weight_decay=0",
"--optimizer.grad_clip_norm=1.0",
"--scheduler.type=cosine_decay_with_warmup",
"--scheduler.peak_lr=1e-5",
"--scheduler.decay_lr=1e-6",
"--scheduler.num_warmup_steps=0",
"--scheduler.num_decay_steps=12",
"--rename_map={\"observation.images.front\": \"observation.images.base_0_rgb\", \"observation.images.wrist\": \"observation.images.left_wrist_0_rgb\"}"
]
},
"pi0_fast": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.path=lerobot/pi0fast-base",
"--policy.device=cuda",
"--policy.dtype=bfloat16",
"--policy.n_action_steps=30",
"--policy.use_amp=true",
"--policy.gradient_checkpointing=true",
"--batch_size=1",
"--use_policy_training_preset=false",
"--optimizer.type=sgd",
"--optimizer.lr=1e-5",
"--optimizer.weight_decay=0",
"--optimizer.grad_clip_norm=1.0",
"--scheduler.type=cosine_decay_with_warmup",
"--scheduler.peak_lr=1e-5",
"--scheduler.decay_lr=1e-6",
"--scheduler.num_warmup_steps=0",
"--scheduler.num_decay_steps=12",
"--rename_map={\"observation.images.front\": \"observation.images.base_0_rgb\", \"observation.images.wrist\": \"observation.images.left_wrist_0_rgb\"}"
]
},
"pi05": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.path=lerobot/pi05_base",
"--policy.device=cuda",
"--policy.dtype=bfloat16",
"--policy.n_action_steps=30",
"--policy.use_amp=true",
"--policy.gradient_checkpointing=true",
"--batch_size=1",
"--use_policy_training_preset=false",
"--optimizer.type=sgd",
"--optimizer.lr=1e-5",
"--optimizer.weight_decay=0",
"--optimizer.grad_clip_norm=1.0",
"--scheduler.type=cosine_decay_with_warmup",
"--scheduler.peak_lr=1e-5",
"--scheduler.decay_lr=1e-6",
"--scheduler.num_warmup_steps=0",
"--scheduler.num_decay_steps=12",
"--policy.normalization_mapping={\"ACTION\": \"MEAN_STD\", \"STATE\": \"MEAN_STD\", \"VISUAL\": \"IDENTITY\"}",
"--rename_map={\"observation.images.front\": \"observation.images.base_0_rgb\", \"observation.images.wrist\": \"observation.images.left_wrist_0_rgb\"}"
]
},
"smolvla": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.path=lerobot/smolvla_base",
"--policy.load_vlm_weights=true",
"--policy.freeze_vision_encoder=false",
"--policy.train_expert_only=false",
"--policy.empty_cameras=1",
"--policy.device=cuda",
"--batch_size=1",
"--rename_map={\"observation.images.front\": \"observation.images.camera1\", \"observation.images.wrist\": \"observation.images.camera2\"}"
]
},
"wall_x": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/aloha_sim_insertion_human",
"--dataset.episodes=[0]",
"--policy.type=wall_x",
"--policy.pretrained_name_or_path=x-square-robot/wall-oss-flow",
"--policy.prediction_mode=diffusion",
"--policy.attn_implementation=eager",
"--policy.device=cuda",
"--batch_size=1",
"--use_policy_training_preset=false",
"--optimizer.type=sgd",
"--optimizer.lr=1e-5",
"--optimizer.weight_decay=0",
"--optimizer.grad_clip_norm=1.0",
"--scheduler.type=cosine_decay_with_warmup",
"--scheduler.peak_lr=1e-5",
"--scheduler.decay_lr=1e-6",
"--scheduler.num_warmup_steps=0",
"--scheduler.num_decay_steps=12"
]
},
"xvla": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.path=lerobot/xvla-widowx",
"--policy.action_mode=auto",
"--policy.empty_cameras=1",
"--policy.device=cuda",
"--batch_size=1",
"--rename_map={\"observation.images.front\": \"observation.images.image\", \"observation.images.wrist\": \"observation.images.image2\"}"
]
}
}

View File

@@ -1,337 +0,0 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import argparse
import json
import re
import shutil
import subprocess
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from huggingface_hub import CommitOperationAdd, HfApi
from huggingface_hub.errors import HfHubHTTPError
@dataclass(frozen=True)
class ProfilingSpec:
name: str
steps: int
train_args: list[str]
@dataclass(frozen=True)
class UploadTarget:
local_path: Path
path_in_repo: str
@dataclass(frozen=True)
class UploadResult:
uploaded_paths: dict[str, str]
pr_url: str | None = None
def utc_timestamp_slug(now: datetime | None = None) -> str:
current = now or datetime.now(UTC)
return current.strftime("%Y%m%dT%H%M%SZ")
def make_hub_file_url(
repo_id: str,
path_in_repo: str,
repo_type: str = "dataset",
revision: str = "main",
) -> str:
prefix = "datasets/" if repo_type == "dataset" else ""
return f"https://huggingface.co/{prefix}{repo_id}/resolve/{revision}/{path_in_repo}"
def parse_discussion_num(pr_url: str | None) -> int | None:
if not pr_url:
return None
match = re.search(r"/discussions/(\d+)$", pr_url)
return int(match.group(1)) if match else None
def upload_targets(
repo_id: str,
targets: list[UploadTarget],
*,
repo_type: str = "dataset",
token: str | None = None,
commit_message: str | None = None,
create_pr: bool = False,
) -> UploadResult:
api = HfApi(token=token)
operations = [
CommitOperationAdd(path_in_repo=target.path_in_repo, path_or_fileobj=str(target.local_path))
for target in targets
]
commit = api.create_commit(
repo_id=repo_id,
repo_type=repo_type,
operations=operations,
commit_message=commit_message or f"Upload {len(targets)} profiling artifacts",
revision="main",
create_pr=create_pr,
)
revision = "main"
pr_num = parse_discussion_num(commit.pr_url)
if create_pr and pr_num is not None:
revision = f"refs/pr/{pr_num}"
uploaded = {
target.path_in_repo: make_hub_file_url(
repo_id, target.path_in_repo, repo_type=repo_type, revision=revision
)
for target in targets
}
return UploadResult(uploaded_paths=uploaded, pr_url=commit.pr_url)
def normalize_repo_id(repo: str, hub_org: str) -> str:
return repo if "/" in repo else f"{hub_org}/{repo}"
def load_specs(path: Path) -> dict[str, ProfilingSpec]:
payload = json.loads(path.read_text())
return {
name: ProfilingSpec(name=name, steps=spec["steps"], train_args=spec["train_args"])
for name, spec in payload.items()
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--spec-file", type=Path, default=Path("profiling/model_profiling_specs.json"))
parser.add_argument("--policies", nargs="*", default=None)
parser.add_argument("--output_dir", type=Path, required=True)
parser.add_argument("--hub_org", default="lerobot")
parser.add_argument("--results_repo", default="model-profiling-history")
parser.add_argument("--publish", action="store_true")
parser.add_argument("--profile_mode", choices=["summary", "trace"], default="trace")
parser.add_argument("--git_commit", default="")
parser.add_argument("--git_ref", default="")
parser.add_argument("--pr_number", default="")
return parser.parse_args()
def get_selected_names(requested: list[str] | None, specs: dict[str, ProfilingSpec]) -> list[str]:
if not requested:
return list(specs)
unknown = sorted(set(requested) - set(specs))
if unknown:
raise ValueError(f"Unknown profiling policies: {', '.join(unknown)}")
return requested
def build_train_command(spec: ProfilingSpec, run_dir: Path, profile_mode: str) -> list[str]:
train_output_dir = run_dir / "train"
profile_output_dir = run_dir / "profiling"
return [
"uv",
"run",
"lerobot-train",
*spec.train_args,
f"--output_dir={train_output_dir}",
f"--steps={spec.steps}",
"--eval_freq=0",
"--save_checkpoint=false",
f"--save_freq={spec.steps}",
"--wandb.enable=false",
"--policy.push_to_hub=false",
"--num_workers=0",
"--log_freq=1",
f"--profile_mode={profile_mode}",
f"--profile_output_dir={profile_output_dir}",
]
def load_json_if_exists(path: Path) -> dict[str, Any] | None:
if not path.exists():
return None
return json.loads(path.read_text())
def build_artifact_index(
*,
repo_id: str,
run_dir: Path,
policy_name: str,
run_id: str,
) -> tuple[dict[str, Any], dict[str, Any], list[UploadTarget], str]:
row_path_in_repo = f"rows/{policy_name}/{run_id}.json"
artifact_root = f"artifacts/{policy_name}/{run_id}"
artifact_paths: dict[str, Any] = {
"row": row_path_in_repo,
"profiling_files": {},
"torch_tables": {},
"trace_files": {},
}
artifact_urls: dict[str, Any] = {
"row": make_hub_file_url(repo_id, row_path_in_repo),
"profiling_files": {},
"torch_tables": {},
"trace_files": {},
}
targets: list[UploadTarget] = []
for name in ("stdout.txt", "stderr.txt"):
path = run_dir / name
if not path.exists():
continue
repo_path = f"{artifact_root}/{name}"
artifact_paths[name.removesuffix(".txt")] = repo_path
artifact_urls[name.removesuffix(".txt")] = make_hub_file_url(repo_id, repo_path)
targets.append(UploadTarget(local_path=path, path_in_repo=repo_path))
profiling_dir = run_dir / "profiling"
for path in sorted(profiling_dir.rglob("*")) if profiling_dir.exists() else []:
if not path.is_file():
continue
relative_path = str(path.relative_to(run_dir))
repo_path = f"{artifact_root}/{relative_path}"
artifact_paths["profiling_files"][relative_path] = repo_path
artifact_urls["profiling_files"][relative_path] = make_hub_file_url(repo_id, repo_path)
targets.append(UploadTarget(local_path=path, path_in_repo=repo_path))
if path.name == "step_timing_summary.json":
artifact_paths["step_timing_summary"] = repo_path
artifact_urls["step_timing_summary"] = make_hub_file_url(repo_id, repo_path)
elif "torch_tables" in path.parts:
artifact_paths["torch_tables"][path.name] = repo_path
artifact_urls["torch_tables"][path.name] = make_hub_file_url(repo_id, repo_path)
elif "torch_traces" in path.parts:
artifact_paths["trace_files"][path.name] = repo_path
artifact_urls["trace_files"][path.name] = make_hub_file_url(repo_id, repo_path)
return artifact_paths, artifact_urls, targets, row_path_in_repo
def upload_profile_run(
*,
repo_id: str,
row_path: Path,
row_path_in_repo: str,
artifact_targets: list[UploadTarget],
create_pr: bool = False,
) -> UploadResult:
return upload_targets(
repo_id=repo_id,
targets=[*artifact_targets, UploadTarget(local_path=row_path, path_in_repo=row_path_in_repo)],
repo_type="dataset",
commit_message=f"Add model profiling row {row_path_in_repo}",
create_pr=create_pr,
)
def main() -> int:
args = parse_args()
specs = load_specs(args.spec_file)
selected = get_selected_names(args.policies, specs)
args.output_dir.mkdir(parents=True, exist_ok=True)
repo_id = normalize_repo_id(args.results_repo, args.hub_org)
git_executable = shutil.which("git")
if not git_executable:
raise RuntimeError("git executable not found in PATH")
git_commit = (
args.git_commit or subprocess.check_output([git_executable, "rev-parse", "HEAD"], text=True).strip()
)
pr_number = int(args.pr_number) if str(args.pr_number).strip() else None
for policy_name in selected:
spec = specs[policy_name]
run_id = f"{utc_timestamp_slug()}__{policy_name}"
run_dir = args.output_dir / policy_name / run_id
run_dir.mkdir(parents=True, exist_ok=True)
cmd = build_train_command(spec, run_dir, args.profile_mode)
start = time.perf_counter()
result = subprocess.run(cmd, capture_output=True, text=True)
duration_s = time.perf_counter() - start
stdout_path = run_dir / "stdout.txt"
stderr_path = run_dir / "stderr.txt"
stdout_path.write_text(result.stdout)
stderr_path.write_text(result.stderr)
profile_summary = load_json_if_exists(run_dir / "profiling" / "step_timing_summary.json") or {}
deterministic_forward = (
load_json_if_exists(run_dir / "profiling" / "deterministic_forward.json") or {}
)
artifact_paths, artifact_urls, artifact_targets, row_path_in_repo = build_artifact_index(
repo_id=repo_id,
run_dir=run_dir,
policy_name=policy_name,
run_id=run_id,
)
row = {
"schema_version": 1,
"created_at": datetime.now(UTC).isoformat(),
"run_id": run_id,
"policy": policy_name,
"git_commit": git_commit,
"git_ref": args.git_ref or None,
"pr_number": pr_number,
"status": "success" if result.returncode == 0 else "failed",
"return_code": result.returncode,
"profile_mode": args.profile_mode,
"wall_time_s": duration_s,
"spec": {
"steps": spec.steps,
"train_args": spec.train_args,
},
"step_timing_summary": profile_summary,
"deterministic_forward": deterministic_forward,
"artifact_paths": artifact_paths,
"artifact_urls": artifact_urls,
"stderr_tail": result.stderr.splitlines()[-20:],
}
row_path = run_dir / "profiling_row.json"
row_path.write_text(json.dumps(row, indent=2, sort_keys=True))
if args.publish:
try:
upload_result = upload_profile_run(
repo_id=repo_id,
row_path=row_path,
row_path_in_repo=row_path_in_repo,
artifact_targets=artifact_targets,
create_pr=pr_number is not None,
)
except HfHubHTTPError as exc:
row["publish_status"] = "failed"
row["publish_error"] = str(exc)
else:
row["publish_status"] = "success"
row["uploaded_paths"] = upload_result.uploaded_paths
row["publish_pr_url"] = upload_result.pr_url
row["publish_pr_number"] = parse_discussion_num(upload_result.pr_url)
row_path.write_text(json.dumps(row, indent=2, sort_keys=True))
print(json.dumps(row, indent=2, sort_keys=True))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -49,7 +49,7 @@ from lerobot.optim.factory import make_optimizer_and_scheduler
from lerobot.policies import PreTrainedPolicy, make_policy, make_pre_post_processors
from lerobot.utils.import_utils import register_third_party_plugins
from lerobot.utils.logging_utils import AverageMeter, MetricsTracker
from lerobot.utils.profiling_utils import TrainingProfiler
from lerobot.utils.model_profiling import TrainingProfiler
from lerobot.utils.random_utils import set_seed
from lerobot.utils.utils import (
cycle,

View File

@@ -0,0 +1,763 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
"""Model profiling — single-file entry point.
Contains three things that used to live in three separate files:
* `TrainingProfiler` — hooks the training loop. Captures per-step
forward/backward/optimizer timings, the torch profiler output, and a
deterministic-forward fingerprint for regression detection.
* `POLICY_SPECS` — CI matrix of `policy_name → (steps, train_args)`.
Inline so there is no separate JSON to keep in sync.
* `main()` — CI orchestrator. For each selected policy, spawns a
`lerobot-train` subprocess with profiling enabled, collects the
artifacts, and (optionally) publishes a row to a HF Hub dataset.
Usage (CI):
python -m lerobot.utils.model_profiling \
--output_dir=./profiling-results \
--policies act diffusion \
--profile_mode=trace \
--publish
"""
from __future__ import annotations
import argparse
import contextlib
import hashlib
import json
import logging
import re
import shutil
import statistics
import subprocess
import time
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import UTC, datetime
from numbers import Real
from pathlib import Path
from typing import Any
import torch
from huggingface_hub import CommitOperationAdd, HfApi
from huggingface_hub.errors import HfHubHTTPError
from torch.utils.data._utils.collate import default_collate
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Policy matrix. Same shape as the former JSON file; inlined so the source
# tree has one less file to keep in sync with the training args.
# ---------------------------------------------------------------------------
_LIBERO_RENAME_BASE_RGB = (
'--rename_map={"observation.images.front": "observation.images.base_0_rgb", '
'"observation.images.wrist": "observation.images.left_wrist_0_rgb"}'
)
_LIBERO_RENAME_CAMERAS = (
'--rename_map={"observation.images.front": "observation.images.camera1", '
'"observation.images.wrist": "observation.images.camera2"}'
)
_PI_SGD = [
"--use_policy_training_preset=false",
"--optimizer.type=sgd",
"--optimizer.lr=1e-5",
"--optimizer.weight_decay=0",
"--optimizer.grad_clip_norm=1.0",
"--scheduler.type=cosine_decay_with_warmup",
"--scheduler.peak_lr=1e-5",
"--scheduler.decay_lr=1e-6",
"--scheduler.num_warmup_steps=0",
"--scheduler.num_decay_steps=12",
]
POLICY_SPECS: dict[str, dict[str, Any]] = {
"act": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/pusht",
"--dataset.episodes=[0]",
"--policy.type=act",
"--policy.device=cuda",
"--batch_size=4",
"--cudnn_deterministic=true",
],
},
"diffusion": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/pusht",
"--dataset.episodes=[0]",
"--policy.type=diffusion",
"--policy.device=cuda",
"--batch_size=4",
"--cudnn_deterministic=true",
],
},
"groot": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.type=groot",
"--policy.base_model_path=nvidia/GR00T-N1.5-3B",
"--policy.tune_diffusion_model=true",
"--policy.tune_projector=true",
"--policy.tune_llm=false",
"--policy.tune_visual=false",
"--policy.use_bf16=true",
"--policy.device=cuda",
"--batch_size=1",
'--rename_map={"observation.images.image": "observation.images.camera1", '
'"observation.images.image2": "observation.images.camera2"}',
],
},
"multi_task_dit": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/pusht",
"--dataset.episodes=[0]",
"--policy.type=multi_task_dit",
"--policy.device=cuda",
"--policy.horizon=32",
"--policy.n_action_steps=30",
"--batch_size=4",
"--cudnn_deterministic=true",
],
},
"pi0": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.path=lerobot/pi0_base",
"--policy.device=cuda",
"--policy.dtype=bfloat16",
"--policy.n_action_steps=30",
"--policy.use_amp=true",
"--policy.gradient_checkpointing=true",
"--batch_size=1",
*_PI_SGD,
_LIBERO_RENAME_BASE_RGB,
],
},
"pi0_fast": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.path=lerobot/pi0fast-base",
"--policy.device=cuda",
"--policy.dtype=bfloat16",
"--policy.n_action_steps=30",
"--policy.use_amp=true",
"--policy.gradient_checkpointing=true",
"--batch_size=1",
*_PI_SGD,
_LIBERO_RENAME_BASE_RGB,
],
},
"pi05": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.path=lerobot/pi05_base",
"--policy.device=cuda",
"--policy.dtype=bfloat16",
"--policy.n_action_steps=30",
"--policy.use_amp=true",
"--policy.gradient_checkpointing=true",
"--batch_size=1",
*_PI_SGD,
'--policy.normalization_mapping={"ACTION": "MEAN_STD", '
'"STATE": "MEAN_STD", "VISUAL": "IDENTITY"}',
_LIBERO_RENAME_BASE_RGB,
],
},
"smolvla": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.path=lerobot/smolvla_base",
"--policy.load_vlm_weights=true",
"--policy.freeze_vision_encoder=false",
"--policy.train_expert_only=false",
"--policy.empty_cameras=1",
"--policy.device=cuda",
"--batch_size=1",
_LIBERO_RENAME_CAMERAS,
],
},
"wall_x": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/aloha_sim_insertion_human",
"--dataset.episodes=[0]",
"--policy.type=wall_x",
"--policy.pretrained_name_or_path=x-square-robot/wall-oss-flow",
"--policy.prediction_mode=diffusion",
"--policy.attn_implementation=eager",
"--policy.device=cuda",
"--batch_size=1",
*_PI_SGD,
],
},
"xvla": {
"steps": 12,
"train_args": [
"--dataset.repo_id=lerobot/libero_plus",
"--dataset.episodes=[0]",
"--policy.path=lerobot/xvla-widowx",
"--policy.action_mode=auto",
"--policy.empty_cameras=1",
"--policy.device=cuda",
"--batch_size=1",
'--rename_map={"observation.images.front": "observation.images.image", '
'"observation.images.wrist": "observation.images.image2"}',
],
},
}
# ---------------------------------------------------------------------------
# TrainingProfiler — hooks the training loop.
# ---------------------------------------------------------------------------
def _stable_float(value: float | int | None) -> float | None:
return None if value is None else round(float(value), 8)
def _as_float(value: Any) -> float:
if isinstance(value, Real):
return float(value)
if hasattr(value, "val"):
return float(value.val)
raise TypeError(f"Expected a real-valued metric, got {type(value).__name__}")
def _summary(values: list[float]) -> dict[str, float | int | None]:
if not values:
return {"count": 0, "mean": None, "median": None, "min": None, "max": None}
return {
"count": len(values),
"mean": statistics.fmean(values),
"median": statistics.median(values),
"min": min(values),
"max": max(values),
}
def _tensor_signature(tensor: torch.Tensor) -> dict[str, Any]:
"""Small, stable summary of a tensor so forward-pass outputs can be
compared across runs without bloating the regression JSON."""
cpu = tensor.detach().cpu()
hash_tensor = cpu.float() if cpu.dtype == torch.bfloat16 else cpu
sig: dict[str, Any] = {
"shape": list(cpu.shape),
"dtype": str(cpu.dtype),
"numel": cpu.numel(),
"sha256": hashlib.sha256(hash_tensor.contiguous().numpy().tobytes()).hexdigest(),
}
if cpu.numel():
promoted = cpu.to(torch.float64) if cpu.is_floating_point() else cpu.to(torch.int64)
sig["sum"] = _stable_float(promoted.sum().item())
sig["mean"] = _stable_float(promoted.float().mean().item())
return sig
def _summarize_value(value: Any) -> Any:
if isinstance(value, torch.Tensor):
return _tensor_signature(value)
if isinstance(value, dict):
return {k: _summarize_value(v) for k, v in value.items()}
if isinstance(value, (list, tuple)):
return [_summarize_value(v) for v in value]
if isinstance(value, (str, int, float, bool)) or value is None:
return value
return repr(value)
def _hash_payload(payload: Any) -> str:
return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
def _get_profiler_device_time_us(event: Any) -> float | None:
return _stable_float(
getattr(event, "self_device_time_total", getattr(event, "self_cuda_time_total", None))
)
def _write_profiler_table(profiler: Any, path: Path, *, sort_by: str, row_limit: int = 40) -> None:
# The profiler may not have recorded any events for this sort key when the
# schedule window lands outside the active steps — skip silently rather
# than crashing the whole artifact-writer pass.
with contextlib.suppress(Exception):
path.write_text(profiler.key_averages().table(sort_by=sort_by, row_limit=row_limit))
def write_deterministic_forward_artifacts(
*,
policy: Any,
dataset: Any,
batch_size: int,
preprocessor: Any,
output_dir: Path,
device_type: str,
) -> None:
"""Run a seed-controlled single forward pass and dump a stable fingerprint
(loss/output tensor hashes + op counts) for regression detection. Keeps
the caller-selected module mode so ACT-with-VAE-style policies that only
materialize their full forward outputs in `train()` still match."""
if len(dataset) == 0:
raise ValueError("Cannot build a reference batch from an empty dataset.")
indices = [i % len(dataset) for i in range(batch_size)]
reference_batch = preprocessor(default_collate([dataset[i] for i in indices]))
activities = [torch.profiler.ProfilerActivity.CPU]
if device_type == "cuda":
activities.append(torch.profiler.ProfilerActivity.CUDA)
with torch.random.fork_rng(devices=[] if device_type != "cuda" else None):
torch.manual_seed(0)
if device_type == "cuda":
torch.cuda.manual_seed_all(0)
with torch.no_grad(), torch.profiler.profile(activities=activities) as prof:
loss, output_dict = policy.forward(reference_batch)
operators = sorted(
(
{
"key": e.key,
"count": e.count,
"cpu_time_total_us": _stable_float(getattr(e, "cpu_time_total", None)),
**(
{"self_cuda_time_total_us": _get_profiler_device_time_us(e)}
if device_type == "cuda"
else {}
),
}
for e in prof.key_averages()
),
key=lambda e: e["key"],
)
outputs = {"loss": _summarize_value(loss), "output_dict": _summarize_value(output_dict)}
payload = {
"seed": 0,
"reference_batch_size": batch_size,
"operator_fingerprint": _hash_payload([(o["key"], o["count"]) for o in operators]),
"output_fingerprint": _hash_payload(outputs),
"operators": operators,
"outputs": outputs,
}
output_dir.mkdir(parents=True, exist_ok=True)
(output_dir / "deterministic_forward.json").write_text(json.dumps(payload, indent=2, sort_keys=True))
sort_by = "self_cuda_time_total" if device_type == "cuda" else "cpu_time_total"
_write_profiler_table(prof, output_dir / "deterministic_forward_ops.txt", sort_by=sort_by)
class TrainingProfiler:
"""Self-contained profiling hooks for the training loop.
The training script interacts via ``start()``, ``section()``, ``step()``,
``finalize()``, and (optionally) ``record_deterministic_forward()`` — a
~7-line surface.
"""
_SCHEDULE_WAIT = 1
_SCHEDULE_WARMUP = 2
_SCHEDULE_ACTIVE = 6
def __init__(self, mode: str, output_dir: Path, device: torch.device) -> None:
self._mode = mode
self._output_dir = output_dir
self._output_dir.mkdir(parents=True, exist_ok=True)
self._device = device
# Inline timing state — no separate collector class.
self._total_update_s: list[float] = []
self._dataloading_s: list[float] = []
self._section_s: dict[str, list[float]] = {}
self._memory: list[dict[str, int]] = []
self._torch = self._build_torch_profiler()
logger.info("Profiling enabled. Artifacts will be written to %s", output_dir)
def _build_torch_profiler(self) -> Any:
activities = [torch.profiler.ProfilerActivity.CPU]
if self._device.type == "cuda":
activities.append(torch.profiler.ProfilerActivity.CUDA)
trace_dir = self._output_dir / "torch_traces"
trace_dir.mkdir(parents=True, exist_ok=True)
def _on_trace_ready(p: Any) -> None:
if self._mode == "trace":
p.export_chrome_trace(str(trace_dir / f"trace_step_{p.step_num}.json"))
return torch.profiler.profile(
activities=activities,
schedule=torch.profiler.schedule(
wait=self._SCHEDULE_WAIT,
warmup=self._SCHEDULE_WARMUP,
active=self._SCHEDULE_ACTIVE,
repeat=1,
),
on_trace_ready=_on_trace_ready,
record_shapes=True,
profile_memory=True,
with_flops=True,
)
@classmethod
def from_cfg(cls, cfg: Any, device: torch.device) -> TrainingProfiler:
output = cfg.profile_output_dir or (Path(cfg.output_dir) / "profiling")
return cls(mode=cfg.profile_mode, output_dir=Path(output), device=device)
def record_deterministic_forward(
self, *, policy: Any, dataset: Any, batch_size: int, preprocessor: Any
) -> None:
logger.info("Recording deterministic forward-pass artifacts")
write_deterministic_forward_artifacts(
policy=policy,
dataset=dataset,
batch_size=batch_size,
preprocessor=preprocessor,
output_dir=self._output_dir,
device_type=self._device.type,
)
if self._device.type == "cuda":
torch.cuda.empty_cache()
def start(self) -> None:
if self._device.type == "cuda":
torch.cuda.reset_peak_memory_stats(self._device)
self._torch.__enter__()
@contextmanager
def section(self, name: str) -> Iterator[None]:
"""Time a region of the training step. Syncs on CUDA so the
duration reflects GPU work, not just kernel-launch latency."""
if self._device.type == "cuda":
torch.cuda.synchronize(self._device)
t0 = time.perf_counter()
try:
yield
finally:
if self._device.type == "cuda":
torch.cuda.synchronize(self._device)
self._section_s.setdefault(name, []).append(time.perf_counter() - t0)
def step(self, step_num: int, train_tracker: Any) -> None:
self._total_update_s.append(_as_float(train_tracker.update_s))
self._dataloading_s.append(_as_float(train_tracker.dataloading_s))
if self._device.type == "cuda":
self._memory.append(
{
"step": step_num,
"allocated_bytes": torch.cuda.memory_allocated(self._device),
"reserved_bytes": torch.cuda.memory_reserved(self._device),
}
)
self._torch.step()
def finalize(self) -> None:
self._torch.__exit__(None, None, None)
payload: dict[str, Any] = {
"profile_mode": self._mode,
"total_update_s": _summary(self._total_update_s),
"dataloading_s": _summary(self._dataloading_s),
"memory_timeline": self._memory,
}
for name, values in self._section_s.items():
payload[f"{name}_s"] = _summary(values)
if self._device.type == "cuda":
payload["peak_memory_allocated_bytes"] = torch.cuda.max_memory_allocated(self._device)
payload["peak_memory_reserved_bytes"] = torch.cuda.max_memory_reserved(self._device)
(self._output_dir / "step_timing_summary.json").write_text(
json.dumps(payload, indent=2, sort_keys=True)
)
tables_dir = self._output_dir / "torch_tables"
tables_dir.mkdir(parents=True, exist_ok=True)
_write_profiler_table(self._torch, tables_dir / "cpu_time_total.txt", sort_by="cpu_time_total")
_write_profiler_table(self._torch, tables_dir / "cpu_memory.txt", sort_by="self_cpu_memory_usage")
_write_profiler_table(self._torch, tables_dir / "flops.txt", sort_by="flops")
if self._device.type == "cuda":
_write_profiler_table(
self._torch, tables_dir / "cuda_time_total.txt", sort_by="self_cuda_time_total"
)
_write_profiler_table(
self._torch, tables_dir / "cuda_memory.txt", sort_by="self_cuda_memory_usage"
)
# ---------------------------------------------------------------------------
# CI orchestrator. Spawns `lerobot-train` per policy, collects the
# artifacts, (optionally) uploads to the HF Hub results dataset.
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class UploadTarget:
local_path: Path
path_in_repo: str
@dataclass(frozen=True)
class UploadResult:
uploaded_paths: dict[str, str]
pr_url: str | None = None
def _utc_timestamp_slug(now: datetime | None = None) -> str:
return (now or datetime.now(UTC)).strftime("%Y%m%dT%H%M%SZ")
def _hub_file_url(repo_id: str, path_in_repo: str, *, revision: str = "main") -> str:
return f"https://huggingface.co/datasets/{repo_id}/resolve/{revision}/{path_in_repo}"
def parse_discussion_num(pr_url: str | None) -> int | None:
if not pr_url:
return None
m = re.search(r"/discussions/(\d+)$", pr_url)
return int(m.group(1)) if m else None
def upload_targets(
repo_id: str,
targets: list[UploadTarget],
*,
token: str | None = None,
commit_message: str | None = None,
create_pr: bool = False,
) -> UploadResult:
api = HfApi(token=token)
commit = api.create_commit(
repo_id=repo_id,
repo_type="dataset",
operations=[
CommitOperationAdd(path_in_repo=t.path_in_repo, path_or_fileobj=str(t.local_path))
for t in targets
],
commit_message=commit_message or f"Upload {len(targets)} profiling artifacts",
revision="main",
create_pr=create_pr,
)
pr_num = parse_discussion_num(commit.pr_url)
revision = f"refs/pr/{pr_num}" if (create_pr and pr_num) else "main"
return UploadResult(
uploaded_paths={
t.path_in_repo: _hub_file_url(repo_id, t.path_in_repo, revision=revision) for t in targets
},
pr_url=commit.pr_url,
)
def build_train_command(policy: str, run_dir: Path, profile_mode: str) -> list[str]:
spec = POLICY_SPECS[policy]
return [
"uv",
"run",
"lerobot-train",
*spec["train_args"],
f"--output_dir={run_dir / 'train'}",
f"--steps={spec['steps']}",
"--eval_freq=0",
"--save_checkpoint=false",
f"--save_freq={spec['steps']}",
"--wandb.enable=false",
"--policy.push_to_hub=false",
"--num_workers=0",
"--log_freq=1",
f"--profile_mode={profile_mode}",
f"--profile_output_dir={run_dir / 'profiling'}",
]
def build_artifact_index(
*, repo_id: str, run_dir: Path, policy_name: str, run_id: str
) -> tuple[dict[str, Any], dict[str, Any], list[UploadTarget], str]:
"""Scan the run directory and categorize files into
(stdout/stderr, torch_tables/*, torch_traces/*, everything else under profiling/).
Returns (paths, urls, upload targets, row path in repo)."""
row_path_in_repo = f"rows/{policy_name}/{run_id}.json"
root = f"artifacts/{policy_name}/{run_id}"
paths: dict[str, Any] = {
"row": row_path_in_repo,
"profiling_files": {},
"torch_tables": {},
"trace_files": {},
}
urls: dict[str, Any] = {
"row": _hub_file_url(repo_id, row_path_in_repo),
"profiling_files": {},
"torch_tables": {},
"trace_files": {},
}
targets: list[UploadTarget] = []
for name in ("stdout.txt", "stderr.txt"):
p = run_dir / name
if p.exists():
key = name.removesuffix(".txt")
repo = f"{root}/{name}"
paths[key] = repo
urls[key] = _hub_file_url(repo_id, repo)
targets.append(UploadTarget(p, repo))
profiling_dir = run_dir / "profiling"
if profiling_dir.exists():
for p in sorted(profiling_dir.rglob("*")):
if not p.is_file():
continue
rel = str(p.relative_to(run_dir))
repo = f"{root}/{rel}"
paths["profiling_files"][rel] = repo
urls["profiling_files"][rel] = _hub_file_url(repo_id, repo)
targets.append(UploadTarget(p, repo))
if p.name == "step_timing_summary.json":
paths["step_timing_summary"] = repo
urls["step_timing_summary"] = _hub_file_url(repo_id, repo)
elif "torch_tables" in p.parts:
paths["torch_tables"][p.name] = repo
urls["torch_tables"][p.name] = _hub_file_url(repo_id, repo)
elif "torch_traces" in p.parts:
paths["trace_files"][p.name] = repo
urls["trace_files"][p.name] = _hub_file_url(repo_id, repo)
return paths, urls, targets, row_path_in_repo
def upload_profile_run(
*,
repo_id: str,
row_path: Path,
row_path_in_repo: str,
artifact_targets: list[UploadTarget],
create_pr: bool = False,
) -> UploadResult:
return upload_targets(
repo_id=repo_id,
targets=[*artifact_targets, UploadTarget(row_path, row_path_in_repo)],
commit_message=f"Add model profiling row {row_path_in_repo}",
create_pr=create_pr,
)
def _load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text()) if path.exists() else {}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--policies", nargs="*", default=None)
parser.add_argument("--output_dir", type=Path, required=True)
parser.add_argument("--hub_org", default="lerobot")
parser.add_argument("--results_repo", default="model-profiling-history")
parser.add_argument("--publish", action="store_true")
parser.add_argument("--profile_mode", choices=["summary", "trace"], default="trace")
parser.add_argument("--git_commit", default="")
parser.add_argument("--git_ref", default="")
parser.add_argument("--pr_number", default="")
return parser.parse_args()
def main() -> int:
args = parse_args()
selected = args.policies or list(POLICY_SPECS)
unknown = sorted(set(selected) - set(POLICY_SPECS))
if unknown:
raise ValueError(f"Unknown profiling policies: {', '.join(unknown)}")
args.output_dir.mkdir(parents=True, exist_ok=True)
repo_id = args.results_repo if "/" in args.results_repo else f"{args.hub_org}/{args.results_repo}"
git_exe = shutil.which("git") or (_ for _ in ()).throw(RuntimeError("git not found in PATH"))
git_commit = args.git_commit or subprocess.check_output([git_exe, "rev-parse", "HEAD"], text=True).strip()
pr_number = int(args.pr_number) if str(args.pr_number).strip() else None
for policy in selected:
run_id = f"{_utc_timestamp_slug()}__{policy}"
run_dir = args.output_dir / policy / run_id
run_dir.mkdir(parents=True, exist_ok=True)
cmd = build_train_command(policy, run_dir, args.profile_mode)
t0 = time.perf_counter()
result = subprocess.run(cmd, capture_output=True, text=True)
wall_s = time.perf_counter() - t0
(run_dir / "stdout.txt").write_text(result.stdout)
(run_dir / "stderr.txt").write_text(result.stderr)
paths, urls, upload_list, row_in_repo = build_artifact_index(
repo_id=repo_id, run_dir=run_dir, policy_name=policy, run_id=run_id
)
row: dict[str, Any] = {
"schema_version": 1,
"created_at": datetime.now(UTC).isoformat(),
"run_id": run_id,
"policy": policy,
"git_commit": git_commit,
"git_ref": args.git_ref or None,
"pr_number": pr_number,
"status": "success" if result.returncode == 0 else "failed",
"return_code": result.returncode,
"profile_mode": args.profile_mode,
"wall_time_s": wall_s,
"spec": {
"steps": POLICY_SPECS[policy]["steps"],
"train_args": POLICY_SPECS[policy]["train_args"],
},
"step_timing_summary": _load_json(run_dir / "profiling" / "step_timing_summary.json"),
"deterministic_forward": _load_json(run_dir / "profiling" / "deterministic_forward.json"),
"artifact_paths": paths,
"artifact_urls": urls,
"stderr_tail": result.stderr.splitlines()[-20:],
}
row_path = run_dir / "profiling_row.json"
row_path.write_text(json.dumps(row, indent=2, sort_keys=True))
if args.publish:
try:
uploaded = upload_profile_run(
repo_id=repo_id,
row_path=row_path,
row_path_in_repo=row_in_repo,
artifact_targets=upload_list,
create_pr=pr_number is not None,
)
except HfHubHTTPError as exc:
row.update({"publish_status": "failed", "publish_error": str(exc)})
else:
row.update(
{
"publish_status": "success",
"uploaded_paths": uploaded.uploaded_paths,
"publish_pr_url": uploaded.pr_url,
"publish_pr_number": parse_discussion_num(uploaded.pr_url),
}
)
row_path.write_text(json.dumps(row, indent=2, sort_keys=True))
print(json.dumps(row, indent=2, sort_keys=True))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,399 +0,0 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import hashlib
import json
import logging
import statistics
import time
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import dataclass, field
from numbers import Real
from pathlib import Path
from typing import Any
import torch
from torch.utils.data._utils.collate import default_collate
def ensure_dir(path: Path) -> Path:
path.mkdir(parents=True, exist_ok=True)
return path
def write_profiler_table(
profiler: Any,
output_path: Path,
*,
sort_by: str,
row_limit: int = 40,
) -> None:
try:
table = profiler.key_averages().table(sort_by=sort_by, row_limit=row_limit)
except Exception:
return
output_path.write_text(table)
def _make_torch_profiler(
*,
mode: str,
output_dir: Path,
device_type: str,
wait_steps: int = 1,
warmup_steps: int = 2,
active_steps: int = 6,
repeat: int = 1,
record_shapes: bool = True,
with_memory: bool = True,
with_flops: bool = True,
with_stack: bool = False,
) -> Any:
activities = [torch.profiler.ProfilerActivity.CPU]
if device_type == "cuda":
activities.append(torch.profiler.ProfilerActivity.CUDA)
trace_dir = ensure_dir(output_dir / "torch_traces")
def _trace_ready(profiler: Any) -> None:
if mode != "trace":
return
profiler.export_chrome_trace(str(trace_dir / f"trace_step_{profiler.step_num}.json"))
return torch.profiler.profile(
activities=activities,
schedule=torch.profiler.schedule(
wait=wait_steps,
warmup=warmup_steps,
active=active_steps,
repeat=repeat,
),
on_trace_ready=_trace_ready,
record_shapes=record_shapes,
profile_memory=with_memory,
with_flops=with_flops,
with_stack=with_stack,
)
def write_torch_profiler_outputs(
profiler: Any,
output_dir: Path,
*,
device_type: str,
) -> None:
tables_dir = ensure_dir(output_dir / "torch_tables")
write_profiler_table(profiler, tables_dir / "cpu_time_total.txt", sort_by="cpu_time_total")
if device_type == "cuda":
write_profiler_table(profiler, tables_dir / "cuda_time_total.txt", sort_by="self_cuda_time_total")
write_profiler_table(profiler, tables_dir / "cuda_memory.txt", sort_by="self_cuda_memory_usage")
write_profiler_table(profiler, tables_dir / "cpu_memory.txt", sort_by="self_cpu_memory_usage")
write_profiler_table(profiler, tables_dir / "flops.txt", sort_by="flops")
def _stable_float(value: float | int | None) -> float | None:
if value is None:
return None
return round(float(value), 8)
def _tensor_signature(tensor: torch.Tensor) -> dict[str, Any]:
cpu_tensor = tensor.detach().cpu()
if cpu_tensor.numel() == 0:
stats = {"sum": None, "mean": None, "std": None, "min": None, "max": None}
else:
stats_tensor = (
cpu_tensor.to(torch.float64) if cpu_tensor.is_floating_point() else cpu_tensor.to(torch.int64)
)
stats = {
"sum": _stable_float(stats_tensor.sum().item()),
"mean": _stable_float(stats_tensor.float().mean().item()),
"std": _stable_float(stats_tensor.float().std(unbiased=False).item())
if cpu_tensor.numel() > 1
else 0.0,
"min": _stable_float(stats_tensor.min().item()),
"max": _stable_float(stats_tensor.max().item()),
}
hash_tensor = cpu_tensor.float() if cpu_tensor.dtype == torch.bfloat16 else cpu_tensor
digest = hashlib.sha256(hash_tensor.contiguous().numpy().tobytes()).hexdigest()
return {
"shape": list(cpu_tensor.shape),
"dtype": str(cpu_tensor.dtype),
"numel": cpu_tensor.numel(),
"sha256": digest,
**stats,
}
def _summarize_forward_value(value: Any) -> Any:
if isinstance(value, torch.Tensor):
return _tensor_signature(value)
if isinstance(value, dict):
return {key: _summarize_forward_value(val) for key, val in value.items()}
if isinstance(value, (list, tuple)):
return [_summarize_forward_value(item) for item in value]
if isinstance(value, (str, int, float, bool)) or value is None:
return value
return repr(value)
def _hash_payload(payload: Any) -> str:
return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
def _get_profiler_device_time_us(event: Any) -> float | None:
return _stable_float(
getattr(event, "self_device_time_total", getattr(event, "self_cuda_time_total", None))
)
def _build_reference_batch(dataset: Any, batch_size: int) -> Any:
if len(dataset) == 0:
raise ValueError("Cannot build a reference batch from an empty dataset.")
indices = [idx % len(dataset) for idx in range(batch_size)]
samples = [dataset[idx] for idx in indices]
return default_collate(samples)
def write_deterministic_forward_artifacts(
*,
policy: Any,
dataset: Any,
batch_size: int,
preprocessor: Any,
output_dir: Path,
device_type: str,
) -> None:
reference_batch = preprocessor(_build_reference_batch(dataset, batch_size))
activities = [torch.profiler.ProfilerActivity.CPU]
if device_type == "cuda":
activities.append(torch.profiler.ProfilerActivity.CUDA)
# Keep the caller-selected module mode so the fingerprint matches the actual
# train-path forward used by the policy. Some policies, such as ACT with VAE,
# only materialize their full forward outputs while in training mode.
with torch.random.fork_rng(devices=[] if device_type != "cuda" else None):
torch.manual_seed(0)
if device_type == "cuda":
torch.cuda.manual_seed_all(0)
with torch.no_grad(), torch.profiler.profile(activities=activities) as profiler:
loss, output_dict = policy.forward(reference_batch)
operator_entries = []
for event in profiler.key_averages():
entry = {
"key": event.key,
"count": event.count,
"cpu_time_total_us": _stable_float(getattr(event, "cpu_time_total", None)),
}
if device_type == "cuda":
entry["self_cuda_time_total_us"] = _get_profiler_device_time_us(event)
operator_entries.append(entry)
operator_entries = sorted(operator_entries, key=lambda item: item["key"])
output_summary = {
"loss": _summarize_forward_value(loss),
"output_dict": _summarize_forward_value(output_dict),
}
payload = {
"seed": 0,
"reference_batch_size": batch_size,
"operator_fingerprint": _hash_payload([(entry["key"], entry["count"]) for entry in operator_entries]),
"output_fingerprint": _hash_payload(output_summary),
"operators": operator_entries,
"outputs": output_summary,
}
(output_dir / "deterministic_forward.json").write_text(json.dumps(payload, indent=2, sort_keys=True))
table_sort = "self_cuda_time_total" if device_type == "cuda" else "cpu_time_total"
write_profiler_table(profiler, output_dir / "deterministic_forward_ops.txt", sort_by=table_sort)
def _summary(values: list[float]) -> dict[str, float] | dict[str, None]:
if not values:
return {"count": 0, "mean": None, "median": None, "min": None, "max": None}
return {
"count": len(values),
"mean": statistics.fmean(values),
"median": statistics.median(values),
"min": min(values),
"max": max(values),
}
def _as_float(value: Any) -> float:
if isinstance(value, Real):
return float(value)
if hasattr(value, "val"):
return float(value.val)
raise TypeError(f"Expected a real-valued metric, got {type(value).__name__}")
@dataclass
class _StepTimingCollector:
total_update_s: list[float] = field(default_factory=list)
dataloading_s: list[float] = field(default_factory=list)
section_s: dict[str, list[float]] = field(default_factory=dict)
memory_timeline: list[dict[str, float | int]] = field(default_factory=list)
def record_step(self, total_update_s: float) -> None:
self.total_update_s.append(_as_float(total_update_s))
def record_dataloading(self, dataloading_s: float) -> None:
self.dataloading_s.append(_as_float(dataloading_s))
def record_section(self, name: str, duration_s: float) -> None:
self.section_s.setdefault(name, []).append(_as_float(duration_s))
def record_memory(self, *, step: int, allocated_bytes: int, reserved_bytes: int) -> None:
self.memory_timeline.append(
{
"step": step,
"allocated_bytes": allocated_bytes,
"reserved_bytes": reserved_bytes,
}
)
def to_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"total_update_s": _summary(self.total_update_s),
"dataloading_s": _summary(self.dataloading_s),
"memory_timeline": self.memory_timeline,
}
for name, values in self.section_s.items():
payload[f"{name}_s"] = _summary(values)
return payload
def write_json(self, output_path: Path, extra: dict[str, Any] | None = None) -> None:
payload = self.to_dict()
if extra:
payload.update(extra)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(payload, indent=2, sort_keys=True))
class TrainingProfiler:
"""Self-contained profiling orchestrator for the training loop.
Encapsulates torch profiler setup, step-level timing collection, deterministic
forward-pass artifact recording, and all output writing. The training script
interacts with it through a thin interface (~7 lines).
"""
def __init__(
self,
mode: str,
output_dir: Path,
device: torch.device,
*,
wait_steps: int = 1,
warmup_steps: int = 2,
active_steps: int = 6,
repeat: int = 1,
record_shapes: bool = True,
with_memory: bool = True,
with_flops: bool = True,
with_stack: bool = False,
) -> None:
self._mode = mode
self._output_dir = ensure_dir(output_dir)
self._device = device
self._timing = _StepTimingCollector()
self._torch_profiler = _make_torch_profiler(
mode=mode,
output_dir=output_dir,
device_type=device.type,
wait_steps=wait_steps,
warmup_steps=warmup_steps,
active_steps=active_steps,
repeat=repeat,
record_shapes=record_shapes,
with_memory=with_memory,
with_flops=with_flops,
with_stack=with_stack,
)
logging.info("Profiling enabled. Artifacts will be written to %s", output_dir)
@classmethod
def from_cfg(cls, cfg: Any, device: torch.device) -> TrainingProfiler:
output_dir = cfg.profile_output_dir
if output_dir is None:
output_dir = Path(cfg.output_dir) / "profiling"
return cls(mode=cfg.profile_mode, output_dir=Path(output_dir), device=device)
def record_deterministic_forward(
self,
*,
policy: Any,
dataset: Any,
batch_size: int,
preprocessor: Any,
) -> None:
logging.info("Recording deterministic forward-pass artifacts")
write_deterministic_forward_artifacts(
policy=policy,
dataset=dataset,
batch_size=batch_size,
preprocessor=preprocessor,
output_dir=self._output_dir,
device_type=self._device.type,
)
if self._device.type == "cuda":
torch.cuda.empty_cache()
def start(self) -> None:
if self._device.type == "cuda":
torch.cuda.reset_peak_memory_stats(self._device)
self._torch_profiler.__enter__()
@contextmanager
def section(self, name: str) -> Iterator[None]:
"""Time a region of the training step (e.g. forward/backward/optimizer).
On CUDA we synchronize before and after so the reported duration
reflects GPU work, not just the CPU-side kernel-launch latency.
"""
if self._device.type == "cuda":
torch.cuda.synchronize(self._device)
start = time.perf_counter()
try:
yield
finally:
if self._device.type == "cuda":
torch.cuda.synchronize(self._device)
self._timing.record_section(name, time.perf_counter() - start)
def step(self, step_num: int, train_tracker: Any) -> None:
self._timing.record_step(_as_float(train_tracker.update_s))
self._timing.record_dataloading(_as_float(train_tracker.dataloading_s))
if self._device.type == "cuda":
self._timing.record_memory(
step=step_num,
allocated_bytes=torch.cuda.memory_allocated(self._device),
reserved_bytes=torch.cuda.memory_reserved(self._device),
)
self._torch_profiler.step()
def finalize(self) -> None:
self._torch_profiler.__exit__(None, None, None)
extra: dict[str, Any] = {"profile_mode": self._mode}
if self._device.type == "cuda":
extra["peak_memory_allocated_bytes"] = torch.cuda.max_memory_allocated(self._device)
extra["peak_memory_reserved_bytes"] = torch.cuda.max_memory_reserved(self._device)
self._timing.write_json(self._output_dir / "step_timing_summary.json", extra=extra)
write_torch_profiler_outputs(self._torch_profiler, self._output_dir, device_type=self._device.type)

View File

@@ -1,423 +0,0 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import argparse
import importlib.util
import json
import subprocess
import sys
from pathlib import Path
import pytest
import torch
from huggingface_hub.errors import HfHubHTTPError
def _import_model_profiling_script():
script_path = Path(__file__).resolve().parents[2] / "scripts" / "ci" / "run_model_profiling.py"
module_name = "tests.scripts.run_model_profiling"
spec = importlib.util.spec_from_file_location(module_name, script_path)
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
assert spec.loader is not None
spec.loader.exec_module(module)
return module
def test_profiling_specs_cover_expected_policies():
module = _import_model_profiling_script()
spec_path = Path(__file__).resolve().parents[2] / "profiling" / "model_profiling_specs.json"
specs = module.load_specs(spec_path)
assert set(specs) == {
"act",
"diffusion",
"groot",
"multi_task_dit",
"pi0",
"pi0_fast",
"pi05",
"smolvla",
"wall_x",
"xvla",
}
for excluded in ("sac", "sarm", "tdmpc", "vqbet", "reward_classifier"):
assert excluded not in specs
def test_pretrained_libero_specs_match_expected_camera_keys_and_normalization():
module = _import_model_profiling_script()
spec_path = Path(__file__).resolve().parents[2] / "profiling" / "model_profiling_specs.json"
specs = module.load_specs(spec_path)
assert (
'--rename_map={"observation.images.front": "observation.images.base_0_rgb", '
'"observation.images.wrist": "observation.images.left_wrist_0_rgb"}' in specs["pi0"].train_args
)
assert (
'--rename_map={"observation.images.front": "observation.images.base_0_rgb", '
'"observation.images.wrist": "observation.images.left_wrist_0_rgb"}' in specs["pi0_fast"].train_args
)
assert (
'--rename_map={"observation.images.front": "observation.images.base_0_rgb", '
'"observation.images.wrist": "observation.images.left_wrist_0_rgb"}' in specs["pi05"].train_args
)
assert (
'--policy.normalization_mapping={"ACTION": "MEAN_STD", '
'"STATE": "MEAN_STD", "VISUAL": "IDENTITY"}' in specs["pi05"].train_args
)
assert (
'--rename_map={"observation.images.front": "observation.images.camera1", '
'"observation.images.wrist": "observation.images.camera2"}' in specs["smolvla"].train_args
)
def test_build_train_command_includes_profiling_outputs(tmp_path):
module = _import_model_profiling_script()
spec_path = Path(__file__).resolve().parents[2] / "profiling" / "model_profiling_specs.json"
spec = module.load_specs(spec_path)["act"]
cmd = module.build_train_command(spec, tmp_path / "run", "trace")
assert cmd[:3] == ["uv", "run", "lerobot-train"]
assert any(arg.startswith("--output_dir=") for arg in cmd)
assert any(arg.startswith("--profile_output_dir=") for arg in cmd)
assert "--profile_mode=trace" in cmd
assert "--eval_freq=0" in cmd
def test_build_artifact_index_collects_tables_and_traces(tmp_path):
module = _import_model_profiling_script()
run_dir = tmp_path / "act" / "20260415T000000Z__act"
profiling_dir = run_dir / "profiling"
(profiling_dir / "torch_tables").mkdir(parents=True, exist_ok=True)
(profiling_dir / "torch_traces").mkdir(parents=True, exist_ok=True)
(profiling_dir / "step_timing_summary.json").write_text("{}")
(profiling_dir / "deterministic_forward.json").write_text(
json.dumps({"operator_fingerprint": "ops123", "output_fingerprint": "out123"})
)
(profiling_dir / "torch_tables" / "cpu_time_total.txt").write_text("cpu table")
(profiling_dir / "torch_traces" / "trace_step_9.json").write_text("{}")
(run_dir / "stdout.txt").write_text("stdout")
(run_dir / "stderr.txt").write_text("stderr")
artifact_paths, artifact_urls, targets, row_path_in_repo = module.build_artifact_index(
repo_id="lerobot/model-profiling-history",
run_dir=run_dir,
policy_name="act",
run_id="20260415T000000Z__act",
)
assert row_path_in_repo == "rows/act/20260415T000000Z__act.json"
assert artifact_paths["stdout"].endswith("/stdout.txt")
assert artifact_paths["step_timing_summary"].endswith("/profiling/step_timing_summary.json")
assert "cpu_time_total.txt" in artifact_paths["torch_tables"]
assert "trace_step_9.json" in artifact_paths["trace_files"]
assert artifact_paths["profiling_files"]["profiling/deterministic_forward.json"].endswith(
"/profiling/deterministic_forward.json"
)
assert artifact_urls["row"].startswith("https://huggingface.co/datasets/lerobot/model-profiling-history/")
assert len(targets) == 6
def test_upload_targets_batches_preview_publish_into_single_hf_pr(monkeypatch, tmp_path):
module = _import_model_profiling_script()
local_path = tmp_path / "profiling_row.json"
local_path.write_text("{}")
captured: dict[str, object] = {}
class _FakeCommit:
pr_url = "https://huggingface.co/datasets/lerobot/model-profiling-history/discussions/42"
class _FakeApi:
def __init__(self, token=None):
captured["token"] = token
def create_commit(self, **kwargs):
captured.update(kwargs)
return _FakeCommit()
monkeypatch.setattr(module, "HfApi", _FakeApi)
result = module.upload_targets(
repo_id="lerobot/model-profiling-history",
targets=[module.UploadTarget(local_path=local_path, path_in_repo="rows/act/run.json")],
create_pr=True,
token="hf_test_token",
)
assert captured["repo_id"] == "lerobot/model-profiling-history"
assert captured["repo_type"] == "dataset"
assert captured["revision"] == "main"
assert captured["create_pr"] is True
operations = captured["operations"]
assert len(operations) == 1
assert operations[0].path_in_repo == "rows/act/run.json"
assert result.pr_url == _FakeCommit.pr_url
assert result.uploaded_paths["rows/act/run.json"].endswith("/resolve/refs/pr/42/rows/act/run.json")
def test_model_profiling_main_smoke_writes_row(monkeypatch, tmp_path):
module = _import_model_profiling_script()
spec_file = tmp_path / "specs.json"
spec_file.write_text(
json.dumps(
{
"act": {
"steps": 4,
"train_args": [
"--dataset.repo_id=lerobot/pusht",
"--dataset.episodes=[0]",
"--policy.type=act",
"--policy.device=cuda",
"--batch_size=4",
],
}
}
)
)
args = argparse.Namespace(
spec_file=spec_file,
policies=["act"],
output_dir=tmp_path / "results",
hub_org="lerobot",
results_repo="model-profiling-history",
publish=False,
profile_mode="summary",
git_commit="",
git_ref="codex/model-profiling",
pr_number="3389",
)
monkeypatch.setattr(module, "parse_args", lambda: args)
monkeypatch.setattr(module.subprocess, "check_output", lambda *a, **k: "deadbeef\n")
def _fake_run(cmd, capture_output, text):
assert capture_output is True
assert text is True
profile_dir = Path(
next(arg.split("=", 1)[1] for arg in cmd if arg.startswith("--profile_output_dir="))
)
(profile_dir / "torch_tables").mkdir(parents=True, exist_ok=True)
(profile_dir / "step_timing_summary.json").write_text(
json.dumps(
{
"total_update_s": {"count": 1, "mean": 0.3, "median": 0.3, "min": 0.3, "max": 0.3},
"peak_memory_allocated_bytes": 1024,
}
)
)
(profile_dir / "deterministic_forward.json").write_text(
json.dumps(
{
"operator_fingerprint": "ops-fingerprint",
"output_fingerprint": "output-fingerprint",
}
)
)
(profile_dir / "torch_tables" / "cpu_time_total.txt").write_text("cpu time table")
return subprocess.CompletedProcess(cmd, 0, "stdout ok", "")
monkeypatch.setattr(module.subprocess, "run", _fake_run)
assert module.main() == 0
row_paths = list((tmp_path / "results").rglob("profiling_row.json"))
assert len(row_paths) == 1
row = json.loads(row_paths[0].read_text())
assert row["policy"] == "act"
assert row["status"] == "success"
assert row["git_commit"] == "deadbeef"
assert row["git_ref"] == "codex/model-profiling"
assert row["pr_number"] == 3389
assert row["step_timing_summary"]["total_update_s"]["mean"] == 0.3
assert row["deterministic_forward"]["operator_fingerprint"] == "ops-fingerprint"
def test_model_profiling_publish_failure_is_recorded_without_failing(monkeypatch, tmp_path):
module = _import_model_profiling_script()
spec_file = tmp_path / "specs.json"
spec_file.write_text(
json.dumps(
{
"act": {
"steps": 1,
"train_args": [
"--dataset.repo_id=lerobot/pusht",
"--dataset.episodes=[0]",
"--policy.type=act",
"--policy.device=cuda",
"--batch_size=4",
],
}
}
)
)
args = argparse.Namespace(
spec_file=spec_file,
policies=["act"],
output_dir=tmp_path / "results",
hub_org="lerobot",
results_repo="model-profiling-history",
publish=True,
profile_mode="summary",
git_commit="deadbeef",
git_ref="codex/model-profiling",
pr_number="3389",
)
monkeypatch.setattr(module, "parse_args", lambda: args)
def _fake_run(cmd, capture_output, text):
profile_dir = Path(
next(arg.split("=", 1)[1] for arg in cmd if arg.startswith("--profile_output_dir="))
)
profile_dir.mkdir(parents=True, exist_ok=True)
return subprocess.CompletedProcess(cmd, 0, "stdout ok", "")
monkeypatch.setattr(module.subprocess, "run", _fake_run)
def _fake_upload_profile_run(**kwargs):
response = type("Response", (), {"status_code": 403, "headers": {}, "request": None})()
raise HfHubHTTPError("403 Forbidden: Authorization error.", response=response)
monkeypatch.setattr(module, "upload_profile_run", _fake_upload_profile_run)
assert module.main() == 0
row_paths = list((tmp_path / "results").rglob("profiling_row.json"))
assert len(row_paths) == 1
row = json.loads(row_paths[0].read_text())
assert row["status"] == "success"
assert row["publish_status"] == "failed"
assert "Authorization error" in row["publish_error"]
def test_parse_discussion_num_handles_hf_discussion_urls():
module = _import_model_profiling_script()
assert (
module.parse_discussion_num(
"https://huggingface.co/datasets/lerobot/model-profiling-history/discussions/42"
)
== 42
)
assert (
module.parse_discussion_num("https://huggingface.co/datasets/lerobot/model-profiling-history") is None
)
def test_deterministic_forward_artifacts_preserve_policy_mode(tmp_path):
from lerobot.utils.profiling_utils import write_deterministic_forward_artifacts
class _TrainingOnlyPolicy(torch.nn.Module):
def __init__(self):
super().__init__()
self.forward_calls = 0
def forward(self, batch):
self.forward_calls += 1
assert self.training
return batch["value"].sum(), {"value": batch["value"]}
dataset = [{"value": torch.tensor([1.0, 2.0])}]
policy = _TrainingOnlyPolicy()
policy.train()
write_deterministic_forward_artifacts(
policy=policy,
dataset=dataset,
batch_size=2,
preprocessor=lambda batch: batch,
output_dir=tmp_path,
device_type="cpu",
)
payload = json.loads((tmp_path / "deterministic_forward.json").read_text())
assert policy.training is True
assert policy.forward_calls == 1
assert payload["reference_batch_size"] == 2
assert "operator_fingerprint" in payload
assert payload["outputs"]["loss"]["numel"] == 1
def test_step_timing_collector_accepts_metric_like_values(tmp_path):
from lerobot.utils.profiling_utils import _StepTimingCollector
class _MetricLike:
def __init__(self, val):
self.val = val
collector = _StepTimingCollector()
collector.record_step(_MetricLike(0.6))
collector.record_dataloading(_MetricLike(0.05))
collector.write_json(tmp_path / "step_timing_summary.json")
payload = json.loads((tmp_path / "step_timing_summary.json").read_text())
assert payload["total_update_s"]["mean"] == 0.6
assert payload["dataloading_s"]["mean"] == 0.05
def test_step_timing_collector_records_forward_backward_optimizer(tmp_path):
from lerobot.utils.profiling_utils import _StepTimingCollector
collector = _StepTimingCollector()
for _ in range(3):
collector.record_section("forward", 0.10)
collector.record_section("backward", 0.20)
collector.record_section("optimizer", 0.05)
collector.write_json(tmp_path / "step_timing_summary.json")
payload = json.loads((tmp_path / "step_timing_summary.json").read_text())
assert payload["forward_s"]["mean"] == pytest.approx(0.10)
assert payload["backward_s"]["mean"] == pytest.approx(0.20)
assert payload["optimizer_s"]["mean"] == pytest.approx(0.05)
assert payload["forward_s"]["count"] == 3
def test_training_profiler_section_records_duration(tmp_path):
from lerobot.utils.profiling_utils import TrainingProfiler
profiler = TrainingProfiler(
mode="summary",
output_dir=tmp_path,
device=torch.device("cpu"),
)
profiler.start()
with profiler.section("forward"):
pass
with profiler.section("backward"):
pass
profiler.step(1, argparse.Namespace(update_s=0.5, dataloading_s=0.01))
profiler.finalize()
payload = json.loads((tmp_path / "step_timing_summary.json").read_text())
assert payload["forward_s"]["count"] == 1
assert payload["backward_s"]["count"] == 1
assert payload["forward_s"]["mean"] >= 0.0
def test_profiler_device_time_uses_generic_attr_first():
from lerobot.utils.profiling_utils import _get_profiler_device_time_us
class _Event:
self_device_time_total = 12.3456
assert _get_profiler_device_time_us(_Event()) == 12.3456

View File

@@ -0,0 +1,312 @@
#!/usr/bin/env python
# Copyright 2026 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
from __future__ import annotations
import argparse
import json
import subprocess
from pathlib import Path
import pytest
import torch
from huggingface_hub.errors import HfHubHTTPError
from lerobot.utils import model_profiling as mp
# ---------------------------------------------------------------------------
# Policy spec matrix
# ---------------------------------------------------------------------------
def test_policy_specs_cover_expected_policies():
assert set(mp.POLICY_SPECS) == {
"act",
"diffusion",
"groot",
"multi_task_dit",
"pi0",
"pi0_fast",
"pi05",
"smolvla",
"wall_x",
"xvla",
}
# Sanity: excluded policies should stay out of the matrix.
for excluded in ("sac", "sarm", "tdmpc", "vqbet", "reward_classifier"):
assert excluded not in mp.POLICY_SPECS
def test_pretrained_libero_specs_match_expected_camera_keys_and_normalization():
base_rgb_rename = (
'--rename_map={"observation.images.front": "observation.images.base_0_rgb", '
'"observation.images.wrist": "observation.images.left_wrist_0_rgb"}'
)
for name in ("pi0", "pi0_fast", "pi05"):
assert base_rgb_rename in mp.POLICY_SPECS[name]["train_args"]
assert any(
arg.startswith('--policy.normalization_mapping={"ACTION": "MEAN_STD"')
for arg in mp.POLICY_SPECS["pi05"]["train_args"]
)
assert (
'--rename_map={"observation.images.front": "observation.images.camera1", '
'"observation.images.wrist": "observation.images.camera2"}'
in mp.POLICY_SPECS["smolvla"]["train_args"]
)
# ---------------------------------------------------------------------------
# CI orchestrator helpers
# ---------------------------------------------------------------------------
def test_build_train_command_includes_profiling_outputs(tmp_path):
cmd = mp.build_train_command("act", tmp_path / "run", "trace")
assert cmd[:3] == ["uv", "run", "lerobot-train"]
assert any(a.startswith("--output_dir=") for a in cmd)
assert any(a.startswith("--profile_output_dir=") for a in cmd)
assert "--profile_mode=trace" in cmd
assert "--eval_freq=0" in cmd
def test_build_artifact_index_collects_tables_and_traces(tmp_path):
run_dir = tmp_path / "act" / "20260415T000000Z__act"
profiling = run_dir / "profiling"
(profiling / "torch_tables").mkdir(parents=True)
(profiling / "torch_traces").mkdir(parents=True)
(profiling / "step_timing_summary.json").write_text("{}")
(profiling / "deterministic_forward.json").write_text(
json.dumps({"operator_fingerprint": "ops", "output_fingerprint": "out"})
)
(profiling / "torch_tables" / "cpu_time_total.txt").write_text("cpu table")
(profiling / "torch_traces" / "trace_step_9.json").write_text("{}")
(run_dir / "stdout.txt").write_text("stdout")
(run_dir / "stderr.txt").write_text("stderr")
paths, urls, targets, row_in_repo = mp.build_artifact_index(
repo_id="lerobot/model-profiling-history",
run_dir=run_dir,
policy_name="act",
run_id="20260415T000000Z__act",
)
assert row_in_repo == "rows/act/20260415T000000Z__act.json"
assert paths["stdout"].endswith("/stdout.txt")
assert paths["step_timing_summary"].endswith("/profiling/step_timing_summary.json")
assert "cpu_time_total.txt" in paths["torch_tables"]
assert "trace_step_9.json" in paths["trace_files"]
assert urls["row"].startswith("https://huggingface.co/datasets/lerobot/model-profiling-history/")
# stdout + stderr + 4 profiling files
assert len(targets) == 6
def test_upload_targets_batches_preview_publish_into_single_hf_pr(monkeypatch, tmp_path):
local_path = tmp_path / "profiling_row.json"
local_path.write_text("{}")
captured: dict[str, object] = {}
class _FakeCommit:
pr_url = "https://huggingface.co/datasets/lerobot/model-profiling-history/discussions/42"
class _FakeApi:
def __init__(self, token=None):
captured["token"] = token
def create_commit(self, **kwargs):
captured.update(kwargs)
return _FakeCommit()
monkeypatch.setattr(mp, "HfApi", _FakeApi)
result = mp.upload_targets(
repo_id="lerobot/model-profiling-history",
targets=[mp.UploadTarget(local_path, "rows/act/run.json")],
create_pr=True,
token="hf_test_token",
)
assert captured["repo_id"] == "lerobot/model-profiling-history"
assert captured["repo_type"] == "dataset"
assert captured["create_pr"] is True
assert result.pr_url == _FakeCommit.pr_url
assert result.uploaded_paths["rows/act/run.json"].endswith("/resolve/refs/pr/42/rows/act/run.json")
def test_parse_discussion_num_handles_hf_discussion_urls():
assert (
mp.parse_discussion_num(
"https://huggingface.co/datasets/lerobot/model-profiling-history/discussions/42"
)
== 42
)
assert mp.parse_discussion_num("https://huggingface.co/datasets/lerobot/model-profiling-history") is None
assert mp.parse_discussion_num(None) is None
# ---------------------------------------------------------------------------
# main() smoke tests
# ---------------------------------------------------------------------------
@pytest.fixture
def _fake_args(tmp_path):
"""Shared argparse namespace for main() smoke tests — overridden per-test."""
return argparse.Namespace(
policies=["act"],
output_dir=tmp_path / "results",
hub_org="lerobot",
results_repo="model-profiling-history",
publish=False,
profile_mode="summary",
git_commit="",
git_ref="codex/model-profiling",
pr_number="3389",
)
def _stub_train_subprocess(mp_module, *, returncode: int = 0, write_artifacts: bool = True):
"""Build a fake subprocess.run that writes the profiling artifacts main() expects."""
def _fake_run(cmd, capture_output, text):
assert capture_output is True
assert text is True
profile_dir = Path(next(a.split("=", 1)[1] for a in cmd if a.startswith("--profile_output_dir=")))
profile_dir.mkdir(parents=True, exist_ok=True)
if write_artifacts:
(profile_dir / "torch_tables").mkdir(parents=True, exist_ok=True)
(profile_dir / "step_timing_summary.json").write_text(
json.dumps({"total_update_s": {"count": 1, "mean": 0.3}, "peak_memory_allocated_bytes": 1024})
)
(profile_dir / "deterministic_forward.json").write_text(
json.dumps(
{"operator_fingerprint": "ops-fingerprint", "output_fingerprint": "output-fingerprint"}
)
)
(profile_dir / "torch_tables" / "cpu_time_total.txt").write_text("cpu time table")
return subprocess.CompletedProcess(cmd, returncode, "stdout ok", "")
return _fake_run
def test_main_smoke_writes_row(monkeypatch, _fake_args):
monkeypatch.setattr(mp, "parse_args", lambda: _fake_args)
monkeypatch.setattr(mp.subprocess, "check_output", lambda *a, **k: "deadbeef\n")
monkeypatch.setattr(mp.subprocess, "run", _stub_train_subprocess(mp))
assert mp.main() == 0
row_paths = list(_fake_args.output_dir.rglob("profiling_row.json"))
assert len(row_paths) == 1
row = json.loads(row_paths[0].read_text())
assert row["policy"] == "act"
assert row["status"] == "success"
assert row["git_commit"] == "deadbeef"
assert row["git_ref"] == "codex/model-profiling"
assert row["pr_number"] == 3389
assert row["step_timing_summary"]["total_update_s"]["mean"] == 0.3
assert row["deterministic_forward"]["operator_fingerprint"] == "ops-fingerprint"
def test_main_records_publish_failure_without_failing(monkeypatch, _fake_args):
_fake_args.publish = True
_fake_args.git_commit = "deadbeef"
monkeypatch.setattr(mp, "parse_args", lambda: _fake_args)
monkeypatch.setattr(mp.subprocess, "run", _stub_train_subprocess(mp, write_artifacts=False))
def _fail_upload(**kwargs):
resp = type("Resp", (), {"status_code": 403, "headers": {}, "request": None})()
raise HfHubHTTPError("403 Forbidden: Authorization error.", response=resp)
monkeypatch.setattr(mp, "upload_profile_run", _fail_upload)
assert mp.main() == 0
row = json.loads(next(_fake_args.output_dir.rglob("profiling_row.json")).read_text())
assert row["status"] == "success"
assert row["publish_status"] == "failed"
assert "Authorization error" in row["publish_error"]
# ---------------------------------------------------------------------------
# TrainingProfiler behavior
# ---------------------------------------------------------------------------
def test_deterministic_forward_artifacts_preserve_policy_mode(tmp_path):
class _TrainingOnlyPolicy(torch.nn.Module):
def __init__(self):
super().__init__()
self.forward_calls = 0
def forward(self, batch):
self.forward_calls += 1
assert self.training
return batch["value"].sum(), {"value": batch["value"]}
dataset = [{"value": torch.tensor([1.0, 2.0])}]
policy = _TrainingOnlyPolicy()
policy.train()
mp.write_deterministic_forward_artifacts(
policy=policy,
dataset=dataset,
batch_size=2,
preprocessor=lambda b: b,
output_dir=tmp_path,
device_type="cpu",
)
payload = json.loads((tmp_path / "deterministic_forward.json").read_text())
assert policy.training is True
assert policy.forward_calls == 1
assert payload["reference_batch_size"] == 2
assert "operator_fingerprint" in payload
assert payload["outputs"]["loss"]["numel"] == 1
def test_training_profiler_section_records_forward_backward_optimizer(tmp_path):
profiler = mp.TrainingProfiler(mode="summary", output_dir=tmp_path, device=torch.device("cpu"))
profiler.start()
for _ in range(3):
with profiler.section("forward"):
pass
with profiler.section("backward"):
pass
with profiler.section("optimizer"):
pass
profiler.step(1, argparse.Namespace(update_s=0.5, dataloading_s=0.01))
profiler.finalize()
payload = json.loads((tmp_path / "step_timing_summary.json").read_text())
assert payload["forward_s"]["count"] == 3
assert payload["backward_s"]["count"] == 3
assert payload["optimizer_s"]["count"] == 3
assert payload["total_update_s"]["mean"] == 0.5
def test_training_profiler_accepts_metric_like_values(tmp_path):
class _MetricLike:
def __init__(self, v):
self.val = v
profiler = mp.TrainingProfiler(mode="summary", output_dir=tmp_path, device=torch.device("cpu"))
profiler.start()
profiler.step(1, argparse.Namespace(update_s=_MetricLike(0.6), dataloading_s=_MetricLike(0.05)))
profiler.finalize()
payload = json.loads((tmp_path / "step_timing_summary.json").read_text())
assert payload["total_update_s"]["mean"] == 0.6
assert payload["dataloading_s"]["mean"] == 0.05
def test_profiler_device_time_uses_generic_attr_first():
class _Event:
self_device_time_total = 12.3456
assert mp._get_profiler_device_time_us(_Event()) == 12.3456