mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-02 11:51:25 +00:00
fix(eval): skip multi-instance orchestration when runtime=docker
_orchestrate_multi_instance_eval spawns extra lerobot-eval processes that each call run_eval_in_docker again, creating N^2 containers. For docker runtime, instance_count directly controls how many env-worker containers are spawned by run_eval_in_docker — no process-level orchestration is needed. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -74,9 +74,9 @@ from tqdm import trange
|
||||
|
||||
from lerobot.configs import parser
|
||||
from lerobot.configs.eval import EvalPipelineConfig
|
||||
from lerobot.envs.lazy_vec_env import LazyVectorEnv
|
||||
from lerobot.envs.docker_runtime import run_eval_in_docker
|
||||
from lerobot.envs.factory import make_env, make_env_pre_post_processors
|
||||
from lerobot.envs.lazy_vec_env import LazyVectorEnv
|
||||
from lerobot.envs.utils import (
|
||||
add_envs_task,
|
||||
check_env_attributes_and_types,
|
||||
@@ -87,14 +87,14 @@ from lerobot.policies.factory import make_policy, make_pre_post_processors
|
||||
from lerobot.policies.pretrained import PreTrainedPolicy
|
||||
from lerobot.processor import PolicyAction, PolicyProcessorPipeline
|
||||
from lerobot.utils.constants import ACTION, DONE, OBS_STR, REWARD
|
||||
from lerobot.utils.import_utils import register_third_party_plugins
|
||||
from lerobot.utils.io_utils import write_video
|
||||
from lerobot.utils.random_utils import set_seed
|
||||
from lerobot.utils.hf_eval_results import (
|
||||
build_eval_results_rows,
|
||||
default_eval_date,
|
||||
upload_eval_results_yaml,
|
||||
)
|
||||
from lerobot.utils.import_utils import register_third_party_plugins
|
||||
from lerobot.utils.io_utils import write_video
|
||||
from lerobot.utils.random_utils import set_seed
|
||||
from lerobot.utils.utils import (
|
||||
get_safe_torch_device,
|
||||
init_logging,
|
||||
@@ -613,7 +613,10 @@ def push_eval_to_hub(
|
||||
@parser.wrap()
|
||||
def eval_main(cfg: EvalPipelineConfig):
|
||||
logging.info(pformat(asdict(cfg)))
|
||||
if cfg.eval.instance_count > 1 and cfg.eval.instance_id == 0:
|
||||
# Multi-instance orchestration only applies to local runtime.
|
||||
# For docker runtime, instance_count controls the number of env containers
|
||||
# spawned directly by run_eval_in_docker — no extra lerobot-eval processes needed.
|
||||
if cfg.eval.runtime == "local" and cfg.eval.instance_count > 1 and cfg.eval.instance_id == 0:
|
||||
_orchestrate_multi_instance_eval(cfg)
|
||||
else:
|
||||
_run_eval_worker(cfg)
|
||||
@@ -720,7 +723,10 @@ def _run_eval_worker(cfg: EvalPipelineConfig) -> dict:
|
||||
env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env, policy_cfg=cfg.policy)
|
||||
|
||||
try:
|
||||
with torch.no_grad(), torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext():
|
||||
with (
|
||||
torch.no_grad(),
|
||||
torch.autocast(device_type=device.type) if cfg.policy.use_amp else nullcontext(),
|
||||
):
|
||||
info = eval_policy_all(
|
||||
envs=envs,
|
||||
policy=policy,
|
||||
@@ -1003,16 +1009,18 @@ def _eval_task_batch(
|
||||
|
||||
results: list[tuple[str, int, TaskMetrics]] = []
|
||||
for tg, tid, start_i, end_i in task_slices:
|
||||
results.append((
|
||||
tg,
|
||||
tid,
|
||||
TaskMetrics(
|
||||
sum_rewards=batch_sum_rewards[start_i:end_i].tolist(),
|
||||
max_rewards=batch_max_rewards[start_i:end_i].tolist(),
|
||||
successes=batch_successes[start_i:end_i].tolist(),
|
||||
video_paths=video_paths_per_task.get((tg, tid), []),
|
||||
),
|
||||
))
|
||||
results.append(
|
||||
(
|
||||
tg,
|
||||
tid,
|
||||
TaskMetrics(
|
||||
sum_rewards=batch_sum_rewards[start_i:end_i].tolist(),
|
||||
max_rewards=batch_max_rewards[start_i:end_i].tolist(),
|
||||
successes=batch_successes[start_i:end_i].tolist(),
|
||||
video_paths=video_paths_per_task.get((tg, tid), []),
|
||||
),
|
||||
)
|
||||
)
|
||||
return results
|
||||
finally:
|
||||
combined_env.close()
|
||||
@@ -1137,8 +1145,7 @@ def eval_policy_all(
|
||||
total_tasks = len(tasks)
|
||||
tasks = [task for idx, task in enumerate(tasks) if idx % instance_count == instance_id]
|
||||
logging.info(
|
||||
f"Instance shard {instance_id + 1}/{instance_count}: "
|
||||
f"{len(tasks)}/{total_tasks} tasks assigned."
|
||||
f"Instance shard {instance_id + 1}/{instance_count}: {len(tasks)}/{total_tasks} tasks assigned."
|
||||
)
|
||||
|
||||
# accumulators: track metrics at both per-group level and across all groups
|
||||
@@ -1236,13 +1243,9 @@ def eval_policy_all(
|
||||
else:
|
||||
# Threaded fallback for cases where batched lazy mode cannot be used.
|
||||
if all_lazy and n_episodes != 1:
|
||||
logging.info(
|
||||
"Task scheduler mode: threaded (lazy batching disabled because n_episodes != 1)"
|
||||
)
|
||||
logging.info("Task scheduler mode: threaded (lazy batching disabled because n_episodes != 1)")
|
||||
elif all_lazy and not single_factory_per_task:
|
||||
logging.info(
|
||||
"Task scheduler mode: threaded (lazy batching disabled because eval.batch_size > 1)"
|
||||
)
|
||||
logging.info("Task scheduler mode: threaded (lazy batching disabled because eval.batch_size > 1)")
|
||||
else:
|
||||
logging.info(f"Task scheduler mode: threaded (max_workers={max_parallel_tasks})")
|
||||
with cf.ThreadPoolExecutor(max_workers=max_parallel_tasks) as executor:
|
||||
|
||||
Reference in New Issue
Block a user