feat(rollout): adding legacy strategy

This commit is contained in:
Maximellerbach
2026-06-03 15:14:05 +02:00
parent 741c2d0a39
commit 350f8e8d4d
6 changed files with 340 additions and 2 deletions

View File

@@ -24,6 +24,7 @@ from .configs import (
DAggerPedalConfig,
DAggerStrategyConfig,
HighlightStrategyConfig,
LegacyStrategyConfig,
RolloutConfig,
RolloutStrategyConfig,
SentryStrategyConfig,
@@ -50,6 +51,7 @@ from .strategies import (
BaseStrategy,
DAggerStrategy,
HighlightStrategy,
LegacyStrategy,
RolloutStrategy,
SentryStrategy,
create_strategy,
@@ -66,6 +68,8 @@ __all__ = [
"HardwareContext",
"HighlightStrategy",
"HighlightStrategyConfig",
"LegacyStrategy",
"LegacyStrategyConfig",
"InferenceEngine",
"InferenceEngineConfig",
"PolicyContext",

View File

@@ -121,6 +121,24 @@ class DAggerPedalConfig:
upload: str = "KEY_C"
@RolloutStrategyConfig.register_subclass("legacy")
@dataclass
class LegacyStrategyConfig(RolloutStrategyConfig):
    """Configuration marker for the episode-oriented ``legacy`` rollout strategy.

    The strategy mirrors the pre-rollout ``lerobot-record`` behavior and takes
    all of its tunables from the dataset section of the rollout config, so this
    class declares no fields of its own:

    * ``dataset.num_episodes`` episodes are recorded, each lasting at most
      ``dataset.episode_time_s`` seconds.
    * Every episode except the last is followed by up to
      ``dataset.reset_time_s`` seconds of reset time.
    * If ``--teleop.*`` is provided, teleoperation drives the robot during the
      reset time; otherwise the robot is idle during resets.

    Keyboard controls (same as lerobot-record):
        Right arrow — end episode early
        Left arrow  — discard current episode and re-record
        Escape      — stop recording session
    """
@RolloutStrategyConfig.register_subclass("dagger")
@dataclass
class DAggerStrategyConfig(RolloutStrategyConfig):
@@ -229,7 +247,13 @@ class RolloutConfig:
# TODO(Steven): DAgger shouldn't require a dataset (user may want to just rollout+intervene without recording), but for now we require it to simplify the implementation.
needs_dataset = isinstance(
self.strategy, (SentryStrategyConfig, HighlightStrategyConfig, DAggerStrategyConfig)
self.strategy,
(
SentryStrategyConfig,
HighlightStrategyConfig,
DAggerStrategyConfig,
LegacyStrategyConfig,
),
)
if needs_dataset and (self.dataset is None or not self.dataset.repo_id):
raise ValueError(f"{self.strategy.type} strategy requires --dataset.repo_id to be set")

View File

@@ -19,6 +19,7 @@ from .core import RolloutStrategy, estimate_max_episode_seconds, safe_push_to_hu
from .dagger import DAggerEvents, DAggerPhase, DAggerStrategy
from .factory import create_strategy
from .highlight import HighlightStrategy
from .legacy import LegacyStrategy
from .sentry import SentryStrategy
__all__ = [
@@ -27,6 +28,7 @@ __all__ = [
"DAggerPhase",
"DAggerStrategy",
"HighlightStrategy",
"LegacyStrategy",
"RolloutStrategy",
"SentryStrategy",
"create_strategy",

View File

@@ -22,6 +22,7 @@ from .base import BaseStrategy
from .core import RolloutStrategy
from .dagger import DAggerStrategy
from .highlight import HighlightStrategy
from .legacy import LegacyStrategy
from .sentry import SentryStrategy
if TYPE_CHECKING:
@@ -42,4 +43,8 @@ def create_strategy(config: RolloutStrategyConfig) -> RolloutStrategy:
return HighlightStrategy(config)
if config.type == "dagger":
return DAggerStrategy(config)
raise ValueError(f"Unknown strategy type '{config.type}'. Available: base, sentry, highlight, dagger")
if config.type == "legacy":
return LegacyStrategy(config)
raise ValueError(
f"Unknown strategy type '{config.type}'. Available: base, sentry, highlight, dagger, legacy"
)

View File

@@ -0,0 +1,291 @@
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Legacy rollout strategy: policy-driven multi-episode recording with reset phases.
Mirrors the pre-rollout ``lerobot-record`` inference path:
- Policy drives the robot during each recording episode.
- An optional teleoperator can drive the robot during reset phases so the
operator can bring the environment back to its starting configuration.
If no teleop is connected the robot stays in its current position.
- Keyboard controls (identical to the old lerobot-record):
Right arrow — end the current episode early
Left arrow — discard the current episode and re-record it
Escape — stop the recording session
Dataset naming follows the old lerobot-record convention: any name is
accepted except those starting with ``eval_`` (reserved for evaluation).
"""
from __future__ import annotations
import logging
import time
from lerobot.common.control_utils import init_keyboard_listener, is_headless
from lerobot.datasets import VideoEncodingManager
from lerobot.utils.constants import ACTION, OBS_STR
from lerobot.utils.feature_utils import build_dataset_frame
from lerobot.utils.robot_utils import precise_sleep
from lerobot.utils.utils import log_say
from lerobot.utils.visualization_utils import log_rerun_data
from ..configs import LegacyStrategyConfig
from ..context import RolloutContext
from .core import RolloutStrategy, safe_push_to_hub, send_next_action
logger = logging.getLogger(__name__)
class LegacyStrategy(RolloutStrategy):
    """Policy-driven multi-episode recording (mirrors old lerobot-record inference path).

    Each recording episode runs the policy for at most ``dataset.episode_time_s``
    seconds and stores a dataset frame for every step that produced an action.
    A reset phase of up to ``dataset.reset_time_s`` seconds follows every episode
    except the last (and also runs when an episode is being re-recorded) so the
    operator can reset the environment. During the reset phase an optional
    teleoperator drives the robot; if none is present no action is sent.

    The inference engine and the interpolator are reset at the start of each
    recording episode.

    Keyboard events (flags in the dict returned by ``init_keyboard_listener``):
        right arrow → ``exit_early``: leave the current episode / reset phase
        left arrow  → ``rerecord_episode``: discard & re-record current episode
        ESC         → ``stop_recording``: stop the session
    """

    config: LegacyStrategyConfig

    def __init__(self, config: LegacyStrategyConfig) -> None:
        """Store the config; listener and event flags are created in ``setup``."""
        super().__init__(config)
        self._listener = None
        self._events: dict | None = None

    def setup(self, ctx: RolloutContext) -> None:
        """Start the inference engine and attach the keyboard listener."""
        self._init_engine(ctx)
        self._listener, self._events = init_keyboard_listener()
        logger.info("Legacy strategy ready")

    def run(self, ctx: RolloutContext) -> None:
        """Main multi-episode recording loop.

        Alternates policy-driven recording episodes with reset phases until
        ``dataset.num_episodes`` episodes were saved, the ``stop_recording``
        flag is raised, or the runtime shutdown event is set.

        Raises:
            RuntimeError: If called before ``setup`` created the event flags.
        """
        events = self._events
        if events is None:
            # Fail with an explicit message instead of "'NoneType' is not subscriptable".
            raise RuntimeError("LegacyStrategy.setup() must be called before run()")

        cfg = ctx.runtime.cfg
        dataset_cfg = cfg.dataset
        robot = ctx.hardware.robot_wrapper
        teleop = ctx.hardware.teleop
        dataset = ctx.data.dataset
        features = ctx.data.dataset_features

        fps = cfg.fps
        episode_time_s = dataset_cfg.episode_time_s
        reset_time_s = dataset_cfg.reset_time_s
        num_episodes = dataset_cfg.num_episodes
        single_task = dataset_cfg.single_task or cfg.task
        play_sounds = cfg.play_sounds

        # Compression is forced on when display data is sent to an explicit ip/port.
        remote_display = cfg.display_data and cfg.display_ip is not None and cfg.display_port is not None
        display_compressed = True if remote_display else cfg.display_compressed_images

        with VideoEncodingManager(dataset):
            recorded_episodes = 0
            while recorded_episodes < num_episodes and not events["stop_recording"]:
                if ctx.runtime.shutdown_event.is_set():
                    break

                # Reset policy state at episode start (discard leftover hidden state / queue)
                self._engine.reset()
                self._interpolator.reset()
                self._engine.resume()

                log_say(f"Recording episode {dataset.num_episodes}", play_sounds)
                self._policy_loop(
                    ctx=ctx,
                    robot=robot,
                    events=events,
                    features=features,
                    fps=fps,
                    control_time_s=episode_time_s,
                    dataset=dataset,
                    single_task=single_task,
                )

                # Reset phase, skip after the last episode (but run when re-recording)
                more_to_record = recorded_episodes < num_episodes - 1 or events["rerecord_episode"]
                if not events["stop_recording"] and more_to_record:
                    log_say("Reset the environment", play_sounds)
                    self._reset_loop(
                        ctx=ctx,
                        robot=robot,
                        teleop=teleop,
                        events=events,
                        fps=fps,
                        control_time_s=reset_time_s,
                        display_data=cfg.display_data,
                        display_compressed=display_compressed,
                    )

                if events["rerecord_episode"]:
                    # Drop the buffered frames and redo the same episode index.
                    log_say("Re-record episode", play_sounds)
                    events["rerecord_episode"] = False
                    events["exit_early"] = False
                    dataset.clear_episode_buffer()
                    continue

                dataset.save_episode()
                recorded_episodes += 1

    def _policy_loop(
        self,
        ctx: RolloutContext,
        robot,
        events: dict,
        features: dict,
        fps: float,
        control_time_s: float,
        dataset,
        single_task: str,
    ) -> None:
        """Policy-driven recording loop for a single episode.

        Runs until ``control_time_s`` seconds have elapsed, the ``exit_early``
        flag is raised (it is cleared here before leaving), or the runtime
        shutdown event is set. A frame is added to ``dataset`` only for
        iterations in which ``send_next_action`` returned an action.
        """
        interpolator = self._interpolator
        control_interval = interpolator.get_control_interval(fps)

        timestamp = 0.0
        start_t = time.perf_counter()
        while timestamp < control_time_s:
            loop_start = time.perf_counter()

            if events["exit_early"]:
                events["exit_early"] = False
                break
            if ctx.runtime.shutdown_event.is_set():
                break

            obs = robot.get_observation()
            obs_processed = self._process_observation_and_notify(ctx.processors, obs)

            # While warm-up is in progress nothing is sent or recorded, and the
            # episode clock (``timestamp``) is intentionally left untouched.
            if self._handle_warmup(ctx.runtime.cfg.use_torch_compile, loop_start, control_interval):
                continue

            action_dict = send_next_action(obs_processed, obs, ctx, interpolator)
            if action_dict is not None:
                obs_frame = build_dataset_frame(features, obs_processed, prefix=OBS_STR)
                action_frame = build_dataset_frame(features, action_dict, prefix=ACTION)
                dataset.add_frame({**obs_frame, **action_frame, "task": single_task})
                self._log_telemetry(obs_processed, action_dict, ctx.runtime)

            dt = time.perf_counter() - loop_start
            sleep_t = control_interval - dt
            if sleep_t < 0:
                logger.warning(
                    f"Record loop is running slower ({1 / dt:.1f} Hz) than the target FPS ({fps} Hz). "
                    "Dataset frames might be dropped and robot control might be unstable. "
                    "Common causes are: 1) Camera FPS not keeping up 2) Policy inference taking too long "
                    "3) CPU starvation"
                )
            precise_sleep(max(sleep_t, 0.0))
            timestamp = time.perf_counter() - start_t

    def _reset_loop(
        self,
        ctx: RolloutContext,
        robot,
        teleop,
        events: dict,
        fps: float,
        control_time_s: float,
        display_data: bool,
        display_compressed: bool,
    ) -> None:
        """Reset-phase loop: teleop drives the robot if available, no recording.

        Runs until ``control_time_s`` seconds have elapsed, the ``exit_early``
        flag is raised (it is cleared here before leaving), or the runtime
        shutdown event is set. When ``teleop`` is ``None`` no action is sent to
        the robot; observations are still read so they can be displayed.
        """
        processors = ctx.processors
        control_interval = 1.0 / fps

        timestamp = 0.0
        start_t = time.perf_counter()
        while timestamp < control_time_s:
            loop_start = time.perf_counter()

            if events["exit_early"]:
                events["exit_early"] = False
                break
            if ctx.runtime.shutdown_event.is_set():
                break

            obs = robot.get_observation()

            # Bind on every iteration: without a teleoperator there is no action,
            # and the display call below must not hit an unassigned local.
            act_teleop = None
            if teleop is not None:
                act = teleop.get_action()
                act_teleop = processors.teleop_action_processor((act, obs))
                robot_action = processors.robot_action_processor((act_teleop, obs))
                robot.send_action(robot_action)

            if display_data:
                obs_processed = processors.robot_observation_processor(obs)
                log_rerun_data(
                    observation=obs_processed,
                    action=act_teleop,
                    compress_images=display_compressed,
                )

            dt = time.perf_counter() - loop_start
            sleep_t = control_interval - dt
            precise_sleep(max(sleep_t, 0.0))
            timestamp = time.perf_counter() - start_t

    def teardown(self, ctx: RolloutContext) -> None:
        """Finalise dataset, stop listener, push to hub, and tear down hardware.

        The dataset is finalized before any upload is attempted; the upload only
        happens when ``dataset.push_to_hub`` is set and a dataset exists.
        """
        cfg = ctx.runtime.cfg
        play_sounds = cfg.play_sounds

        log_say("Stop recording", play_sounds, blocking=True)

        if not is_headless() and self._listener is not None:
            self._listener.stop()

        if ctx.data.dataset is not None:
            logger.info("Finalizing dataset...")
            ctx.data.dataset.finalize()

        if (
            cfg.dataset is not None
            and cfg.dataset.push_to_hub
            and ctx.data.dataset is not None
            and safe_push_to_hub(
                ctx.data.dataset,
                tags=cfg.dataset.tags,
                private=cfg.dataset.private,
            )
        ):
            logger.info("Dataset uploaded to hub")
            log_say("Dataset uploaded to hub", play_sounds)

        self._teardown_hardware(
            ctx.hardware,
            return_to_initial_position=cfg.return_to_initial_position,
        )

        log_say("Exiting", play_sounds)
        logger.info("Legacy strategy teardown complete")

View File

@@ -25,6 +25,7 @@ Strategies
--strategy.type=sentry Continuous recording with auto-upload
--strategy.type=highlight Ring buffer + keystroke save
--strategy.type=dagger Human-in-the-loop (DAgger / RaC)
--strategy.type=legacy Episode-oriented recording, mirrors old lerobot-record
Inference backends
------------------
@@ -111,6 +112,17 @@ Usage examples
--display_data=true \\
--use_torch_compile=true
# Legacy mode — episode-oriented recording, mirrors old lerobot-record
lerobot-rollout \\
--strategy.type=legacy \\
--robot.type=so100_follower \\
--robot.port=/dev/ttyACM0 \\
--teleop.type=so100_leader \\
--teleop.port=/dev/ttyACM1 \\
--dataset.repo_id=user/grab_cube \\
--dataset.num_episodes=20 \\
--dataset.single_task="Grab the cube"
# Resume a previous sentry recording session
lerobot-rollout \\
--strategy.type=sentry \\