mirror of
https://github.com/huggingface/lerobot.git
synced 2026-06-04 12:51:27 +00:00
* refactor(processor): enhance type annotations for processors in record, replay, teleoperate, and control utils - Updated type annotations for preprocessor and postprocessor parameters in record_loop and predict_action functions to specify the expected dictionary types. - Adjusted robot_action_processor type in ReplayConfig and TeleoperateConfig to improve clarity and maintainability. - Ensured consistency in type definitions across multiple files, enhancing overall code readability. * refactor(processor): enhance type annotations for RobotProcessorPipeline in various files - Updated type annotations for RobotProcessorPipeline instances in evaluate.py, record.py, replay.py, teleoperate.py, and other related files to specify input and output types more clearly. - Introduced new type conversions for PolicyAction and EnvTransition to improve type safety and maintainability across the processing pipelines. - Ensured consistency in type definitions, enhancing overall code readability and reducing potential runtime errors. * refactor(processor): update transition handling in processors to use transition_to_batch - Replaced direct transition handling with transition_to_batch in various processor tests and implementations to ensure consistent batching of input data. - Updated assertions in tests to reflect changes in data structure, enhancing clarity and maintainability. - Improved overall code readability by standardizing the way transitions are processed across different processor types. * refactor(tests): standardize transition key usage in processor tests - Updated assertions in processor test files to utilize the TransitionKey for action references, enhancing consistency across tests. - Replaced direct string references with TransitionKey constants for improved readability and maintainability. - Ensured that all relevant tests reflect these changes, contributing to a more uniform approach in handling transitions.
261 lines
9.2 KiB
Python
261 lines
9.2 KiB
Python
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Simple script to control a robot from teleoperation.
|
|
|
|
Example:
|
|
|
|
```shell
|
|
lerobot-teleoperate \
|
|
--robot.type=so101_follower \
|
|
--robot.port=/dev/tty.usbmodem58760431541 \
|
|
--robot.cameras="{ front: {type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}}" \
|
|
--robot.id=black \
|
|
--teleop.type=so101_leader \
|
|
--teleop.port=/dev/tty.usbmodem58760431551 \
|
|
--teleop.id=blue \
|
|
--display_data=true
|
|
```
|
|
|
|
Example teleoperation with bimanual so100:
|
|
|
|
```shell
|
|
lerobot-teleoperate \
|
|
--robot.type=bi_so100_follower \
|
|
--robot.left_arm_port=/dev/tty.usbmodem5A460851411 \
|
|
--robot.right_arm_port=/dev/tty.usbmodem5A460812391 \
|
|
--robot.id=bimanual_follower \
|
|
--robot.cameras='{
|
|
left: {"type": "opencv", "index_or_path": 0, "width": 1920, "height": 1080, "fps": 30},
|
|
top: {"type": "opencv", "index_or_path": 1, "width": 1920, "height": 1080, "fps": 30},
|
|
right: {"type": "opencv", "index_or_path": 2, "width": 1920, "height": 1080, "fps": 30}
|
|
}' \
|
|
--teleop.type=bi_so100_leader \
|
|
--teleop.left_arm_port=/dev/tty.usbmodem5A460828611 \
|
|
--teleop.right_arm_port=/dev/tty.usbmodem5A460826981 \
|
|
--teleop.id=bimanual_leader \
|
|
--display_data=true
|
|
```
|
|
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from dataclasses import asdict, dataclass
|
|
from pprint import pformat
|
|
from typing import Any
|
|
|
|
import rerun as rr
|
|
|
|
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
|
|
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
|
|
from lerobot.configs import parser
|
|
from lerobot.processor import EnvTransition, IdentityProcessorStep, RobotProcessorPipeline, TransitionKey
|
|
from lerobot.processor.converters import (
|
|
identity_transition,
|
|
observation_to_transition,
|
|
robot_action_to_transition,
|
|
transition_to_robot_action,
|
|
)
|
|
from lerobot.processor.core import RobotAction
|
|
from lerobot.robots import ( # noqa: F401
|
|
Robot,
|
|
RobotConfig,
|
|
bi_so100_follower,
|
|
hope_jr,
|
|
koch_follower,
|
|
make_robot_from_config,
|
|
so100_follower,
|
|
so101_follower,
|
|
)
|
|
from lerobot.teleoperators import ( # noqa: F401
|
|
Teleoperator,
|
|
TeleoperatorConfig,
|
|
bi_so100_leader,
|
|
gamepad,
|
|
homunculus,
|
|
koch_leader,
|
|
make_teleoperator_from_config,
|
|
so100_leader,
|
|
so101_leader,
|
|
)
|
|
from lerobot.utils.robot_utils import busy_wait
|
|
from lerobot.utils.utils import init_logging, move_cursor_up
|
|
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data
|
|
|
|
|
|
@dataclass
|
|
class TeleoperateConfig:
|
|
# TODO: pepijn, steven: if more robots require multiple teleoperators (like lekiwi) its good to make this possibele in teleop.py and record.py with List[Teleoperator]
|
|
teleop: TeleoperatorConfig
|
|
robot: RobotConfig
|
|
# Limit the maximum frames per second.
|
|
fps: int = 60
|
|
teleop_time_s: float | None = None
|
|
# Display all cameras on screen
|
|
display_data: bool = False
|
|
# Optional processors for data transformation
|
|
teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition] | None = (
|
|
None # runs after teleop
|
|
)
|
|
robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction] | None = (
|
|
None # runs before robot
|
|
)
|
|
robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition] | None = (
|
|
None # runs after robot
|
|
)
|
|
|
|
|
|
def teleop_loop(
|
|
teleop: Teleoperator,
|
|
robot: Robot,
|
|
fps: int,
|
|
display_data: bool = False,
|
|
duration: float | None = None,
|
|
teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition] | None = None,
|
|
robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction] | None = None,
|
|
robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition] | None = None,
|
|
):
|
|
"""
|
|
This function continuously reads actions from a teleoperation device, processes them through optional
|
|
pipelines, sends them to a robot, and optionally displays the robot's state. The loop runs at a
|
|
specified frequency until a set duration is reached or it is manually interrupted.
|
|
|
|
Args:
|
|
teleop: The teleoperator device instance providing control actions.
|
|
robot: The robot instance being controlled.
|
|
fps: The target frequency for the control loop in frames per second.
|
|
display_data: If True, fetches robot observations and displays them in the console and Rerun.
|
|
duration: The maximum duration of the teleoperation loop in seconds. If None, the loop runs indefinitely.
|
|
teleop_action_processor: An optional pipeline to process raw actions from the teleoperator.
|
|
robot_action_processor: An optional pipeline to process actions before they are sent to the robot.
|
|
robot_observation_processor: An optional pipeline to process raw observations from the robot.
|
|
"""
|
|
# Initialize processors with defaults if not provided
|
|
teleop_action_processor: RobotProcessorPipeline[RobotAction, EnvTransition] = (
|
|
teleop_action_processor
|
|
or RobotProcessorPipeline[RobotAction, EnvTransition](
|
|
steps=[IdentityProcessorStep()],
|
|
to_transition=robot_action_to_transition,
|
|
to_output=identity_transition,
|
|
)
|
|
)
|
|
robot_action_processor: RobotProcessorPipeline[EnvTransition, RobotAction] = (
|
|
robot_action_processor
|
|
or RobotProcessorPipeline[EnvTransition, RobotAction](
|
|
steps=[IdentityProcessorStep()],
|
|
to_transition=identity_transition,
|
|
to_output=transition_to_robot_action,
|
|
)
|
|
)
|
|
robot_observation_processor: RobotProcessorPipeline[dict[str, Any], EnvTransition] = (
|
|
robot_observation_processor
|
|
or RobotProcessorPipeline[dict[str, Any], EnvTransition](
|
|
steps=[IdentityProcessorStep()],
|
|
to_transition=observation_to_transition,
|
|
to_output=identity_transition,
|
|
)
|
|
)
|
|
|
|
# Reset processors
|
|
teleop_action_processor.reset()
|
|
robot_action_processor.reset()
|
|
robot_observation_processor.reset()
|
|
|
|
display_len = max(len(key) for key in robot.action_features)
|
|
start = time.perf_counter()
|
|
|
|
while True:
|
|
loop_start = time.perf_counter()
|
|
|
|
# Get teleop action
|
|
raw_action = teleop.get_action()
|
|
|
|
# Process teleop action through pipeline
|
|
teleop_transition = teleop_action_processor(raw_action)
|
|
|
|
# Process action for robot through pipeline
|
|
robot_action_to_send = robot_action_processor(teleop_transition)
|
|
|
|
# Send processed action to robot (robot_action_processor.to_output should return dict[str, Any])
|
|
robot.send_action(robot_action_to_send) # type: ignore[arg-type]
|
|
|
|
if display_data:
|
|
# Get robot observation
|
|
obs = robot.get_observation()
|
|
# Process robot observation through pipeline
|
|
obs_transition = robot_observation_processor(obs)
|
|
|
|
log_rerun_data(
|
|
observation=obs_transition.get(TransitionKey.OBSERVATION),
|
|
action=teleop_transition.get(TransitionKey.ACTION),
|
|
)
|
|
|
|
print("\n" + "-" * (display_len + 10))
|
|
print(f"{'NAME':<{display_len}} | {'NORM':>7}")
|
|
# Display the final robot action that was sent
|
|
for motor, value in robot_action_to_send.items():
|
|
print(f"{motor:<{display_len}} | {value:>7.2f}")
|
|
move_cursor_up(len(robot_action_to_send) + 5)
|
|
|
|
dt_s = time.perf_counter() - loop_start
|
|
busy_wait(1 / fps - dt_s)
|
|
loop_s = time.perf_counter() - loop_start
|
|
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
|
|
|
if duration is not None and time.perf_counter() - start >= duration:
|
|
return
|
|
|
|
|
|
@parser.wrap()
|
|
def teleoperate(cfg: TeleoperateConfig):
|
|
init_logging()
|
|
logging.info(pformat(asdict(cfg)))
|
|
if cfg.display_data:
|
|
_init_rerun(session_name="teleoperation")
|
|
|
|
teleop = make_teleoperator_from_config(cfg.teleop)
|
|
robot = make_robot_from_config(cfg.robot)
|
|
|
|
teleop.connect()
|
|
robot.connect()
|
|
|
|
try:
|
|
teleop_loop(
|
|
teleop=teleop,
|
|
robot=robot,
|
|
fps=cfg.fps,
|
|
display_data=cfg.display_data,
|
|
duration=cfg.teleop_time_s,
|
|
teleop_action_processor=cfg.teleop_action_processor,
|
|
robot_action_processor=cfg.robot_action_processor,
|
|
robot_observation_processor=cfg.robot_observation_processor,
|
|
)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
if cfg.display_data:
|
|
rr.rerun_shutdown()
|
|
teleop.disconnect()
|
|
robot.disconnect()
|
|
|
|
|
|
def main():
|
|
teleoperate()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|