Files
lerobot-clone/src/lerobot/teleoperators/phone/phone_processor.py
Steven Palma e881fb6678 refactor(pipeline): feature contract now categorizes between OBS or Action (#1867)
* refactor(processor): signature of transform_features

* refactor(processor): remove prefixes + processor respect new transform_features signature + update test accordingly

* refactor(processor): rename now is only for visual

* refactor(processor): update normalize processor

* refactor(processor): update vanilla processor features

* refactor(processor): feature contract now uses its own enum

* chore(processor): rename renameprocessor

* chore(processor): minor changes

* refactor(processor): add create & change aggregate

* refactor(processor): update aggregate

* refactor(processor): simplify to functions, fix features contracts and rename function

* test(processor): remove to converter tests as now they are very simple

* chore(docs): recover docs joint observations processor

* fix(processor): update RKP

* fix(tests): recv diff test_pipeline

* chore(tests): add docs to test

* chore(processor): leave obs language constant untouched

* fix(processor): correct new shape of feature in crop image processor
2025-09-09 18:27:30 +02:00

107 lines
4.9 KiB
Python

#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from dataclasses import dataclass, field
from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.constants import ACTION
from lerobot.processor import ActionProcessorStep, ProcessorStepRegistry
from lerobot.teleoperators.phone.config_phone import PhoneOS
@ProcessorStepRegistry.register("map_phone_action_to_robot_action")
@dataclass
class MapPhoneActionToRobotAction(ActionProcessorStep):
    """
    Convert a phone teleoperator action into robot end-effector targets.

    The step consumes the phone-specific entries of the action dictionary
    (enable flag, position, rotation and raw button inputs) and replaces them
    with robot-facing entries: an enable flag, three translational targets,
    three rotational targets and a gripper command. Some axes are swapped
    and/or negated on the way, and the gripper command is read from
    platform-specific raw inputs.

    Attributes:
        platform: Phone operating system. ``PhoneOS.IOS`` reads the gripper
            value from the ``"a3"`` raw input; any other value uses the
            difference between ``"reservedButtonA"`` and ``"reservedButtonB"``.
    """

    platform: PhoneOS
    # Excluded from __init__ and __repr__; not read or written by any method
    # of this class as visible here.
    _enabled_prev: bool = field(default=False, init=False, repr=False)

    def action(self, act: dict) -> dict:
        """
        Rewrite a phone action dictionary into a robot action dictionary.

        The dictionary is modified in place: the four phone entries are
        removed and the robot entries are written, then the same dictionary
        is returned.

        Args:
            act: Action dictionary produced by the phone teleoperator. The
                rotation entry must expose ``as_rotvec()`` (presumably a
                SciPy ``Rotation`` -- confirm against the teleoperator).

        Returns:
            ``act`` itself, now holding the robot-facing entries.

        Raises:
            ValueError: If the position or rotation entry is missing or None.
        """
        # Consume the phone-specific entries (absent ones fall back to defaults).
        is_enabled = bool(act.pop(f"{ACTION}.phone.enabled", 0))
        position = act.pop(f"{ACTION}.phone.pos", None)
        rotation = act.pop(f"{ACTION}.phone.rot", None)
        raw_inputs = act.pop(f"{ACTION}.phone.raw_inputs", {})

        pose_missing = position is None or rotation is None
        if pose_missing:
            raise ValueError("pos and rot must be present in action")

        # Absolute orientation expressed as a rotation vector.
        rot_vec = rotation.as_rotvec()

        # The gripper command comes from different raw inputs per platform.
        if self.platform == PhoneOS.IOS:
            gripper_cmd = float(raw_inputs.get("a3", 0.0))
        else:
            button_a = float(raw_inputs.get("reservedButtonA", 0.0))
            button_b = float(raw_inputs.get("reservedButtonB", 0.0))
            # Assuming each button reads 0 or 1: +1 for A only, -1 for B only,
            # 0 when both or neither are pressed.
            gripper_cmd = button_a - button_b

        act[f"{ACTION}.enabled"] = is_enabled
        if is_enabled:
            # Phone axes are remapped: x/y are swapped, some signs are flipped.
            act[f"{ACTION}.target_x"] = -position[1]
            act[f"{ACTION}.target_y"] = position[0]
            act[f"{ACTION}.target_z"] = position[2]
            act[f"{ACTION}.target_wx"] = rot_vec[1]
            act[f"{ACTION}.target_wy"] = rot_vec[0]
            act[f"{ACTION}.target_wz"] = -rot_vec[2]
        else:
            # Disabled: every pose target is zeroed.
            act[f"{ACTION}.target_x"] = 0.0
            act[f"{ACTION}.target_y"] = 0.0
            act[f"{ACTION}.target_z"] = 0.0
            act[f"{ACTION}.target_wx"] = 0.0
            act[f"{ACTION}.target_wy"] = 0.0
            act[f"{ACTION}.target_wz"] = 0.0
        # The gripper command is forwarded even while disabled.
        act[f"{ACTION}.gripper"] = gripper_cmd
        return act

    def transform_features(
        self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
    ) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
        """
        Update the action feature contract to mirror what ``action`` emits.

        The phone-specific action features are dropped (if present) and one
        single-element ACTION feature is registered for each robot-facing
        entry. ``features`` is modified in place and returned.

        Args:
            features: Feature contract keyed by pipeline feature type; its
                ``PipelineFeatureType.ACTION`` entry must already exist.

        Returns:
            The same ``features`` mapping after the update.
        """
        consumed_names = ("phone.enabled", "phone.pos", "phone.rot", "phone.raw_inputs")
        for consumed in consumed_names:
            features[PipelineFeatureType.ACTION].pop(consumed, None)

        produced_names = (
            "enabled",
            "target_x",
            "target_y",
            "target_z",
            "target_wx",
            "target_wy",
            "target_wz",
            "gripper",
        )
        for produced in produced_names:
            # A fresh PolicyFeature per key, each with a single element.
            features[PipelineFeatureType.ACTION][produced] = PolicyFeature(
                type=FeatureType.ACTION, shape=(1,)
            )
        return features