From f1cfdfced9d7153b21bc9a1594717ee76c7ead71 Mon Sep 17 00:00:00 2001 From: Steven Palma Date: Fri, 5 Sep 2025 11:31:30 +0200 Subject: [PATCH] fix(processor): recover type inference for use of processors (#1873) --- src/lerobot/processor/pipeline.py | 4 ++-- src/lerobot/record.py | 17 +++++++-------- src/lerobot/teleoperate.py | 35 +++++++++++++++++++------------ 3 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/lerobot/processor/pipeline.py b/src/lerobot/processor/pipeline.py index 5f0a9be46..f26610e9e 100644 --- a/src/lerobot/processor/pipeline.py +++ b/src/lerobot/processor/pipeline.py @@ -782,8 +782,8 @@ class DataProcessorPipeline(ModelHubMixin, Generic[TOutput]): return transformed_transition[TransitionKey.COMPLEMENTARY_DATA] -RobotProcessorPipeline: TypeAlias = DataProcessorPipeline -PolicyProcessorPipeline: TypeAlias = DataProcessorPipeline +RobotProcessorPipeline: TypeAlias = DataProcessorPipeline[TOutput] +PolicyProcessorPipeline: TypeAlias = DataProcessorPipeline[TOutput] class ObservationProcessorStep(ProcessorStep, ABC): diff --git a/src/lerobot/record.py b/src/lerobot/record.py index eac1b6c23..b8e862d33 100644 --- a/src/lerobot/record.py +++ b/src/lerobot/record.py @@ -254,13 +254,13 @@ def record_loop( ): teleop_action_processor: RobotProcessorPipeline[EnvTransition] = ( teleop_action_processor - or RobotProcessorPipeline[EnvTransition]( + or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=identity_transition ) ) robot_action_processor: RobotProcessorPipeline[dict[str, Any]] = ( robot_action_processor - or RobotProcessorPipeline[dict[str, Any]]( + or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=identity_transition, to_output=transition_to_robot_action, @@ -268,7 +268,7 @@ def record_loop( ) robot_observation_processor: RobotProcessorPipeline[EnvTransition] = ( robot_observation_processor - or RobotProcessorPipeline[EnvTransition]( + or RobotProcessorPipeline( steps=[IdentityProcessorStep()], to_transition=observation_to_transition, to_output=identity_transition, @@ -323,7 +323,7 @@ def record_loop( obs = robot.get_observation() # Applies a pipeline to the raw robot observation, default is IdentityProcessor - obs_transition: EnvTransition = robot_observation_processor(obs) + obs_transition = robot_observation_processor(obs) # Get action from either policy or teleop if policy is not None and preprocessor is not None and postprocessor is not None: @@ -355,8 +355,7 @@ def record_loop( # Applies a pipeline to the raw teleop action, default is IdentityProcessor # TODO(Steven): This assumes that the processor passed by the user should have identity_transition as to_output. - # TODO(Steven): Why is this not automatically typed as EnvTransition? - teleop_transition: EnvTransition = teleop_action_processor(act) + teleop_transition = teleop_action_processor(act) elif isinstance(teleop, list): arm_action = teleop_arm.get_action() @@ -364,7 +363,7 @@ def record_loop( keyboard_action = teleop_keyboard.get_action() base_action = robot._from_keyboard_to_base_action(keyboard_action) act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action - teleop_transition: EnvTransition = teleop_action_processor(act) + teleop_transition = teleop_action_processor(act) else: logging.info( "No policy or teleoperator provided, skipping action generation. " @@ -376,9 +375,9 @@ def record_loop( # Applies a pipeline to the action, default is IdentityProcessor # IMPORTANT: action_pipeline.to_output must return a dict suitable for robot.send_action() if policy is not None and policy_transition is not None: - robot_action_to_send: dict[str, Any] = robot_action_processor(policy_transition) + robot_action_to_send = robot_action_processor(policy_transition) else: - robot_action_to_send: dict[str, Any] = robot_action_processor(teleop_transition) + robot_action_to_send = robot_action_processor(teleop_transition) # Send action to robot # Action can eventually be clipped using `max_relative_target`, diff --git a/src/lerobot/teleoperate.py b/src/lerobot/teleoperate.py index eb7f4bc1b..8f5236d96 100644 --- a/src/lerobot/teleoperate.py +++ b/src/lerobot/teleoperate.py @@ -122,18 +122,27 @@ def teleop_loop( robot_observation_processor: RobotProcessorPipeline[EnvTransition] | None = None, ): # Initialize processors with defaults if not provided - teleop_action_processor = teleop_action_processor or RobotProcessorPipeline[EnvTransition]( - steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=identity_transition + teleop_action_processor: RobotProcessorPipeline[EnvTransition] = ( + teleop_action_processor + or RobotProcessorPipeline( + steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=identity_transition + ) ) - robot_action_processor = robot_action_processor or RobotProcessorPipeline[dict[str, Any]]( - steps=[IdentityProcessorStep()], - to_transition=identity_transition, - to_output=transition_to_robot_action, # type: ignore[arg-type] + robot_action_processor: RobotProcessorPipeline[dict[str, Any]] = ( + robot_action_processor + or RobotProcessorPipeline( + steps=[IdentityProcessorStep()], + to_transition=identity_transition, + to_output=transition_to_robot_action, # type: ignore[arg-type] + ) ) - robot_observation_processor = robot_observation_processor or RobotProcessorPipeline[EnvTransition]( - steps=[IdentityProcessorStep()], - to_transition=observation_to_transition, - to_output=identity_transition, + robot_observation_processor: RobotProcessorPipeline[EnvTransition] = ( + robot_observation_processor + or RobotProcessorPipeline( + steps=[IdentityProcessorStep()], + to_transition=observation_to_transition, + to_output=identity_transition, + ) ) # Reset processors @@ -151,10 +160,10 @@ def teleop_loop( raw_action = teleop.get_action() # Process teleop action through pipeline - teleop_transition: EnvTransition = teleop_action_processor(raw_action) + teleop_transition = teleop_action_processor(raw_action) # Process action for robot through pipeline - robot_action_to_send: dict[str, Any] = robot_action_processor(teleop_transition) + robot_action_to_send = robot_action_processor(teleop_transition) # Send processed action to robot (robot_action_processor.to_output should return dict[str, Any]) robot.send_action(robot_action_to_send) # type: ignore[arg-type] @@ -163,7 +172,7 @@ def teleop_loop( # Get robot observation obs = robot.get_observation() # Process robot observation through pipeline - obs_transition: EnvTransition = robot_observation_processor(obs) + obs_transition = robot_observation_processor(obs) log_rerun_data( observation=obs_transition.get(TransitionKey.OBSERVATION),