diff --git a/examples/phone_to_so100/evaluate.py b/examples/phone_to_so100/evaluate.py index 0620ef8b0..a3ed4722b 100644 --- a/examples/phone_to_so100/evaluate.py +++ b/examples/phone_to_so100/evaluate.py @@ -21,7 +21,7 @@ from lerobot.datasets.utils import combine_feature_dicts from lerobot.model.kinematics import RobotKinematics from lerobot.policies.act.modeling_act import ACTPolicy from lerobot.policies.factory import make_pre_post_processors -from lerobot.processor import DataProcessorPipeline +from lerobot.processor import RobotProcessorPipeline from lerobot.processor.converters import ( observation_to_transition, transition_to_robot_action, @@ -65,7 +65,7 @@ kinematics_solver = RobotKinematics( ) # Build pipeline to convert ee pose action to joint action -robot_ee_to_joints_processor = DataProcessorPipeline( +robot_ee_to_joints_processor = RobotProcessorPipeline( steps=[ AddRobotObservationAsComplimentaryData(robot=robot), InverseKinematicsEEToJoints( @@ -79,7 +79,7 @@ robot_ee_to_joints_processor = DataProcessorPipeline( ) # Build pipeline to convert joint observation to ee pose observation -robot_joints_to_ee_pose_processor = DataProcessorPipeline( +robot_joints_to_ee_pose_processor = RobotProcessorPipeline( steps=[ ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())) ], diff --git a/examples/phone_to_so100/record.py b/examples/phone_to_so100/record.py index 6a17e7a41..fef612692 100644 --- a/examples/phone_to_so100/record.py +++ b/examples/phone_to_so100/record.py @@ -20,7 +20,7 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.pipeline_features import aggregate_pipeline_dataset_features from lerobot.datasets.utils import combine_feature_dicts from lerobot.model.kinematics import RobotKinematics -from lerobot.processor import DataProcessorPipeline +from lerobot.processor import RobotProcessorPipeline from lerobot.processor.converters import ( action_to_transition, observation_to_transition, @@ -73,7 +73,7 @@ kinematics_solver = RobotKinematics( ) # Build pipeline to convert phone action to ee pose action -phone_to_robot_ee_pose_processor = DataProcessorPipeline( +phone_to_robot_ee_pose_processor = RobotProcessorPipeline( steps=[ MapPhoneActionToRobotAction(platform=teleop_config.phone_os), AddRobotObservationAsComplimentaryData(robot=robot), @@ -93,7 +93,7 @@ phone_to_robot_ee_pose_processor = DataProcessorPipeline( ) # Build pipeline to convert ee pose action to joint action -robot_ee_to_joints_processor = DataProcessorPipeline( +robot_ee_to_joints_processor = RobotProcessorPipeline( steps=[ InverseKinematicsEEToJoints( kinematics=kinematics_solver, @@ -110,7 +110,7 @@ robot_ee_to_joints_processor = DataProcessorPipeline( ) # Build pipeline to convert joint observation to ee pose observation -robot_joints_to_ee_pose = DataProcessorPipeline( +robot_joints_to_ee_pose = RobotProcessorPipeline( steps=[ ForwardKinematicsJointsToEE(kinematics=kinematics_solver, motor_names=list(robot.bus.motors.keys())) ], diff --git a/examples/phone_to_so100/replay.py b/examples/phone_to_so100/replay.py index 2fa5c51c4..cb56005cb 100644 --- a/examples/phone_to_so100/replay.py +++ b/examples/phone_to_so100/replay.py @@ -19,7 +19,7 @@ import time from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.model.kinematics import RobotKinematics -from lerobot.processor import DataProcessorPipeline +from lerobot.processor import RobotProcessorPipeline from lerobot.processor.converters import action_to_transition, transition_to_robot_action from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig from lerobot.robots.so100_follower.robot_kinematic_processor import ( @@ -50,7 +50,7 @@ kinematics_solver = RobotKinematics( ) # Build pipeline to convert ee pose action to joint action -robot_ee_to_joints_processor = DataProcessorPipeline( +robot_ee_to_joints_processor = RobotProcessorPipeline( steps=[ AddRobotObservationAsComplimentaryData(robot=robot), InverseKinematicsEEToJoints( diff --git a/examples/phone_to_so100/teleoperate.py b/examples/phone_to_so100/teleoperate.py index 193c88986..6a4f76e74 100644 --- a/examples/phone_to_so100/teleoperate.py +++ b/examples/phone_to_so100/teleoperate.py @@ -16,7 +16,7 @@ import time from lerobot.model.kinematics import RobotKinematics -from lerobot.processor import DataProcessorPipeline +from lerobot.processor import RobotProcessorPipeline from lerobot.processor.converters import action_to_transition, transition_to_robot_action from lerobot.robots.so100_follower.config_so100_follower import SO100FollowerConfig from lerobot.robots.so100_follower.robot_kinematic_processor import ( @@ -49,7 +49,7 @@ kinematics_solver = RobotKinematics( ) # Build pipeline to convert phone action to ee pose action to joint action -phone_to_robot_joints_processor = DataProcessorPipeline( +phone_to_robot_joints_processor = RobotProcessorPipeline( steps=[ MapPhoneActionToRobotAction(platform=teleop_config.phone_os), AddRobotObservationAsComplimentaryData(robot=robot), diff --git a/src/lerobot/policies/act/processor_act.py b/src/lerobot/policies/act/processor_act.py index f33aeac96..3bf208184 100644 --- a/src/lerobot/policies/act/processor_act.py +++ b/src/lerobot/policies/act/processor_act.py @@ -19,9 +19,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N from lerobot.policies.act.configuration_act import ACTConfig from lerobot.processor import ( AddBatchDimensionProcessorStep, - DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, + PolicyProcessorPipeline, ProcessorKwargs, RenameProcessorStep, UnnormalizerProcessorStep, @@ -33,7 +33,7 @@ def make_act_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: +) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -57,12 +57,12 @@ def make_act_pre_post_processors( ] return ( - DataProcessorPipeline( + PolicyProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - DataProcessorPipeline( + PolicyProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/diffusion/processor_diffusion.py b/src/lerobot/policies/diffusion/processor_diffusion.py index ef47ec8b3..19ea30b79 100644 --- a/src/lerobot/policies/diffusion/processor_diffusion.py +++ b/src/lerobot/policies/diffusion/processor_diffusion.py @@ -20,9 +20,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig from lerobot.processor import ( AddBatchDimensionProcessorStep, - DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, + PolicyProcessorPipeline, ProcessorKwargs, RenameProcessorStep, UnnormalizerProcessorStep, @@ -34,7 +34,7 @@ def make_diffusion_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: +) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -57,12 +57,12 @@ def make_diffusion_pre_post_processors( ), ] return ( - DataProcessorPipeline( + PolicyProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - DataProcessorPipeline( + PolicyProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/factory.py b/src/lerobot/policies/factory.py index 15c2a3521..125b526ee 100644 --- a/src/lerobot/policies/factory.py +++ b/src/lerobot/policies/factory.py @@ -38,7 +38,7 @@ from lerobot.policies.sac.reward_model.configuration_classifier import RewardCla from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig -from lerobot.processor import DataProcessorPipeline, ProcessorKwargs +from lerobot.processor import PolicyProcessorPipeline, ProcessorKwargs def get_policy_class(name: str) -> type[PreTrainedPolicy]: @@ -122,7 +122,7 @@ def make_pre_post_processors( policy_cfg: PreTrainedConfig, pretrained_path: str | None = None, **kwargs: Unpack[ProcessorConfigKwargs], -) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: +) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]: """Make a processor instance for a given policy type. This function creates the appropriate processor configuration based on the policy type. @@ -146,14 +146,14 @@ def make_pre_post_processors( postprocessor_kwargs = kwargs.get("postprocessor_kwargs", {}) return ( - DataProcessorPipeline.from_pretrained( + PolicyProcessorPipeline.from_pretrained( pretrained_model_name_or_path=pretrained_path, config_filename=kwargs.get("preprocessor_config_filename", "robot_preprocessor.json"), overrides=kwargs.get("preprocessor_overrides", {}), to_transition=preprocessor_kwargs.get("to_transition"), to_output=preprocessor_kwargs.get("to_output"), ), - DataProcessorPipeline.from_pretrained( + PolicyProcessorPipeline.from_pretrained( pretrained_model_name_or_path=pretrained_path, config_filename=kwargs.get("postprocessor_config_filename", "robot_postprocessor.json"), overrides=kwargs.get("postprocessor_overrides", {}), diff --git a/src/lerobot/policies/pi0/processor_pi0.py b/src/lerobot/policies/pi0/processor_pi0.py index ddffef28f..d2a675c74 100644 --- a/src/lerobot/policies/pi0/processor_pi0.py +++ b/src/lerobot/policies/pi0/processor_pi0.py @@ -23,9 +23,9 @@ from lerobot.policies.pi0.configuration_pi0 import PI0Config from lerobot.processor import ( AddBatchDimensionProcessorStep, ComplementaryDataProcessorStep, - DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, + PolicyProcessorPipeline, ProcessorKwargs, ProcessorStep, ProcessorStepRegistry, @@ -72,7 +72,7 @@ def make_pi0_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: +) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -105,12 +105,12 @@ def make_pi0_pre_post_processors( ] return ( - DataProcessorPipeline( + PolicyProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - DataProcessorPipeline( + PolicyProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/pi0fast/processor_pi0fast.py b/src/lerobot/policies/pi0fast/processor_pi0fast.py index fab103280..cc5eb4fdc 100644 --- a/src/lerobot/policies/pi0fast/processor_pi0fast.py +++ b/src/lerobot/policies/pi0fast/processor_pi0fast.py @@ -20,9 +20,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N from lerobot.policies.pi0.configuration_pi0 import PI0Config from lerobot.processor import ( AddBatchDimensionProcessorStep, - DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, + PolicyProcessorPipeline, ProcessorKwargs, RenameProcessorStep, UnnormalizerProcessorStep, @@ -34,7 +34,7 @@ def make_pi0fast_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: +) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -57,12 +57,12 @@ def make_pi0fast_pre_post_processors( ), ] return ( - DataProcessorPipeline( + PolicyProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - DataProcessorPipeline( + PolicyProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/sac/processor_sac.py b/src/lerobot/policies/sac/processor_sac.py index 7c90cc08f..14dba9151 100644 --- a/src/lerobot/policies/sac/processor_sac.py +++ b/src/lerobot/policies/sac/processor_sac.py @@ -21,9 +21,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N from lerobot.policies.sac.configuration_sac import SACConfig from lerobot.processor import ( AddBatchDimensionProcessorStep, - DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, + PolicyProcessorPipeline, ProcessorKwargs, RenameProcessorStep, UnnormalizerProcessorStep, @@ -35,7 +35,7 @@ def make_sac_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: +) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -58,12 +58,12 @@ def make_sac_pre_post_processors( ), ] return ( - DataProcessorPipeline( + PolicyProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - DataProcessorPipeline( + PolicyProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/sac/reward_model/processor_classifier.py b/src/lerobot/policies/sac/reward_model/processor_classifier.py index c5ffae621..70195d69d 100644 --- a/src/lerobot/policies/sac/reward_model/processor_classifier.py +++ b/src/lerobot/policies/sac/reward_model/processor_classifier.py @@ -17,10 +17,10 @@ import torch from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig from lerobot.processor import ( - DataProcessorPipeline, DeviceProcessorStep, IdentityProcessorStep, NormalizerProcessorStep, + PolicyProcessorPipeline, ProcessorKwargs, ) @@ -30,7 +30,7 @@ def make_classifier_processor( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: +) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -48,12 +48,12 @@ def make_classifier_processor( output_steps = [DeviceProcessorStep(device="cpu"), IdentityProcessorStep()] return ( - DataProcessorPipeline( + PolicyProcessorPipeline( steps=input_steps, name="classifier_preprocessor", **preprocessor_kwargs, ), - DataProcessorPipeline( + PolicyProcessorPipeline( steps=output_steps, name="classifier_postprocessor", **postprocessor_kwargs, diff --git a/src/lerobot/policies/smolvla/processor_smolvla.py b/src/lerobot/policies/smolvla/processor_smolvla.py index 3d2be30b2..9a0fb067f 100644 --- a/src/lerobot/policies/smolvla/processor_smolvla.py +++ b/src/lerobot/policies/smolvla/processor_smolvla.py @@ -22,9 +22,9 @@ from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig from lerobot.processor import ( AddBatchDimensionProcessorStep, ComplementaryDataProcessorStep, - DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, + PolicyProcessorPipeline, ProcessorKwargs, ProcessorStepRegistry, RenameProcessorStep, @@ -38,7 +38,7 @@ def make_smolvla_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: +) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -68,12 +68,12 @@ def make_smolvla_pre_post_processors( ), ] return ( - DataProcessorPipeline( + PolicyProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - DataProcessorPipeline( + PolicyProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/tdmpc/processor_tdmpc.py b/src/lerobot/policies/tdmpc/processor_tdmpc.py index fecddef0e..3085db9cf 100644 --- a/src/lerobot/policies/tdmpc/processor_tdmpc.py +++ b/src/lerobot/policies/tdmpc/processor_tdmpc.py @@ -20,9 +20,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig from lerobot.processor import ( AddBatchDimensionProcessorStep, - DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, + PolicyProcessorPipeline, ProcessorKwargs, RenameProcessorStep, UnnormalizerProcessorStep, @@ -34,7 +34,7 @@ def make_tdmpc_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: +) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -57,12 +57,12 @@ def make_tdmpc_pre_post_processors( ), ] return ( - DataProcessorPipeline( + PolicyProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - DataProcessorPipeline( + PolicyProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/policies/vqbet/processor_vqbet.py b/src/lerobot/policies/vqbet/processor_vqbet.py index 37be43150..ea60cff54 100644 --- a/src/lerobot/policies/vqbet/processor_vqbet.py +++ b/src/lerobot/policies/vqbet/processor_vqbet.py @@ -21,9 +21,9 @@ from lerobot.constants import POSTPROCESSOR_DEFAULT_NAME, PREPROCESSOR_DEFAULT_N from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig from lerobot.processor import ( AddBatchDimensionProcessorStep, - DataProcessorPipeline, DeviceProcessorStep, NormalizerProcessorStep, + PolicyProcessorPipeline, ProcessorKwargs, RenameProcessorStep, UnnormalizerProcessorStep, @@ -35,7 +35,7 @@ def make_vqbet_pre_post_processors( dataset_stats: dict[str, dict[str, torch.Tensor]] | None = None, preprocessor_kwargs: ProcessorKwargs | None = None, postprocessor_kwargs: ProcessorKwargs | None = None, -) -> tuple[DataProcessorPipeline, DataProcessorPipeline]: +) -> tuple[PolicyProcessorPipeline, PolicyProcessorPipeline]: if preprocessor_kwargs is None: preprocessor_kwargs = {} if postprocessor_kwargs is None: @@ -58,12 +58,12 @@ def make_vqbet_pre_post_processors( ), ] return ( - DataProcessorPipeline( + PolicyProcessorPipeline( steps=input_steps, name=PREPROCESSOR_DEFAULT_NAME, **preprocessor_kwargs, ), - DataProcessorPipeline( + PolicyProcessorPipeline( steps=output_steps, name=POSTPROCESSOR_DEFAULT_NAME, **postprocessor_kwargs, diff --git a/src/lerobot/processor/__init__.py b/src/lerobot/processor/__init__.py index 4832d8a4e..89649fd13 100644 --- a/src/lerobot/processor/__init__.py +++ b/src/lerobot/processor/__init__.py @@ -46,10 +46,12 @@ from .pipeline import ( IdentityProcessorStep, InfoProcessorStep, ObservationProcessorStep, + PolicyProcessorPipeline, ProcessorKwargs, ProcessorStep, ProcessorStepRegistry, RewardProcessorStep, + RobotProcessorPipeline, TruncatedProcessorStep, ) from .rename_processor import RenameProcessorStep @@ -79,6 +81,7 @@ __all__ = [ "NormalizerProcessorStep", "Numpy2TorchActionProcessorStep", "ObservationProcessorStep", + "PolicyProcessorPipeline", "ProcessorKwargs", "ProcessorStep", "ProcessorStepRegistry", @@ -88,6 +91,7 @@ __all__ = [ "DataProcessorPipeline", "TimeLimitProcessorStep", "AddBatchDimensionProcessorStep", + "RobotProcessorPipeline", "TokenizerProcessorStep", "Torch2NumpyActionProcessorStep", "transition_to_batch", diff --git a/src/lerobot/processor/migrate_policy_normalization.py b/src/lerobot/processor/migrate_policy_normalization.py index 84eac12af..9690f6b98 100644 --- a/src/lerobot/processor/migrate_policy_normalization.py +++ b/src/lerobot/processor/migrate_policy_normalization.py @@ -50,7 +50,7 @@ from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature from .batch_processor import AddBatchDimensionProcessorStep from .device_processor import DeviceProcessorStep from .normalize_processor import NormalizerProcessorStep, UnnormalizerProcessorStep -from .pipeline import DataProcessorPipeline +from .pipeline import PolicyProcessorPipeline from .rename_processor import RenameProcessorStep # Policy type to class mapping @@ -422,14 +422,14 @@ def main(): AddBatchDimensionProcessorStep(), DeviceProcessorStep(device=policy_config.device), ] - preprocessor = DataProcessorPipeline(steps=preprocessor_steps, name="robot_preprocessor") + preprocessor = PolicyProcessorPipeline(steps=preprocessor_steps, name="robot_preprocessor") # Create postprocessor with unnormalizer for outputs only postprocessor_steps = [ DeviceProcessorStep(device="cpu"), UnnormalizerProcessorStep(features=output_features, norm_map=norm_map, stats=stats), ] - postprocessor = DataProcessorPipeline(steps=postprocessor_steps, name="robot_postprocessor") + postprocessor = PolicyProcessorPipeline(steps=postprocessor_steps, name="robot_postprocessor") # Determine hub repo ID if pushing to hub if args.push_to_hub: diff --git a/src/lerobot/processor/normalize_processor.py b/src/lerobot/processor/normalize_processor.py index 205efe83a..0e97c2568 100644 --- a/src/lerobot/processor/normalize_processor.py +++ b/src/lerobot/processor/normalize_processor.py @@ -12,7 +12,7 @@ from lerobot.datasets.lerobot_dataset import LeRobotDataset from .converters import to_tensor from .core import EnvTransition, TransitionKey -from .pipeline import DataProcessorPipeline, ProcessorStep, ProcessorStepRegistry +from .pipeline import PolicyProcessorPipeline, ProcessorStep, ProcessorStepRegistry @dataclass @@ -253,17 +253,17 @@ class UnnormalizerProcessorStep(_NormalizationMixin, ProcessorStep): def hotswap_stats( - robot_processor: DataProcessorPipeline, stats: dict[str, dict[str, Any]] -) -> DataProcessorPipeline: + policy_processor: PolicyProcessorPipeline, stats: dict[str, dict[str, Any]] +) -> PolicyProcessorPipeline: """ - Replaces normalization statistics in a RobotProcessor pipeline. + Replaces normalization statistics in a PolicyProcessor pipeline. - This function creates a deep copy of the provided `RobotProcessor` and updates the + This function creates a deep copy of the provided `PolicyProcessorPipeline` and updates the statistics of any `NormalizerProcessorStep` or `UnnormalizerProcessorStep` steps within it. It's useful for adapting a trained policy to a new environment or dataset with different data distributions. """ - rp = deepcopy(robot_processor) + rp = deepcopy(policy_processor) for step in rp.steps: if isinstance(step, _NormalizationMixin): step.stats = stats diff --git a/src/lerobot/processor/pipeline.py b/src/lerobot/processor/pipeline.py index 925bad391..901a2672a 100644 --- a/src/lerobot/processor/pipeline.py +++ b/src/lerobot/processor/pipeline.py @@ -23,7 +23,7 @@ from collections.abc import Callable, Iterable, Sequence from copy import deepcopy from dataclasses import dataclass, field from pathlib import Path -from typing import Any, Generic, TypedDict, TypeVar, cast +from typing import Any, Generic, TypeAlias, TypedDict, TypeVar, cast import torch from huggingface_hub import ModelHubMixin, hf_hub_download @@ -805,6 +805,10 @@ class DataProcessorPipeline(ModelHubMixin, Generic[TOutput]): return transformed_transition[TransitionKey.COMPLEMENTARY_DATA] +RobotProcessorPipeline: TypeAlias = DataProcessorPipeline +PolicyProcessorPipeline: TypeAlias = DataProcessorPipeline + + class ObservationProcessorStep(ProcessorStep, ABC): """Base class for processors that modify only the observation component of a transition. diff --git a/src/lerobot/record.py b/src/lerobot/record.py index b09f04d88..65dec20f8 100644 --- a/src/lerobot/record.py +++ b/src/lerobot/record.py @@ -76,7 +76,12 @@ from lerobot.datasets.utils import hw_to_dataset_features from lerobot.datasets.video_utils import VideoEncodingManager from lerobot.policies.factory import make_policy, make_pre_post_processors from lerobot.policies.pretrained import PreTrainedPolicy -from lerobot.processor import DataProcessorPipeline, IdentityProcessorStep, TransitionKey +from lerobot.processor import ( + IdentityProcessorStep, + PolicyProcessorPipeline, + RobotProcessorPipeline, + TransitionKey, +) from lerobot.processor.converters import ( action_to_transition, observation_to_transition, @@ -235,22 +240,22 @@ def record_loop( dataset: LeRobotDataset | None = None, teleop: Teleoperator | list[Teleoperator] | None = None, policy: PreTrainedPolicy | None = None, - preprocessor: DataProcessorPipeline | None = None, - postprocessor: DataProcessorPipeline | None = None, + preprocessor: PolicyProcessorPipeline | None = None, + postprocessor: PolicyProcessorPipeline | None = None, control_time_s: int | None = None, - teleop_action_processor: DataProcessorPipeline | None = None, # runs after teleop - robot_action_processor: DataProcessorPipeline | None = None, # runs before robot - robot_observation_processor: DataProcessorPipeline | None = None, # runs after robot + teleop_action_processor: RobotProcessorPipeline | None = None, # runs after teleop + robot_action_processor: RobotProcessorPipeline | None = None, # runs before robot + robot_observation_processor: RobotProcessorPipeline | None = None, # runs after robot single_task: str | None = None, display_data: bool = False, ): - teleop_action_processor = teleop_action_processor or DataProcessorPipeline( + teleop_action_processor = teleop_action_processor or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=lambda tr: tr ) - robot_action_processor = robot_action_processor or DataProcessorPipeline( + robot_action_processor = robot_action_processor or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=lambda tr: tr, to_output=transition_to_robot_action ) - robot_observation_processor = robot_observation_processor or DataProcessorPipeline( + robot_observation_processor = robot_observation_processor or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=observation_to_transition, to_output=lambda tr: tr, diff --git a/src/lerobot/replay.py b/src/lerobot/replay.py index ba9828c1e..7355baa54 100644 --- a/src/lerobot/replay.py +++ b/src/lerobot/replay.py @@ -47,7 +47,7 @@ from pprint import pformat from lerobot.configs import parser from lerobot.datasets.lerobot_dataset import LeRobotDataset -from lerobot.processor import DataProcessorPipeline, IdentityProcessorStep +from lerobot.processor import IdentityProcessorStep, RobotProcessorPipeline from lerobot.processor.converters import action_to_transition, transition_to_robot_action from lerobot.robots import ( # noqa: F401 Robot, @@ -85,7 +85,7 @@ class ReplayConfig: # Use vocal synthesis to read events. play_sounds: bool = True # Optional processor for actions before sending to robot - robot_action_processor: DataProcessorPipeline | None = None + robot_action_processor: RobotProcessorPipeline | None = None @parser.wrap() @@ -94,7 +94,7 @@ def replay(cfg: ReplayConfig): logging.info(pformat(asdict(cfg))) # Initialize robot action processor with default if not provided - robot_action_processor = cfg.robot_action_processor or DataProcessorPipeline( + robot_action_processor = cfg.robot_action_processor or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=transition_to_robot_action, # type: ignore[arg-type] diff --git a/src/lerobot/teleoperate.py b/src/lerobot/teleoperate.py index 7aa268fb0..a89872502 100644 --- a/src/lerobot/teleoperate.py +++ b/src/lerobot/teleoperate.py @@ -61,7 +61,7 @@ import rerun as rr from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401 from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401 from lerobot.configs import parser -from lerobot.processor import DataProcessorPipeline, IdentityProcessorStep +from lerobot.processor import IdentityProcessorStep, RobotProcessorPipeline from lerobot.processor.converters import ( action_to_transition, observation_to_transition, @@ -104,9 +104,9 @@ class TeleoperateConfig: # Display all cameras on screen display_data: bool = False # Optional processors for data transformation - teleop_action_processor: DataProcessorPipeline | None = None # runs after teleop - robot_action_processor: DataProcessorPipeline | None = None # runs before robot - robot_observation_processor: DataProcessorPipeline | None = None # runs after robot + teleop_action_processor: RobotProcessorPipeline | None = None # runs after teleop + robot_action_processor: RobotProcessorPipeline | None = None # runs before robot + robot_observation_processor: RobotProcessorPipeline | None = None # runs after robot def teleop_loop( @@ -115,20 +115,20 @@ def teleop_loop( fps: int, display_data: bool = False, duration: float | None = None, - teleop_action_processor: DataProcessorPipeline | None = None, - robot_action_processor: DataProcessorPipeline | None = None, - robot_observation_processor: DataProcessorPipeline | None = None, + teleop_action_processor: RobotProcessorPipeline | None = None, + robot_action_processor: RobotProcessorPipeline | None = None, + robot_observation_processor: RobotProcessorPipeline | None = None, ): # Initialize processors with defaults if not provided - teleop_action_processor = teleop_action_processor or DataProcessorPipeline( + teleop_action_processor = teleop_action_processor or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=lambda tr: tr ) - robot_action_processor = robot_action_processor or DataProcessorPipeline( + robot_action_processor = robot_action_processor or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=lambda tr: tr, to_output=transition_to_robot_action, # type: ignore[arg-type] ) - robot_observation_processor = robot_observation_processor or DataProcessorPipeline( + robot_observation_processor = robot_observation_processor or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=observation_to_transition, to_output=lambda tr: tr, diff --git a/src/lerobot/utils/control_utils.py b/src/lerobot/utils/control_utils.py index 5553d56f2..fb883d2c5 100644 --- a/src/lerobot/utils/control_utils.py +++ b/src/lerobot/utils/control_utils.py @@ -31,7 +31,7 @@ from termcolor import colored from lerobot.datasets.lerobot_dataset import LeRobotDataset from lerobot.datasets.utils import DEFAULT_FEATURES from lerobot.policies.pretrained import PreTrainedPolicy -from lerobot.processor import DataProcessorPipeline, TransitionKey +from lerobot.processor import PolicyProcessorPipeline, TransitionKey from lerobot.robots import Robot @@ -102,8 +102,8 @@ def predict_action( observation: dict[str, np.ndarray], policy: PreTrainedPolicy, device: torch.device, - preprocessor: DataProcessorPipeline, - postprocessor: DataProcessorPipeline, + preprocessor: PolicyProcessorPipeline, + postprocessor: PolicyProcessorPipeline, use_amp: bool, task: str | None = None, robot_type: str | None = None, diff --git a/src/lerobot/utils/train_utils.py b/src/lerobot/utils/train_utils.py index 10eab8741..be2eb8146 100644 --- a/src/lerobot/utils/train_utils.py +++ b/src/lerobot/utils/train_utils.py @@ -32,7 +32,7 @@ from lerobot.datasets.utils import load_json, write_json from lerobot.optim.optimizers import load_optimizer_state, save_optimizer_state from lerobot.optim.schedulers import load_scheduler_state, save_scheduler_state from lerobot.policies.pretrained import PreTrainedPolicy -from lerobot.processor import DataProcessorPipeline +from lerobot.processor import PolicyProcessorPipeline from lerobot.utils.random_utils import load_rng_state, save_rng_state @@ -75,8 +75,8 @@ def save_checkpoint( policy: PreTrainedPolicy, optimizer: Optimizer, scheduler: LRScheduler | None = None, - preprocessor: DataProcessorPipeline | None = None, - postprocessor: DataProcessorPipeline | None = None, + preprocessor: PolicyProcessorPipeline | None = None, + postprocessor: PolicyProcessorPipeline | None = None, ) -> None: """This function creates the following directory structure: