mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-31 10:51:35 +00:00
Compare commits
9 Commits
feat/decou
...
feat/lerob
| Author | SHA1 | Date | |
|---|---|---|---|
| | 475b8da7d6 | | |
| | c16ac80063 | | |
| | 2f690fe4f1 | | |
| | 52703ecf95 | | |
| | 195b777367 | | |
| | b49e4016f2 | | |
| | 02d8a34829 | | |
| | 14c7a25ce4 | | |
| | bc06cb44ca | | |
@@ -274,7 +274,8 @@ python src/lerobot/scripts/lerobot_train.py \
|
||||
Once trained, we recommend deploying policies using inference-time RTC:
|
||||
|
||||
```bash
|
||||
python examples/rtc/eval_with_real_robot.py \
|
||||
lerobot-rollout \
|
||||
--strategy.type=base \
|
||||
--policy.path=your-username/your-repo-id \
|
||||
--policy.device=cuda \
|
||||
--robot.type=unitree_g1 \
|
||||
|
||||
@@ -23,7 +23,6 @@ from .configs import (
|
||||
DAggerKeyboardConfig,
|
||||
DAggerPedalConfig,
|
||||
DAggerStrategyConfig,
|
||||
DatasetRecordConfig,
|
||||
HighlightStrategyConfig,
|
||||
RolloutConfig,
|
||||
RolloutStrategyConfig,
|
||||
@@ -57,7 +56,6 @@ __all__ = [
|
||||
"DAggerPedalConfig",
|
||||
"DAggerStrategyConfig",
|
||||
"DatasetContext",
|
||||
"DatasetRecordConfig",
|
||||
"HardwareContext",
|
||||
"HighlightStrategyConfig",
|
||||
"InferenceEngine",
|
||||
|
||||
@@ -142,7 +142,9 @@ class DAggerStrategyConfig(RolloutStrategyConfig):
|
||||
windows, where each correction becomes its own episode.
|
||||
"""
|
||||
|
||||
num_episodes: int = 10
|
||||
# Number of correction episodes to collect (corrections-only mode).
|
||||
# When None, falls back to ``--dataset.num_episodes``.
|
||||
num_episodes: int | None = None
|
||||
record_autonomous: bool = False
|
||||
upload_every_n_episodes: int = 5
|
||||
# Target video file size in MB for episode rotation (record_autonomous
|
||||
@@ -216,14 +218,17 @@ class RolloutConfig:
|
||||
if isinstance(self.strategy, DAggerStrategyConfig) and self.teleop is None:
|
||||
raise ValueError("DAgger strategy requires --teleop.type to be set")
|
||||
|
||||
needs_dataset = isinstance(self.strategy, (SentryStrategyConfig, HighlightStrategyConfig))
|
||||
# TODO(Steven): DAgger shouldn't require a dataset (user may want to just rollout+intervene without recording), but for now we require it to simplify the implementation.
|
||||
needs_dataset = isinstance(
|
||||
self.strategy, (SentryStrategyConfig, HighlightStrategyConfig, DAggerStrategyConfig)
|
||||
)
|
||||
if needs_dataset and (self.dataset is None or not self.dataset.repo_id):
|
||||
raise ValueError(f"{self.strategy.type} strategy requires --dataset.repo_id to be set")
|
||||
|
||||
if isinstance(self.strategy, BaseStrategyConfig) and self.dataset is not None:
|
||||
raise ValueError(
|
||||
"Base strategy does not record data. Use sentry, highlight, or dagger for recording."
|
||||
)
|
||||
# if isinstance(self.strategy, BaseStrategyConfig) and self.dataset is not None:
|
||||
# raise ValueError(
|
||||
# "Base strategy does not record data. Use sentry, highlight, or dagger for recording."
|
||||
# )
|
||||
|
||||
# Sentry MUST use streaming encoding to avoid disk I/O blocking the control loop
|
||||
if (
|
||||
@@ -244,14 +249,29 @@ class RolloutConfig:
|
||||
self.dataset.streaming_encoding = True
|
||||
|
||||
# DAgger: streaming is mandatory only when the autonomous phase is also recorded.
|
||||
if (
|
||||
isinstance(self.strategy, DAggerStrategyConfig)
|
||||
and self.strategy.record_autonomous
|
||||
and self.dataset is not None
|
||||
and not self.dataset.streaming_encoding
|
||||
):
|
||||
logger.warning("DAgger with record_autonomous=True forces streaming_encoding=True")
|
||||
self.dataset.streaming_encoding = True
|
||||
if isinstance(self.strategy, DAggerStrategyConfig) and self.dataset is not None:
|
||||
if self.strategy.record_autonomous and not self.dataset.streaming_encoding:
|
||||
logger.warning("DAgger with record_autonomous=True forces streaming_encoding=True")
|
||||
self.dataset.streaming_encoding = True
|
||||
elif not self.strategy.record_autonomous and not self.dataset.streaming_encoding:
|
||||
logger.info(
|
||||
"Streaming encoding is disabled for DAgger corrections-only mode. "
|
||||
"Consider enabling it for faster episode saving: "
|
||||
"--dataset.streaming_encoding=true --dataset.encoder_threads=2"
|
||||
)
|
||||
|
||||
# DAgger: resolve num_episodes from dataset config when not explicitly set.
|
||||
if isinstance(self.strategy, DAggerStrategyConfig) and self.strategy.num_episodes is None:
|
||||
if self.dataset is not None:
|
||||
self.strategy.num_episodes = self.dataset.num_episodes
|
||||
logger.info(
|
||||
"DAgger num_episodes not set — using --dataset.num_episodes=%d",
|
||||
self.strategy.num_episodes,
|
||||
)
|
||||
else:
|
||||
raise ValueError(
|
||||
"DAgger num_episodes must be set either via --strategy.num_episodes or --dataset.num_episodes"
|
||||
)
|
||||
|
||||
# --- Policy loading ---
|
||||
if self.robot is None:
|
||||
@@ -265,6 +285,26 @@ class RolloutConfig:
|
||||
if self.policy is None:
|
||||
raise ValueError("--policy.path is required for rollout")
|
||||
|
||||
# --- Task resolution ---
|
||||
# When --dataset.rename_map (or any --dataset.* flag) is passed, draccus
|
||||
# creates a DatasetRecordConfig with single_task="". If the user set
|
||||
# the task via the top-level --task flag, propagate it so that all
|
||||
# downstream consumers (inference engine, dataset frame builders) see it.
|
||||
if self.dataset is not None and not self.dataset.single_task and self.task:
|
||||
self.dataset.single_task = self.task
|
||||
elif self.dataset is not None and self.dataset.single_task and not self.task:
|
||||
self.task = self.dataset.single_task
|
||||
|
||||
# --- Device resolution ---
|
||||
# Resolve device from the policy config when not explicitly set so all
|
||||
# components (policy.to, preprocessor, inference engine) use the same
|
||||
# device string instead of inconsistent fallbacks.
|
||||
if self.device is None and self.policy is not None:
|
||||
resolved = getattr(self.policy, "device", None)
|
||||
if resolved:
|
||||
self.device = resolved
|
||||
logger.info("Resolved device from policy config: %s", self.device)
|
||||
|
||||
@classmethod
|
||||
def __get_path_fields__(cls) -> list[str]:
|
||||
return ["policy"]
|
||||
|
||||
@@ -272,11 +272,17 @@ def build_rollout_context(
|
||||
# )
|
||||
|
||||
# --- 4. Features + action-key reconciliation ---------------------
|
||||
# Only `.pos` joint features are used for policy inference — velocity and
|
||||
# torque channels are observation-only and must be excluded from the state
|
||||
# and action tensors that the policy sees. This matches the filtering
|
||||
# applied by the old ``hil_data_collection`` script.
|
||||
all_obs_features = robot.observation_features
|
||||
observation_features_hw = {
|
||||
k: v for k, v in all_obs_features.items() if v is float or isinstance(v, tuple)
|
||||
k: v
|
||||
for k, v in all_obs_features.items()
|
||||
if isinstance(v, tuple) or (v is float and k.endswith(".pos"))
|
||||
}
|
||||
action_features_hw = robot.action_features
|
||||
action_features_hw = {k: v for k, v in robot.action_features.items() if k.endswith(".pos")}
|
||||
|
||||
# The action side is always needed: sync inference reads action names from
|
||||
# ``dataset_features[ACTION]`` to map policy tensors back to robot actions.
|
||||
@@ -293,19 +299,56 @@ def build_rollout_context(
|
||||
)
|
||||
dataset_features = combine_feature_dicts(action_dataset_features, observation_dataset_features)
|
||||
hw_features = hw_to_dataset_features(observation_features_hw, "observation")
|
||||
raw_action_keys = list(robot.action_features.keys())
|
||||
raw_action_keys = list(action_features_hw.keys())
|
||||
policy_action_names = getattr(policy_config, "action_feature_names", None)
|
||||
ordered_action_keys = _resolve_action_key_order(
|
||||
list(policy_action_names) if policy_action_names else None,
|
||||
raw_action_keys,
|
||||
)
|
||||
|
||||
# --- Diagnostic logging ---
|
||||
_act_ft = dataset_features.get("action", {})
|
||||
_obs_ft = dataset_features.get("observation.state", {})
|
||||
logger.info(
|
||||
"Feature reconciliation: action_dim=%d, obs_state_dim=%d, ordered_action_keys=%d",
|
||||
_act_ft.get("shape", (0,))[0],
|
||||
_obs_ft.get("shape", (0,))[0],
|
||||
len(ordered_action_keys),
|
||||
)
|
||||
logger.info(" action names : %s", _act_ft.get("names", []))
|
||||
logger.info(" obs state names: %s", _obs_ft.get("names", []))
|
||||
logger.info(" ordered keys : %s", ordered_action_keys)
|
||||
logger.info(
|
||||
" policy.action_feature_names: %s",
|
||||
list(policy_action_names) if policy_action_names else "None (using raw_action_keys)",
|
||||
)
|
||||
if full_config.input_features:
|
||||
logger.info(" policy input_features: %s", list(full_config.input_features.keys()))
|
||||
else:
|
||||
logger.warning(" policy input_features is EMPTY — policy may not process images!")
|
||||
if full_config.output_features:
|
||||
for k, v in full_config.output_features.items():
|
||||
logger.info(" policy output_features[%s]: shape=%s", k, v.shape)
|
||||
# Validate action dimension consistency
|
||||
if full_config.output_features:
|
||||
for ft in full_config.output_features.values():
|
||||
policy_action_dim = ft.shape[0]
|
||||
if len(ordered_action_keys) != policy_action_dim:
|
||||
logger.error(
|
||||
"ACTION DIM MISMATCH: policy expects %d dims, hardware produces %d keys. "
|
||||
"First 5 keys: %s",
|
||||
policy_action_dim,
|
||||
len(ordered_action_keys),
|
||||
ordered_action_keys[:5],
|
||||
)
|
||||
break
|
||||
|
||||
# Validate visual features if no rename_map is active
|
||||
rename_map = cfg.dataset.rename_map if cfg.dataset else {}
|
||||
if not rename_map:
|
||||
expected_visuals = {k for k, v in full_config.input_features.items() if v.type == FeatureType.VISUAL}
|
||||
provided_visuals = {
|
||||
f"observation.{k}" for k, v in robot.observation_features.items() if isinstance(v, tuple)
|
||||
f"observation.images.{k}" for k, v in robot.observation_features.items() if isinstance(v, tuple)
|
||||
}
|
||||
policy_subset = expected_visuals.issubset(provided_visuals)
|
||||
hw_subset = provided_visuals.issubset(expected_visuals)
|
||||
@@ -374,11 +417,35 @@ def build_rollout_context(
|
||||
pretrained_path=cfg.policy.pretrained_path,
|
||||
dataset_stats=dataset_stats,
|
||||
preprocessor_overrides={
|
||||
"device_processor": {"device": cfg.device or getattr(policy_config, "device", "cpu")},
|
||||
"device_processor": {"device": cfg.device},
|
||||
"rename_observations_processor": {"rename_map": cfg.dataset.rename_map if cfg.dataset else {}},
|
||||
},
|
||||
)
|
||||
|
||||
# --- Debug: verify normalizer stats loaded from pretrained ---
|
||||
from lerobot.processor import NormalizerProcessorStep, UnnormalizerProcessorStep
|
||||
|
||||
for step in preprocessor.steps:
|
||||
if isinstance(step, NormalizerProcessorStep):
|
||||
n_stats = sum(len(v) for v in step._tensor_stats.values()) if step._tensor_stats else 0
|
||||
logger.info(
|
||||
"Preprocessor normalizer: %d stat tensors, keys=%s",
|
||||
n_stats,
|
||||
list(step._tensor_stats.keys())[:3],
|
||||
)
|
||||
if n_stats == 0:
|
||||
logger.error("PREPROCESSOR NORMALIZER HAS NO STATS — observations will NOT be normalized!")
|
||||
for step in postprocessor.steps:
|
||||
if isinstance(step, UnnormalizerProcessorStep):
|
||||
n_stats = sum(len(v) for v in step._tensor_stats.values()) if step._tensor_stats else 0
|
||||
logger.info(
|
||||
"Postprocessor unnormalizer: %d stat tensors, keys=%s",
|
||||
n_stats,
|
||||
list(step._tensor_stats.keys())[:3],
|
||||
)
|
||||
if n_stats == 0:
|
||||
logger.error("POSTPROCESSOR UNNORMALIZER HAS NO STATS — actions will NOT be denormalized!")
|
||||
|
||||
# --- 7. Inference strategy (needs policy + pre/post + hardware) --
|
||||
logger.info(
|
||||
"Creating inference engine (type=%s)...",
|
||||
|
||||
@@ -97,10 +97,30 @@ class SyncInferenceEngine(InferenceEngine):
|
||||
observation, self._device, self._task, self._robot_type
|
||||
)
|
||||
observation = self._preprocessor(observation)
|
||||
action = self._policy.select_action(observation)
|
||||
action = self._postprocessor(action)
|
||||
action_raw = self._policy.select_action(observation)
|
||||
action = self._postprocessor(action_raw)
|
||||
action_tensor = action.squeeze(0).cpu()
|
||||
|
||||
if not hasattr(self, "_log_count"):
|
||||
self._log_count = 0
|
||||
if self._log_count < 3:
|
||||
raw_flat = action_raw.squeeze(0).cpu()
|
||||
logger.info(
|
||||
"[Sync tick %d] raw action (first 5): %s | post-processed (first 5): %s",
|
||||
self._log_count,
|
||||
raw_flat[:5].tolist(),
|
||||
action_tensor[:5].tolist(),
|
||||
)
|
||||
obs_state = obs_frame.get("observation.state")
|
||||
if obs_state is not None:
|
||||
logger.info(
|
||||
"[Sync tick %d] obs_frame['observation.state'] (first 5): %s | shape: %s",
|
||||
self._log_count,
|
||||
obs_state[:5].tolist() if hasattr(obs_state, "tolist") else str(obs_state)[:80],
|
||||
obs_state.shape if hasattr(obs_state, "shape") else "?",
|
||||
)
|
||||
self._log_count += 1
|
||||
|
||||
# Reorder to match dataset action ordering so the caller can treat
|
||||
# the returned tensor uniformly across backends.
|
||||
action_dict = make_robot_action(action_tensor, self._dataset_features)
|
||||
|
||||
@@ -48,6 +48,16 @@ class BaseStrategy(RolloutStrategy):
|
||||
|
||||
control_interval = interpolator.get_control_interval(cfg.fps)
|
||||
|
||||
# Flush a few observation reads so CAN bus / sensor state is fresh
|
||||
# before the first inference. Without this, the first observation(s)
|
||||
# can return stale or identical values for all joints, poisoning the
|
||||
# entire first action chunk.
|
||||
_OBS_WARMUP_READS = 5
|
||||
for _ in range(_OBS_WARMUP_READS):
|
||||
robot.get_observation()
|
||||
precise_sleep(1 / cfg.fps)
|
||||
logger.info("Flushed %d observation warmup reads", _OBS_WARMUP_READS)
|
||||
|
||||
start_time = time.perf_counter()
|
||||
engine.resume()
|
||||
logger.info("Base strategy control loop started")
|
||||
@@ -70,6 +80,7 @@ class BaseStrategy(RolloutStrategy):
|
||||
self._log_telemetry(obs_processed, action_dict, ctx.runtime)
|
||||
|
||||
dt = time.perf_counter() - loop_start
|
||||
self._warn_if_slow(dt, control_interval, cfg.fps)
|
||||
if (sleep_t := control_interval - dt) > 0:
|
||||
precise_sleep(sleep_t)
|
||||
|
||||
|
||||
@@ -63,6 +63,10 @@ class RolloutStrategy(abc.ABC):
|
||||
self._engine = ctx.policy.inference
|
||||
logger.info("Starting inference engine...")
|
||||
self._engine.start()
|
||||
# Reset policy and processor state so the first inference starts clean
|
||||
# (matches the old HIL script which called policy.reset() / preprocessor.reset()
|
||||
# at the beginning of each episode).
|
||||
self._engine.reset()
|
||||
self._warmup_flushed = False
|
||||
logger.info("Inference engine started")
|
||||
|
||||
@@ -142,6 +146,20 @@ class RolloutStrategy(abc.ABC):
|
||||
compress_images=cfg.display_compressed_images,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _warn_if_slow(dt: float, control_interval: float, fps: float) -> None:
|
||||
"""Log a warning when the control loop runs slower than target FPS."""
|
||||
if dt > control_interval:
|
||||
actual_fps = 1.0 / dt if dt > 0 else 0
|
||||
logger.warning(
|
||||
"Control loop is running slower (%.1f Hz) than target FPS (%.0f Hz). "
|
||||
"Dataset frames might be dropped and robot control might be unstable. "
|
||||
"Common causes: 1) Camera FPS not keeping up "
|
||||
"2) Policy inference taking too long 3) CPU starvation",
|
||||
actual_fps,
|
||||
fps,
|
||||
)
|
||||
|
||||
@abc.abstractmethod
|
||||
def setup(self, ctx: RolloutContext) -> None:
|
||||
"""Strategy-specific initialisation (keyboard listeners, buffers, etc.)."""
|
||||
@@ -268,5 +286,17 @@ def send_next_action(
|
||||
|
||||
action_dict = {k: interp[i].item() for i, k in enumerate(ordered_keys) if i < len(interp)}
|
||||
processed = ctx.processors.robot_action_processor((action_dict, obs_raw))
|
||||
|
||||
if not hasattr(send_next_action, "_log_count"):
|
||||
send_next_action._log_count = 0
|
||||
if send_next_action._log_count < 3:
|
||||
sample = {k: round(v, 4) for k, v in list(processed.items())[:5]}
|
||||
logger.info(
|
||||
"[send_next_action tick %d] action sent to robot (first 5): %s",
|
||||
send_next_action._log_count,
|
||||
sample,
|
||||
)
|
||||
send_next_action._log_count += 1
|
||||
|
||||
ctx.hardware.robot_wrapper.send_action(processed)
|
||||
return action_dict
|
||||
|
||||
@@ -506,6 +506,7 @@ class DAggerStrategy(RolloutStrategy):
|
||||
episode_start = time.perf_counter()
|
||||
|
||||
dt = time.perf_counter() - loop_start
|
||||
self._warn_if_slow(dt, control_interval, cfg.fps)
|
||||
if (sleep_t := control_interval - dt) > 0:
|
||||
precise_sleep(sleep_t)
|
||||
|
||||
@@ -556,6 +557,7 @@ class DAggerStrategy(RolloutStrategy):
|
||||
engine.resume()
|
||||
|
||||
last_action: dict[str, Any] | None = None
|
||||
start_time = time.perf_counter()
|
||||
record_tick = 0
|
||||
recorded = 0
|
||||
logger.info(
|
||||
@@ -571,6 +573,10 @@ class DAggerStrategy(RolloutStrategy):
|
||||
):
|
||||
loop_start = time.perf_counter()
|
||||
|
||||
if cfg.duration > 0 and (time.perf_counter() - start_time) >= cfg.duration:
|
||||
logger.info("Duration limit reached (%.0fs)", cfg.duration)
|
||||
break
|
||||
|
||||
# Process transitions
|
||||
transition = events.consume_transition()
|
||||
if transition is not None:
|
||||
@@ -641,6 +647,7 @@ class DAggerStrategy(RolloutStrategy):
|
||||
last_action = ctx.processors.robot_action_processor((action_dict, obs))
|
||||
|
||||
dt = time.perf_counter() - loop_start
|
||||
self._warn_if_slow(dt, control_interval, cfg.fps)
|
||||
if (sleep_t := control_interval - dt) > 0:
|
||||
precise_sleep(sleep_t)
|
||||
|
||||
|
||||
@@ -25,7 +25,7 @@ from .highlight import HighlightStrategy
|
||||
from .sentry import SentryStrategy
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from lerobot.rollout import RolloutStrategyConfig
|
||||
from ..configs import RolloutStrategyConfig
|
||||
|
||||
|
||||
def create_strategy(config: RolloutStrategyConfig) -> RolloutStrategy:
|
||||
|
||||
@@ -187,6 +187,7 @@ class HighlightStrategy(RolloutStrategy):
|
||||
ring.append(frame)
|
||||
|
||||
dt = time.perf_counter() - loop_start
|
||||
self._warn_if_slow(dt, control_interval, cfg.fps)
|
||||
if (sleep_t := control_interval - dt) > 0:
|
||||
precise_sleep(sleep_t)
|
||||
|
||||
|
||||
@@ -158,6 +158,7 @@ class SentryStrategy(RolloutStrategy):
|
||||
episode_start = time.perf_counter()
|
||||
|
||||
dt = time.perf_counter() - loop_start
|
||||
self._warn_if_slow(dt, control_interval, cfg.fps)
|
||||
if (sleep_t := control_interval - dt) > 0:
|
||||
precise_sleep(sleep_t)
|
||||
|
||||
|
||||
@@ -187,7 +187,7 @@ class TestRTCDenoiseWithRelativeLeftovers:
|
||||
|
||||
|
||||
class TestFullPipelineRelativeRTC:
|
||||
"""End-to-end test of the RTC + relative actions pipeline matching eval_with_real_robot.py flow."""
|
||||
"""End-to-end test of the RTC + relative actions pipeline matching lerobot-rollout flow."""
|
||||
|
||||
def test_preprocessor_caches_state_for_postprocessor(self):
|
||||
"""Preprocessor's relative step should cache state so postprocessor can convert back."""
|
||||
@@ -240,7 +240,7 @@ class TestFullPipelineRelativeRTC:
|
||||
torch.testing.assert_close(recovered, actions, atol=1e-5, rtol=1e-5)
|
||||
|
||||
def test_eval_loop_simulation(self):
|
||||
"""Simulate the eval_with_real_robot.py loop with relative actions.
|
||||
"""Simulate the lerobot-rollout loop with relative actions.
|
||||
|
||||
Iteration 1: No leftovers → model generates relative actions → store for RTC
|
||||
Iteration 2: Use leftovers as RTC guidance → model generates new relative actions
|
||||
@@ -401,12 +401,12 @@ class TestStateRebasingApproximation:
|
||||
|
||||
|
||||
def _detect_relative_actions(preprocessor) -> bool:
|
||||
"""Mirror of the helper in eval_with_real_robot.py for testing without importing it."""
|
||||
"""Mirror of the helper in lerobot-rollout for testing without importing it."""
|
||||
return any(isinstance(step, RelativeActionsProcessorStep) and step.enabled for step in preprocessor.steps)
|
||||
|
||||
|
||||
class TestDetectRelativeActions:
|
||||
"""Test the _detect_relative_actions helper logic used by eval_with_real_robot.py."""
|
||||
"""Test the _detect_relative_actions helper logic used by lerobot-rollout."""
|
||||
|
||||
def test_detects_enabled_relative_step(self):
|
||||
class FakePipeline:
|
||||
|
||||
@@ -80,7 +80,7 @@ def test_dagger_config_defaults():
|
||||
from lerobot.rollout import DAggerStrategyConfig
|
||||
|
||||
cfg = DAggerStrategyConfig()
|
||||
assert cfg.num_episodes == 10
|
||||
assert cfg.num_episodes is None
|
||||
assert cfg.record_autonomous is False
|
||||
assert cfg.input_device == "keyboard"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user