Compare commits

...

4 Commits

Author SHA1 Message Date
Steven Palma
475b8da7d6 device + task + warn fix 2026-04-21 19:06:53 +02:00
Steven Palma
c16ac80063 processor debug 2026-04-20 20:00:50 +02:00
Steven Palma
2f690fe4f1 logs 2026-04-20 19:25:59 +02:00
Steven Palma
52703ecf95 debug fixes 2026-04-20 19:12:07 +02:00
8 changed files with 162 additions and 10 deletions

View File

@@ -225,10 +225,10 @@ class RolloutConfig:
if needs_dataset and (self.dataset is None or not self.dataset.repo_id):
raise ValueError(f"{self.strategy.type} strategy requires --dataset.repo_id to be set")
if isinstance(self.strategy, BaseStrategyConfig) and self.dataset is not None:
raise ValueError(
"Base strategy does not record data. Use sentry, highlight, or dagger for recording."
)
# if isinstance(self.strategy, BaseStrategyConfig) and self.dataset is not None:
# raise ValueError(
# "Base strategy does not record data. Use sentry, highlight, or dagger for recording."
# )
# Sentry MUST use streaming encoding to avoid disk I/O blocking the control loop
if (
@@ -285,6 +285,26 @@ class RolloutConfig:
if self.policy is None:
raise ValueError("--policy.path is required for rollout")
# --- Task resolution ---
# When --dataset.rename_map (or any --dataset.* flag) is passed, draccus
# creates a DatasetRecordConfig with single_task="". If the user set
# the task via the top-level --task flag, propagate it so that all
# downstream consumers (inference engine, dataset frame builders) see it.
if self.dataset is not None and not self.dataset.single_task and self.task:
self.dataset.single_task = self.task
elif self.dataset is not None and self.dataset.single_task and not self.task:
self.task = self.dataset.single_task
# --- Device resolution ---
# Resolve device from the policy config when not explicitly set so all
# components (policy.to, preprocessor, inference engine) use the same
# device string instead of inconsistent fallbacks.
if self.device is None and self.policy is not None:
resolved = getattr(self.policy, "device", None)
if resolved:
self.device = resolved
logger.info("Resolved device from policy config: %s", self.device)
@classmethod
def __get_path_fields__(cls) -> list[str]:
    """Return the names of the config fields reported as path fields.

    Only ``"policy"`` is listed, and a new list is built on every call.

    NOTE(review): presumably consumed by the config parser so the policy
    can be loaded from ``--policy.path`` -- confirm against the caller.
    """
    return ["policy"]

View File

@@ -272,11 +272,17 @@ def build_rollout_context(
# )
# --- 4. Features + action-key reconciliation ---------------------
# Only `.pos` joint features are used for policy inference — velocity and
# torque channels are observation-only and must be excluded from the state
# and action tensors that the policy sees. This matches the filtering
# applied by the old ``hil_data_collection`` script.
all_obs_features = robot.observation_features
observation_features_hw = {
k: v for k, v in all_obs_features.items() if v is float or isinstance(v, tuple)
k: v
for k, v in all_obs_features.items()
if isinstance(v, tuple) or (v is float and k.endswith(".pos"))
}
action_features_hw = robot.action_features
action_features_hw = {k: v for k, v in robot.action_features.items() if k.endswith(".pos")}
# The action side is always needed: sync inference reads action names from
# ``dataset_features[ACTION]`` to map policy tensors back to robot actions.
@@ -293,13 +299,50 @@ def build_rollout_context(
)
dataset_features = combine_feature_dicts(action_dataset_features, observation_dataset_features)
hw_features = hw_to_dataset_features(observation_features_hw, "observation")
raw_action_keys = list(robot.action_features.keys())
raw_action_keys = list(action_features_hw.keys())
policy_action_names = getattr(policy_config, "action_feature_names", None)
ordered_action_keys = _resolve_action_key_order(
list(policy_action_names) if policy_action_names else None,
raw_action_keys,
)
# --- Diagnostic logging ---
_act_ft = dataset_features.get("action", {})
_obs_ft = dataset_features.get("observation.state", {})
logger.info(
"Feature reconciliation: action_dim=%d, obs_state_dim=%d, ordered_action_keys=%d",
_act_ft.get("shape", (0,))[0],
_obs_ft.get("shape", (0,))[0],
len(ordered_action_keys),
)
logger.info(" action names : %s", _act_ft.get("names", []))
logger.info(" obs state names: %s", _obs_ft.get("names", []))
logger.info(" ordered keys : %s", ordered_action_keys)
logger.info(
" policy.action_feature_names: %s",
list(policy_action_names) if policy_action_names else "None (using raw_action_keys)",
)
if full_config.input_features:
logger.info(" policy input_features: %s", list(full_config.input_features.keys()))
else:
logger.warning(" policy input_features is EMPTY — policy may not process images!")
if full_config.output_features:
for k, v in full_config.output_features.items():
logger.info(" policy output_features[%s]: shape=%s", k, v.shape)
# Validate action dimension consistency
if full_config.output_features:
for ft in full_config.output_features.values():
policy_action_dim = ft.shape[0]
if len(ordered_action_keys) != policy_action_dim:
logger.error(
"ACTION DIM MISMATCH: policy expects %d dims, hardware produces %d keys. "
"First 5 keys: %s",
policy_action_dim,
len(ordered_action_keys),
ordered_action_keys[:5],
)
break
# Validate visual features if no rename_map is active
rename_map = cfg.dataset.rename_map if cfg.dataset else {}
if not rename_map:
@@ -374,11 +417,35 @@ def build_rollout_context(
pretrained_path=cfg.policy.pretrained_path,
dataset_stats=dataset_stats,
preprocessor_overrides={
"device_processor": {"device": cfg.device or getattr(policy_config, "device", "cpu")},
"device_processor": {"device": cfg.device},
"rename_observations_processor": {"rename_map": cfg.dataset.rename_map if cfg.dataset else {}},
},
)
# --- Debug: verify normalizer stats loaded from pretrained ---
from lerobot.processor import NormalizerProcessorStep, UnnormalizerProcessorStep
for step in preprocessor.steps:
if isinstance(step, NormalizerProcessorStep):
n_stats = sum(len(v) for v in step._tensor_stats.values()) if step._tensor_stats else 0
logger.info(
"Preprocessor normalizer: %d stat tensors, keys=%s",
n_stats,
list(step._tensor_stats.keys())[:3],
)
if n_stats == 0:
logger.error("PREPROCESSOR NORMALIZER HAS NO STATS — observations will NOT be normalized!")
for step in postprocessor.steps:
if isinstance(step, UnnormalizerProcessorStep):
n_stats = sum(len(v) for v in step._tensor_stats.values()) if step._tensor_stats else 0
logger.info(
"Postprocessor unnormalizer: %d stat tensors, keys=%s",
n_stats,
list(step._tensor_stats.keys())[:3],
)
if n_stats == 0:
logger.error("POSTPROCESSOR UNNORMALIZER HAS NO STATS — actions will NOT be denormalized!")
# --- 7. Inference strategy (needs policy + pre/post + hardware) --
logger.info(
"Creating inference engine (type=%s)...",

View File

@@ -97,10 +97,30 @@ class SyncInferenceEngine(InferenceEngine):
observation, self._device, self._task, self._robot_type
)
observation = self._preprocessor(observation)
action = self._policy.select_action(observation)
action = self._postprocessor(action)
action_raw = self._policy.select_action(observation)
action = self._postprocessor(action_raw)
action_tensor = action.squeeze(0).cpu()
if not hasattr(self, "_log_count"):
self._log_count = 0
if self._log_count < 3:
raw_flat = action_raw.squeeze(0).cpu()
logger.info(
"[Sync tick %d] raw action (first 5): %s | post-processed (first 5): %s",
self._log_count,
raw_flat[:5].tolist(),
action_tensor[:5].tolist(),
)
obs_state = obs_frame.get("observation.state")
if obs_state is not None:
logger.info(
"[Sync tick %d] obs_frame['observation.state'] (first 5): %s | shape: %s",
self._log_count,
obs_state[:5].tolist() if hasattr(obs_state, "tolist") else str(obs_state)[:80],
obs_state.shape if hasattr(obs_state, "shape") else "?",
)
self._log_count += 1
# Reorder to match dataset action ordering so the caller can treat
# the returned tensor uniformly across backends.
action_dict = make_robot_action(action_tensor, self._dataset_features)

View File

@@ -48,6 +48,16 @@ class BaseStrategy(RolloutStrategy):
control_interval = interpolator.get_control_interval(cfg.fps)
# Flush a few observation reads so CAN bus / sensor state is fresh
# before the first inference. Without this, the first observation(s)
# can return stale or identical values for all joints, poisoning the
# entire first action chunk.
_OBS_WARMUP_READS = 5
for _ in range(_OBS_WARMUP_READS):
robot.get_observation()
precise_sleep(1 / cfg.fps)
logger.info("Flushed %d observation warmup reads", _OBS_WARMUP_READS)
start_time = time.perf_counter()
engine.resume()
logger.info("Base strategy control loop started")
@@ -70,6 +80,7 @@ class BaseStrategy(RolloutStrategy):
self._log_telemetry(obs_processed, action_dict, ctx.runtime)
dt = time.perf_counter() - loop_start
self._warn_if_slow(dt, control_interval, cfg.fps)
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)

View File

@@ -63,6 +63,10 @@ class RolloutStrategy(abc.ABC):
self._engine = ctx.policy.inference
logger.info("Starting inference engine...")
self._engine.start()
# Reset policy and processor state so the first inference starts clean
# (matches the old HIL script which called policy.reset() / preprocessor.reset()
# at the beginning of each episode).
self._engine.reset()
self._warmup_flushed = False
logger.info("Inference engine started")
@@ -142,6 +146,20 @@ class RolloutStrategy(abc.ABC):
compress_images=cfg.display_compressed_images,
)
@staticmethod
def _warn_if_slow(dt: float, control_interval: float, fps: float) -> None:
    """Log a warning when the control loop runs slower than target FPS.

    Args:
        dt: Measured duration of one loop iteration, in seconds.
        control_interval: Time budget for one iteration, in seconds.
        fps: Target loop rate; only used in the warning text.

    Returns:
        None. The only effect is an optional ``logger.warning`` call.
    """
    # Strictly greater: an iteration that exactly meets its budget is fine.
    if dt > control_interval:
        # Guard the division so a non-positive duration reports 0 Hz
        # instead of raising ZeroDivisionError.
        actual_fps = 1.0 / dt if dt > 0 else 0
        logger.warning(
            "Control loop is running slower (%.1f Hz) than target FPS (%.0f Hz). "
            "Dataset frames might be dropped and robot control might be unstable. "
            "Common causes: 1) Camera FPS not keeping up "
            "2) Policy inference taking too long 3) CPU starvation",
            actual_fps,
            fps,
        )
@abc.abstractmethod
def setup(self, ctx: RolloutContext) -> None:
"""Strategy-specific initialisation (keyboard listeners, buffers, etc.)."""
@@ -268,5 +286,17 @@ def send_next_action(
action_dict = {k: interp[i].item() for i, k in enumerate(ordered_keys) if i < len(interp)}
processed = ctx.processors.robot_action_processor((action_dict, obs_raw))
if not hasattr(send_next_action, "_log_count"):
send_next_action._log_count = 0
if send_next_action._log_count < 3:
sample = {k: round(v, 4) for k, v in list(processed.items())[:5]}
logger.info(
"[send_next_action tick %d] action sent to robot (first 5): %s",
send_next_action._log_count,
sample,
)
send_next_action._log_count += 1
ctx.hardware.robot_wrapper.send_action(processed)
return action_dict

View File

@@ -506,6 +506,7 @@ class DAggerStrategy(RolloutStrategy):
episode_start = time.perf_counter()
dt = time.perf_counter() - loop_start
self._warn_if_slow(dt, control_interval, cfg.fps)
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)
@@ -646,6 +647,7 @@ class DAggerStrategy(RolloutStrategy):
last_action = ctx.processors.robot_action_processor((action_dict, obs))
dt = time.perf_counter() - loop_start
self._warn_if_slow(dt, control_interval, cfg.fps)
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)

View File

@@ -187,6 +187,7 @@ class HighlightStrategy(RolloutStrategy):
ring.append(frame)
dt = time.perf_counter() - loop_start
self._warn_if_slow(dt, control_interval, cfg.fps)
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)

View File

@@ -158,6 +158,7 @@ class SentryStrategy(RolloutStrategy):
episode_start = time.perf_counter()
dt = time.perf_counter() - loop_start
self._warn_if_slow(dt, control_interval, cfg.fps)
if (sleep_t := control_interval - dt) > 0:
precise_sleep(sleep_t)