From 1de2a4a8289b1ef7cb17ea5089641b64c855b0d2 Mon Sep 17 00:00:00 2001 From: Francesco Capuano Date: Thu, 18 Sep 2025 17:28:14 +0200 Subject: [PATCH] add: profiler for streaming performance --- src/lerobot/scripts/profile_streaming.py | 234 +++++++++++++++++++++++ 1 file changed, 234 insertions(+) create mode 100644 src/lerobot/scripts/profile_streaming.py diff --git a/src/lerobot/scripts/profile_streaming.py b/src/lerobot/scripts/profile_streaming.py new file mode 100644 index 000000000..6b0e7b1d7 --- /dev/null +++ b/src/lerobot/scripts/profile_streaming.py @@ -0,0 +1,234 @@ +import argparse +import datetime +import os +import time +from collections.abc import Callable + +import numpy as np +import pandas as pd +from tqdm import tqdm + +from lerobot.datasets.lerobot_dataset import LeRobotDataset +from lerobot.datasets.streaming_dataset import StreamingLeRobotDataset + + +def profile_throughput_indexed( + dataset: LeRobotDataset, num_samples: int, warmup_iters: int = 3 +) -> np.ndarray: + """Measure per-item access time on an indexable LeRobotDataset. + + Accesses dataset[i % len(dataset)] for ``num_samples`` iterations, with an initial warmup. + """ + next_times = np.zeros(num_samples) + total = len(dataset) + + # warmup + for k in range(warmup_iters): + _ = dataset[k % total] + + for j in tqdm(range(num_samples), desc="Profiling dataset throughput", unit="item"): + start_time = time.perf_counter() + _ = dataset[j % total] + end_time = time.perf_counter() + next_times[j] = end_time - start_time + + return next_times + + +def profile_throughput( + dataset: StreamingLeRobotDataset, num_samples: int, warmup_iters: int = 3 +) -> np.ndarray: + """Measure ``.next()`` call latency on a streaming dataset. + + Performs a configurable warmup. This does not numerically "normalize" times; it simply + avoids including initialization overhead in the timing window. + """ + next_times = np.zeros(num_samples) + iter_dataset = iter(dataset) + + # warmup + for _ in range(warmup_iters): + _ = next(iter_dataset) + + for j in tqdm(range(num_samples), desc="Profiling throughput", unit="call"): + start_time = time.perf_counter() + _sample = next(iter_dataset) + end_time = time.perf_counter() + next_times[j] = end_time - start_time + + return next_times + + +def profile_init(dataset_factory: Callable[[], StreamingLeRobotDataset], num_runs: int) -> np.ndarray: + """Measure time-to-first-sample by re-instantiating the dataset ``num_runs`` times. + + Using a factory avoids unsafe ``deepcopy`` of objects that may own threads or file handles. + """ + init_times = np.zeros(num_runs) + for i in tqdm(range(num_runs), desc="Profiling init", unit="run"): + fresh_dataset = dataset_factory() + iter_dataset = iter(fresh_dataset) + start_time = time.perf_counter() + _ = next(iter_dataset) + end_time = time.perf_counter() + init_times[i] = end_time - start_time + + return init_times + + +def profile_randomness(dataset: StreamingLeRobotDataset, num_samples: int) -> float: + """Measure how random the sample order is via correlation. + + Returns a Pearson correlation between retrieved frame index and iteration index. + - ~0: random order + - ~+1: strictly increasing (in-order) + - ~-1: strictly decreasing (reverse order) + """ + frame_indices = np.zeros(num_samples, dtype=float) + iter_indices = np.arange(num_samples, dtype=float) + + iter_dataset = iter(dataset) + + for i in tqdm(range(num_samples), desc="Profiling randomness", unit="sample"): + sample = next(iter_dataset) + if "index" in sample: + frame_idx_value = sample["index"] + elif "frame_index" in sample: + frame_idx_value = sample["frame_index"] + else: + raise KeyError("Sample is missing 'index' (or 'frame_index') required to compute randomness.") + frame_indices[i] = float(frame_idx_value) + + # Guard against degenerate cases + if num_samples < 2 or np.std(frame_indices) == 0 or np.std(iter_indices) == 0: + return np.nan, None + + correlation = float(np.corrcoef(frame_indices, iter_indices)[0, 1]) + return correlation + + +def profile_streaming_dataset( + repo_id: str, + delta_timestamps: dict[str, list[float]] | None = None, + num_samples: int = 100, + warmup_iters: int = 10, + buffer_size: int = 1000, +) -> tuple[np.ndarray, np.ndarray, float]: + """Run init, throughput, and randomness profiles on a StreamingLeRobotDataset.""" + + def dataset_factory() -> StreamingLeRobotDataset: + return StreamingLeRobotDataset(repo_id, delta_timestamps=delta_timestamps, buffer_size=buffer_size) + + # Measure init by repeated instantiation + init_times = profile_init(dataset_factory, num_runs=warmup_iters) + + # Throughput and randomness on a single fresh dataset instance + dataset = dataset_factory() + next_times = profile_throughput(dataset, num_samples=num_samples, warmup_iters=warmup_iters) + correlation = profile_randomness(dataset, num_samples=num_samples) + + return init_times, next_times, correlation + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Profile StreamingLeRobotDataset performance metrics.") + parser.add_argument( + "--repo-id", + type=str, + default="lerobot/svla_so101_pickplace", + help="Dataset repo_id to profile.", + ) + parser.add_argument( + "--num-samples", + type=int, + default=1000, + help="Number of samples to measure for throughput/randomness.", + ) + parser.add_argument( + "--warmup-iters", + type=int, + default=10, + help="Number of iterations for init and throughput warmup.", + ) + parser.add_argument( + "--buffer-size", + type=int, + default=1000, + help="Buffer size for the streaming dataset.", + ) + parser.add_argument( + "--with-delta-timestamps", + action="store_true", + help="Profile with delta timestamps.", + ) + parser.add_argument( + "--compare-with-local", + action="store_true", + help="Also profile local LeRobotDataset throughput for comparison.", + ) + parser.add_argument( + "--outdir", + type=str, + default=os.path.join("outputs", "benchmarks"), + help="Directory to write CSVs/PNGs to.", + ) + + args = parser.parse_args() + + delta_timestamps = ( + None + if not args.with_delta_timestamps + else { + "observation.state": [-2.0, -1.0, -0.5, 0.0, 0.5, 1.0], + "action": [ + -0.1, + 0.0, + 0.1, + 0.2, + 0.3, + 0.4, + 0.5, + 0.6, + 0.7, + 0.8, + 0.9, + 1.0, + ], + } + ) + + init_times, next_times, correlation = profile_streaming_dataset( + repo_id=args.repo_id, + delta_timestamps=delta_timestamps, + num_samples=args.num_samples, + warmup_iters=args.warmup_iters, + buffer_size=args.buffer_size, + ) + + os.makedirs(args.outdir, exist_ok=True) + + repo_id_str = args.repo_id.replace("/", "-") + date_str = datetime.datetime.now().strftime("%Y-%m-%d") + name_suffix = f"{repo_id_str}_buf{args.buffer_size}_{date_str}" + + # Visualization disabled by default; figures are not created or saved. + + init_df = pd.DataFrame({"init_times": init_times}) + next_df = pd.DataFrame({"next_times": next_times}) + correlation_df = pd.DataFrame({"correlation": [correlation]}) + + init_df.to_csv(os.path.join(args.outdir, f"init_times_{name_suffix}.csv"), index=False) + next_df.to_csv(os.path.join(args.outdir, f"next_times_{name_suffix}.csv"), index=False) + correlation_df.to_csv(os.path.join(args.outdir, f"correlation_{name_suffix}.csv"), index=False) + + if args.compare_with_local: + # Profile local non-streaming dataset throughput for comparison + local_ds = LeRobotDataset(args.repo_id, delta_timestamps=delta_timestamps) + local_next_times = profile_throughput_indexed( + local_ds, num_samples=args.num_samples, warmup_iters=args.warmup_iters + ) + local_df = pd.DataFrame({"next_times": local_next_times}) + local_df.to_csv( + os.path.join(args.outdir, f"next_times_local_{repo_id_str}_{date_str}.csv"), + index=False, + )