diff --git a/src/lerobot/scripts/lerobot_eval.py b/src/lerobot/scripts/lerobot_eval.py index d1dd6803c..1cb909580 100644 --- a/src/lerobot/scripts/lerobot_eval.py +++ b/src/lerobot/scripts/lerobot_eval.py @@ -47,6 +47,7 @@ You can learn about the CLI options for this script in the `EvalPipelineConfig` """ import concurrent.futures as cf +import copy import json import logging import threading @@ -56,7 +57,6 @@ from collections.abc import Callable from contextlib import nullcontext from copy import deepcopy from dataclasses import asdict -from functools import partial from pathlib import Path from pprint import pformat from typing import Any, TypedDict @@ -733,34 +733,48 @@ def eval_policy_all( group_acc[group]["video_paths"].extend(paths) overall["video_paths"].extend(paths) + def _make_thread_policy(p: PreTrainedPolicy) -> PreTrainedPolicy: + """Shallow copy sharing weight tensors, with independent per-thread state. + + copy.copy() gives a new Python object whose _parameters dict is a shared + reference (same tensor storage, zero extra VRAM). reset() then rebinds + mutable state (action queues etc.) to fresh per-thread objects. + + Note: does NOT work for ACT with temporal_ensemble_coeff — that policy's + reset() mutates a shared sub-object. Use max_parallel_tasks=1 for that config. + """ + thread_p = copy.copy(p) + thread_p.reset() + return thread_p + # Choose runner (sequential vs threaded) - task_runner = partial( - run_one, - policy=policy, - env_preprocessor=env_preprocessor, - env_postprocessor=env_postprocessor, - preprocessor=preprocessor, - postprocessor=postprocessor, - n_episodes=n_episodes, - max_episodes_rendered=max_episodes_rendered, - videos_dir=videos_dir, - return_episode_data=return_episode_data, - start_seed=start_seed, - ) + _runner_kwargs = { + "env_preprocessor": env_preprocessor, + "env_postprocessor": env_postprocessor, + "preprocessor": preprocessor, + "postprocessor": postprocessor, + "n_episodes": n_episodes, + "max_episodes_rendered": max_episodes_rendered, + "videos_dir": videos_dir, + "return_episode_data": return_episode_data, + "start_seed": start_seed, + } if max_parallel_tasks <= 1: # sequential path (single accumulator path on the main thread) # NOTE: keeping a single-threaded accumulator avoids concurrent list appends or locks for task_group, task_id, env in tasks: - tg, tid, metrics = task_runner(task_group, task_id, env) + tg, tid, metrics = run_one(task_group, task_id, env, policy=policy, **_runner_kwargs) _accumulate_to(tg, metrics) per_task_infos.append({"task_group": tg, "task_id": tid, "metrics": metrics}) else: - # threaded path: submit all tasks, consume completions on main thread and accumulate there + # threaded path: each thread gets a shallow policy copy (shared weights, independent state) with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor: fut2meta = {} for task_group, task_id, env in tasks: - fut = executor.submit(task_runner, task_group, task_id, env) + fut = executor.submit( + run_one, task_group, task_id, env, policy=_make_thread_policy(policy), **_runner_kwargs + ) fut2meta[fut] = (task_group, task_id) for fut in cf.as_completed(fut2meta): tg, tid, metrics = fut.result()