2025-07-03 18:35:14 +02:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
from dataclasses import dataclass, field
|
2025-07-03 16:35:37 +00:00
|
|
|
from typing import Any, Mapping
|
2025-07-03 18:35:14 +02:00
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
import torch
|
|
|
|
|
from torch import Tensor
|
|
|
|
|
|
|
|
|
|
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
|
|
|
|
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionIndex
|
|
|
|
|
|
|
|
|
|
|
2025-07-03 16:35:37 +00:00
|
|
|
def _convert_stats_to_tensors(stats: dict[str, dict[str, Any]]) -> dict[str, dict[str, Tensor]]:
    """Convert every statistic in a nested stats mapping to a ``float32`` torch tensor.

    Parameters
    ----------
    stats : dict[str, dict[str, Any]]
        Mapping ``feature_key -> {stat_name -> value}``. Each value may be a
        ``np.ndarray``, a NumPy scalar (e.g. ``np.float32(0.5)``), a
        ``torch.Tensor``, a Python ``int``/``float``, or a (nested)
        ``list``/``tuple`` of numbers.

    Returns
    -------
    dict[str, dict[str, Tensor]]
        Same nesting as ``stats`` with every value converted to ``torch.float32``.
        NumPy arrays are copied (``astype``), so the result does not alias them.

    Raises
    ------
    TypeError
        If a value has an unsupported type.
    """
    tensor_stats: dict[str, dict[str, Tensor]] = {}
    for key, sub in stats.items():
        tensor_stats[key] = {}
        for stat_name, value in sub.items():
            if isinstance(value, np.ndarray):
                tensor_val = torch.from_numpy(value.astype(np.float32))
            elif isinstance(value, torch.Tensor):
                tensor_val = value.to(dtype=torch.float32)
            elif isinstance(value, (np.number, np.bool_)):
                # NumPy scalars (np.float32, np.int64, ...) do not subclass Python
                # int/float, so without this branch they would raise TypeError below.
                tensor_val = torch.from_numpy(np.asarray(value, dtype=np.float32))
            elif isinstance(value, (int, float, list, tuple)):
                tensor_val = torch.tensor(value, dtype=torch.float32)
            else:
                raise TypeError(f"Unsupported type for stats['{key}']['{stat_name}']: {type(value)}")
            tensor_stats[key][stat_name] = tensor_val
    return tensor_stats
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
@ProcessorStepRegistry.register(name="observation_normalizer")
class ObservationNormalizer:
    """Normalize observations using dataset statistics.

    This processor normalizes selected observation keys using either:

    - Standard normalization: ``(x - mean) / (std + eps)``
    - Min-Max normalization to [-1, 1]: ``2 * (x - min) / (max - min + eps) - 1``

    If a key's statistics contain both pairs, mean/std takes precedence.

    Parameters
    ----------
    stats : Dict[str, Dict[str, np.ndarray | Tensor]]
        Dataset statistics. Each entry must provide either
        ``{"mean", "std"}`` or ``{"min", "max"}``.
    normalize_keys : set[str] | None, default=None
        Observation keys to normalize. ``None`` means all keys
        present in both the observation and stats.
    eps : float, default=1e-8
        Small constant to avoid division by zero.

    Notes
    -----
    ``state_dict`` flattens statistics as ``"<key>.<stat_name>"``. Feature keys
    may themselves contain dots (e.g. ``"observation.state"``), so
    ``load_state_dict`` splits on the *last* dot to recover them.
    """

    stats: dict[str, dict[str, Any]]
    normalize_keys: set[str] | None = None
    eps: float = 1e-8

    # Float32 tensor copies of ``stats``, built once in ``__post_init__`` so the
    # conversion is not repeated on every call.
    _tensor_stats: dict[str, dict[str, Tensor]] = field(default_factory=dict, init=False, repr=False)

    @classmethod
    def from_lerobot_dataset(
        cls,
        dataset: LeRobotDataset,
        *,
        normalize_keys: set[str] | None = None,
        eps: float = 1e-8,
    ) -> ObservationNormalizer:
        """Create from a LeRobotDataset, using every stats entry except ``"action"``."""
        obs_stats = {k: v for k, v in dataset.meta.stats.items() if k != "action"}
        return cls(stats=obs_stats, normalize_keys=normalize_keys, eps=eps)

    def __post_init__(self):
        self._tensor_stats = _convert_stats_to_tensors(self.stats)

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Return a new transition whose observation has the selected keys normalized.

        The input observation mapping is shallow-copied rather than mutated. A
        ``None`` observation returns the transition unchanged.

        Raises
        ------
        KeyError
            If a key explicitly listed in ``normalize_keys`` is present in the
            observation but has no statistics.
        ValueError
            If a key's statistics contain neither ``('mean', 'std')`` nor
            ``('min', 'max')``.
        """
        observation = transition[TransitionIndex.OBSERVATION]
        if observation is None:
            return transition

        keys_to_norm = (
            self.normalize_keys if self.normalize_keys is not None else set(self._tensor_stats.keys())
        )

        # Shallow copy so the caller's observation dict is left untouched.
        processed_obs = dict(observation)

        for key in keys_to_norm:
            if key not in processed_obs:
                continue

            if key not in self._tensor_stats:
                if self.normalize_keys is not None:
                    # User explicitly requested this key but stats are missing.
                    raise KeyError(f"Stats not found for requested key '{key}'")
                continue

            orig_val = processed_obs[key]
            if isinstance(orig_val, torch.Tensor):
                tensor = orig_val.to(dtype=torch.float32)
            elif isinstance(orig_val, np.ndarray):
                tensor = torch.from_numpy(orig_val.astype(np.float32))
            else:
                # Lists, tuples, scalars, etc.
                tensor = torch.as_tensor(orig_val, dtype=torch.float32)

            # Statistics are cached on their original device; align with the data.
            stats = {k: v.to(device=tensor.device) for k, v in self._tensor_stats[key].items()}

            if "mean" in stats and "std" in stats:
                mean, std = stats["mean"], stats["std"]
                processed_obs[key] = (tensor - mean) / (std + self.eps)
            elif "min" in stats and "max" in stats:
                min_val, max_val = stats["min"], stats["max"]
                # Normalize to [0, 1] then to [-1, 1].
                processed_obs[key] = 2 * (tensor - min_val) / (max_val - min_val + self.eps) - 1
            else:
                raise ValueError(
                    f"Stats for key '{key}' must contain either ('mean', 'std') or ('min', 'max')"
                )

        return (
            processed_obs,
            transition[TransitionIndex.ACTION],
            transition[TransitionIndex.REWARD],
            transition[TransitionIndex.DONE],
            transition[TransitionIndex.TRUNCATED],
            transition[TransitionIndex.INFO],
            transition[TransitionIndex.COMPLEMENTARY_DATA],
        )

    def get_config(self) -> dict[str, Any]:
        """Return constructor arguments other than ``stats``.

        ``normalize_keys`` is converted from a set to a list (presumably so the
        config can be serialized). Statistics are exposed via :meth:`state_dict`.
        """
        return {
            "normalize_keys": list(self.normalize_keys) if self.normalize_keys is not None else None,
            "eps": self.eps,
        }

    def state_dict(self) -> dict[str, Tensor]:
        """Flatten cached statistics into ``{"<key>.<stat_name>": tensor}``."""
        flat_state: dict[str, Tensor] = {}
        for key, sub in self._tensor_stats.items():
            for stat_name, tensor in sub.items():
                flat_state[f"{key}.{stat_name}"] = tensor
        return flat_state

    def load_state_dict(self, state: Mapping[str, Tensor]) -> None:
        """Replace cached statistics with a flat mapping produced by :meth:`state_dict`.

        Raises
        ------
        ValueError
            If a key is not of the form ``"<key>.<stat_name>"``. The existing
            statistics are left untouched in that case.
        """
        rebuilt: dict[str, dict[str, Tensor]] = {}
        for flat_key, tensor in state.items():
            # Split on the LAST dot: the stat names (mean/std/min/max) contain no
            # dots, but feature keys may, e.g. "observation.state.mean" must map to
            # ("observation.state", "mean"), not ("observation", "state.mean").
            key, sep, stat_name = flat_key.rpartition(".")
            if not sep:
                raise ValueError(f"Invalid state key '{flat_key}': expected '<key>.<stat_name>'")
            rebuilt.setdefault(key, {})[stat_name] = tensor
        # Update in place so any existing reference to the cache sees the new stats.
        self._tensor_stats.clear()
        self._tensor_stats.update(rebuilt)

    def reset(self) -> None:
        """Nothing to reset for this stateless processor."""
        pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
@ProcessorStepRegistry.register(name="action_unnormalizer")
class ActionUnnormalizer:
    """Un-normalize actions using dataset statistics.

    Applies the inverse of the normalization used for actions:

    - Standard: ``action * std + mean``
    - Min-Max from [-1, 1]: ``(action + 1) / 2 * (max - min) + min``

    When both pairs are present, mean/std takes precedence.

    Parameters
    ----------
    action_stats : Dict[str, np.ndarray | Tensor]
        Action statistics containing either ``{"mean", "std"}`` or ``{"min", "max"}``.
    eps : float, default=1e-8
        Small constant used during normalization (not used in unnormalization).
    """

    action_stats: dict[str, Any]
    eps: float = 1e-8  # only stored and reported by get_config(); the inverse never uses it

    # Float32 tensor version of ``action_stats``, filled once by ``__post_init__``.
    _tensor_stats: dict[str, Tensor] = field(default_factory=dict, init=False, repr=False)

    @classmethod
    def from_lerobot_dataset(
        cls,
        dataset: LeRobotDataset,
        *,
        eps: float = 1e-8,
    ) -> ActionUnnormalizer:
        """Build an un-normalizer from ``dataset.meta.stats["action"]``.

        Raises
        ------
        ValueError
            If the dataset carries no ``"action"`` statistics.
        """
        if "action" not in dataset.meta.stats:
            raise ValueError("Dataset does not contain action statistics")
        return cls(action_stats=dataset.meta.stats["action"], eps=eps)

    def __post_init__(self):
        # The shared converter expects a two-level mapping, so wrap and unwrap.
        wrapped = _convert_stats_to_tensors({"action": self.action_stats})
        self._tensor_stats = wrapped["action"]

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Return a new transition whose action is mapped back to dataset units.

        A ``None`` action returns the transition unchanged.

        Raises
        ------
        ValueError
            If the statistics contain neither ``('mean', 'std')`` nor ``('min', 'max')``.
        """
        raw = transition[TransitionIndex.ACTION]
        if raw is None:
            return transition

        if isinstance(raw, torch.Tensor):
            act = raw.to(dtype=torch.float32)
        else:
            act = torch.as_tensor(raw, dtype=torch.float32)

        # Bring every cached statistic onto the action's device before doing math.
        on_device = {name: stat.to(device=act.device) for name, stat in self._tensor_stats.items()}
        available = on_device.keys()

        if available >= {"mean", "std"}:
            restored = act * on_device["std"] + on_device["mean"]
        elif available >= {"min", "max"}:
            lo, hi = on_device["min"], on_device["max"]
            # [-1, 1] -> [0, 1] -> [min, max]
            restored = (act + 1) / 2 * (hi - lo) + lo
        else:
            raise ValueError("Action stats must contain either ('mean', 'std') or ('min', 'max')")

        trailing_slots = (
            TransitionIndex.REWARD,
            TransitionIndex.DONE,
            TransitionIndex.TRUNCATED,
            TransitionIndex.INFO,
            TransitionIndex.COMPLEMENTARY_DATA,
        )
        return (
            transition[TransitionIndex.OBSERVATION],
            restored,
            *(transition[slot] for slot in trailing_slots),
        )

    def get_config(self) -> dict[str, Any]:
        """Return the non-tensor constructor arguments."""
        return dict(eps=self.eps)

    def state_dict(self) -> dict[str, Tensor]:
        """Return a shallow copy of the cached statistics keyed by stat name."""
        return {name: stat for name, stat in self._tensor_stats.items()}

    def load_state_dict(self, state: Mapping[str, Tensor]) -> None:
        """Replace the cached statistics with ``state`` (keys are stat names)."""
        self._tensor_stats = {name: stat for name, stat in state.items()}

    def reset(self) -> None:
        """Nothing to reset for this stateless processor."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
@ProcessorStepRegistry.register(name="normalization_processor")
class NormalizationProcessor:
    """Combined processor that normalizes observations and/or un-normalizes actions.

    This processor combines the functionality of ObservationNormalizer and
    ActionUnnormalizer for convenience when both operations are needed.

    Parameters
    ----------
    stats : Dict[str, Dict[str, np.ndarray | Tensor]]
        Dataset statistics as returned by ``LeRobotDataset.meta.stats``. Each
        entry must provide either ``{"mean", "std"}`` or ``{"min", "max"}``
        (mean/std takes precedence when both are present). The ``"action"``
        entry, if any, is used for action un-normalization. All other entries
        are candidates for observation normalization.
    normalize_keys : set[str] | None, default=None
        Observation keys to normalize. ``None`` means all non-``"action"`` keys
        present in both the observation and stats.
    unnormalize_action : bool, default=True
        Whether to un-normalize actions. Has no effect when ``stats`` has no
        ``"action"`` entry.
    eps : float, default=1e-8
        Small constant to avoid division by zero.

    Notes
    -----
    ``state_dict`` flattens statistics as ``"<key>.<stat_name>"``. Feature keys
    may themselves contain dots (e.g. ``"observation.state"``), so
    ``load_state_dict`` splits on the *last* dot to recover them.
    """

    stats: dict[str, dict[str, Any]]
    normalize_keys: set[str] | None = None
    unnormalize_action: bool = True
    eps: float = 1e-8

    # Float32 tensor copies of ``stats``, built once in ``__post_init__``.
    _tensor_stats: dict[str, dict[str, Tensor]] = field(default_factory=dict, init=False, repr=False)

    @classmethod
    def from_lerobot_dataset(
        cls,
        dataset: LeRobotDataset,
        *,
        normalize_keys: set[str] | None = None,
        unnormalize_action: bool = True,
        eps: float = 1e-8,
    ) -> NormalizationProcessor:
        """Create from a LeRobotDataset using all of ``dataset.meta.stats``."""
        return cls(
            stats=dataset.meta.stats,
            normalize_keys=normalize_keys,
            unnormalize_action=unnormalize_action,
            eps=eps,
        )

    def __post_init__(self):
        self._tensor_stats = _convert_stats_to_tensors(self.stats)

    @staticmethod
    def _to_float_tensor(value: Any) -> Tensor:
        """Convert a tensor, NumPy array, list, or scalar to a ``float32`` tensor."""
        if isinstance(value, torch.Tensor):
            return value.to(dtype=torch.float32)
        if isinstance(value, np.ndarray):
            return torch.from_numpy(value.astype(np.float32))
        return torch.as_tensor(value, dtype=torch.float32)

    def _normalize_observation(self, observation: Mapping[str, Any]) -> dict[str, Any]:
        """Return a shallow copy of ``observation`` with the selected keys normalized."""
        processed_obs = dict(observation)
        keys_to_norm = (
            self.normalize_keys
            if self.normalize_keys is not None
            else {k for k in self._tensor_stats if k != "action"}
        )

        for key in keys_to_norm:
            if key not in processed_obs:
                continue
            if key not in self._tensor_stats:
                if self.normalize_keys is not None:
                    # Matches ObservationNormalizer: an explicitly requested key
                    # without statistics is a configuration error.
                    raise KeyError(f"Stats not found for requested key '{key}'")
                continue

            tensor = self._to_float_tensor(processed_obs[key])
            stats = {k: v.to(device=tensor.device) for k, v in self._tensor_stats[key].items()}

            if "mean" in stats and "std" in stats:
                mean, std = stats["mean"], stats["std"]
                processed_obs[key] = (tensor - mean) / (std + self.eps)
            elif "min" in stats and "max" in stats:
                min_val, max_val = stats["min"], stats["max"]
                # Normalize to [0, 1] then to [-1, 1].
                processed_obs[key] = 2 * (tensor - min_val) / (max_val - min_val + self.eps) - 1
            else:
                # Previously skipped silently, passing un-normalized data downstream.
                raise ValueError(
                    f"Stats for key '{key}' must contain either ('mean', 'std') or ('min', 'max')"
                )
        return processed_obs

    def _unnormalize_action(self, action: Any) -> Tensor:
        """Map a normalized action back to dataset units using ``stats["action"]``."""
        tensor = self._to_float_tensor(action)
        stats = {k: v.to(device=tensor.device) for k, v in self._tensor_stats["action"].items()}

        if "mean" in stats and "std" in stats:
            mean, std = stats["mean"], stats["std"]
            return tensor * std + mean
        if "min" in stats and "max" in stats:
            min_val, max_val = stats["min"], stats["max"]
            # Map from [-1, 1] to [0, 1] then to [min, max].
            return (tensor + 1) / 2 * (max_val - min_val) + min_val
        # Previously skipped silently; now consistent with ActionUnnormalizer.
        raise ValueError("Action stats must contain either ('mean', 'std') or ('min', 'max')")

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        """Return a new transition with normalized observation and/or un-normalized action.

        A ``None`` observation or action is passed through unchanged. The action
        is also passed through unchanged when ``unnormalize_action`` is False or
        ``stats`` has no ``"action"`` entry.

        Raises
        ------
        KeyError
            If a key explicitly listed in ``normalize_keys`` is present in the
            observation but has no statistics.
        ValueError
            If the statistics used for a key contain neither ``('mean', 'std')``
            nor ``('min', 'max')``.
        """
        observation = transition[TransitionIndex.OBSERVATION]
        action = transition[TransitionIndex.ACTION]

        if observation is not None:
            observation = self._normalize_observation(observation)

        if self.unnormalize_action and action is not None and "action" in self._tensor_stats:
            action = self._unnormalize_action(action)

        return (
            observation,
            action,
            transition[TransitionIndex.REWARD],
            transition[TransitionIndex.DONE],
            transition[TransitionIndex.TRUNCATED],
            transition[TransitionIndex.INFO],
            transition[TransitionIndex.COMPLEMENTARY_DATA],
        )

    def get_config(self) -> dict[str, Any]:
        """Return constructor arguments other than ``stats``.

        ``normalize_keys`` is converted from a set to a list (presumably so the
        config can be serialized). Statistics are exposed via :meth:`state_dict`.
        """
        return {
            "normalize_keys": list(self.normalize_keys) if self.normalize_keys is not None else None,
            "unnormalize_action": self.unnormalize_action,
            "eps": self.eps,
        }

    def state_dict(self) -> dict[str, Tensor]:
        """Flatten cached statistics into ``{"<key>.<stat_name>": tensor}``."""
        flat_state: dict[str, Tensor] = {}
        for key, sub in self._tensor_stats.items():
            for stat_name, tensor in sub.items():
                flat_state[f"{key}.{stat_name}"] = tensor
        return flat_state

    def load_state_dict(self, state: Mapping[str, Tensor]) -> None:
        """Replace cached statistics with a flat mapping produced by :meth:`state_dict`.

        Raises
        ------
        ValueError
            If a key is not of the form ``"<key>.<stat_name>"``. The existing
            statistics are left untouched in that case.
        """
        rebuilt: dict[str, dict[str, Tensor]] = {}
        for flat_key, tensor in state.items():
            # Split on the LAST dot: the stat names (mean/std/min/max) contain no
            # dots, but feature keys may, e.g. "observation.state.mean" must map to
            # ("observation.state", "mean"), not ("observation", "state.mean").
            key, sep, stat_name = flat_key.rpartition(".")
            if not sep:
                raise ValueError(f"Invalid state key '{flat_key}': expected '<key>.<stat_name>'")
            rebuilt.setdefault(key, {})[stat_name] = tensor
        # Update in place so any existing reference to the cache sees the new stats.
        self._tensor_stats.clear()
        self._tensor_stats.update(rebuilt)

    def reset(self) -> None:
        """Nothing to reset for this stateless processor."""
        pass
|