fix(processor): recover type inference for use of processors (#1873)

This commit is contained in:
Steven Palma
2025-09-05 11:31:30 +02:00
committed by GitHub
parent 888a5b6249
commit f1cfdfced9
3 changed files with 32 additions and 24 deletions

View File

@@ -782,8 +782,8 @@ class DataProcessorPipeline(ModelHubMixin, Generic[TOutput]):
return transformed_transition[TransitionKey.COMPLEMENTARY_DATA]
RobotProcessorPipeline: TypeAlias = DataProcessorPipeline
PolicyProcessorPipeline: TypeAlias = DataProcessorPipeline
RobotProcessorPipeline: TypeAlias = DataProcessorPipeline[TOutput]
PolicyProcessorPipeline: TypeAlias = DataProcessorPipeline[TOutput]
class ObservationProcessorStep(ProcessorStep, ABC):

View File

@@ -254,13 +254,13 @@ def record_loop(
):
teleop_action_processor: RobotProcessorPipeline[EnvTransition] = (
teleop_action_processor
or RobotProcessorPipeline[EnvTransition](
or RobotProcessorPipeline(
steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=identity_transition
)
)
robot_action_processor: RobotProcessorPipeline[dict[str, Any]] = (
robot_action_processor
or RobotProcessorPipeline[dict[str, Any]](
or RobotProcessorPipeline(
steps=[IdentityProcessorStep()],
to_transition=identity_transition,
to_output=transition_to_robot_action,
@@ -268,7 +268,7 @@ def record_loop(
)
robot_observation_processor: RobotProcessorPipeline[EnvTransition] = (
robot_observation_processor
or RobotProcessorPipeline[EnvTransition](
or RobotProcessorPipeline(
steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=identity_transition,
@@ -323,7 +323,7 @@ def record_loop(
obs = robot.get_observation()
# Applies a pipeline to the raw robot observation, default is IdentityProcessor
obs_transition: EnvTransition = robot_observation_processor(obs)
obs_transition = robot_observation_processor(obs)
# Get action from either policy or teleop
if policy is not None and preprocessor is not None and postprocessor is not None:
@@ -355,8 +355,7 @@ def record_loop(
# Applies a pipeline to the raw teleop action, default is IdentityProcessor
# TODO(Steven): This assumes that the processor passed by the user should have identity_transition as to_output.
# TODO(Steven): Why is this not automatically typed as EnvTransition?
teleop_transition: EnvTransition = teleop_action_processor(act)
teleop_transition = teleop_action_processor(act)
elif isinstance(teleop, list):
arm_action = teleop_arm.get_action()
@@ -364,7 +363,7 @@ def record_loop(
keyboard_action = teleop_keyboard.get_action()
base_action = robot._from_keyboard_to_base_action(keyboard_action)
act = {**arm_action, **base_action} if len(base_action) > 0 else arm_action
teleop_transition: EnvTransition = teleop_action_processor(act)
teleop_transition = teleop_action_processor(act)
else:
logging.info(
"No policy or teleoperator provided, skipping action generation. "
@@ -376,9 +375,9 @@ def record_loop(
# Applies a pipeline to the action, default is IdentityProcessor
# IMPORTANT: action_pipeline.to_output must return a dict suitable for robot.send_action()
if policy is not None and policy_transition is not None:
robot_action_to_send: dict[str, Any] = robot_action_processor(policy_transition)
robot_action_to_send = robot_action_processor(policy_transition)
else:
robot_action_to_send: dict[str, Any] = robot_action_processor(teleop_transition)
robot_action_to_send = robot_action_processor(teleop_transition)
# Send action to robot
# Action can eventually be clipped using `max_relative_target`,

View File

@@ -122,18 +122,27 @@ def teleop_loop(
robot_observation_processor: RobotProcessorPipeline[EnvTransition] | None = None,
):
# Initialize processors with defaults if not provided
teleop_action_processor = teleop_action_processor or RobotProcessorPipeline[EnvTransition](
steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=identity_transition
teleop_action_processor: RobotProcessorPipeline[EnvTransition] = (
teleop_action_processor
or RobotProcessorPipeline(
steps=[IdentityProcessorStep()], to_transition=action_to_transition, to_output=identity_transition
)
)
robot_action_processor = robot_action_processor or RobotProcessorPipeline[dict[str, Any]](
steps=[IdentityProcessorStep()],
to_transition=identity_transition,
to_output=transition_to_robot_action, # type: ignore[arg-type]
robot_action_processor: RobotProcessorPipeline[dict[str, Any]] = (
robot_action_processor
or RobotProcessorPipeline(
steps=[IdentityProcessorStep()],
to_transition=identity_transition,
to_output=transition_to_robot_action, # type: ignore[arg-type]
)
)
robot_observation_processor = robot_observation_processor or RobotProcessorPipeline[EnvTransition](
steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=identity_transition,
robot_observation_processor: RobotProcessorPipeline[EnvTransition] = (
robot_observation_processor
or RobotProcessorPipeline(
steps=[IdentityProcessorStep()],
to_transition=observation_to_transition,
to_output=identity_transition,
)
)
# Reset processors
@@ -151,10 +160,10 @@ def teleop_loop(
raw_action = teleop.get_action()
# Process teleop action through pipeline
teleop_transition: EnvTransition = teleop_action_processor(raw_action)
teleop_transition = teleop_action_processor(raw_action)
# Process action for robot through pipeline
robot_action_to_send: dict[str, Any] = robot_action_processor(teleop_transition)
robot_action_to_send = robot_action_processor(teleop_transition)
# Send processed action to robot (robot_action_processor.to_output should return dict[str, Any])
robot.send_action(robot_action_to_send) # type: ignore[arg-type]
@@ -163,7 +172,7 @@ def teleop_loop(
# Get robot observation
obs = robot.get_observation()
# Process robot observation through pipeline
obs_transition: EnvTransition = robot_observation_processor(obs)
obs_transition = robot_observation_processor(obs)
log_rerun_data(
observation=obs_transition.get(TransitionKey.OBSERVATION),