[pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
This commit is contained in:
pre-commit-ci[bot]
2025-03-24 13:41:27 +00:00
committed by Michel Aractingi
parent 2abbd60a0d
commit 0ea27704f6
123 changed files with 1161 additions and 3425 deletions

View File

@@ -1,13 +1,14 @@
from lerobot.common.robot_devices.robots.factory import make_robot
from lerobot.common.utils.utils import init_hydra_config
from lerobot.common.robot_devices.utils import busy_wait
from lerobot.scripts.server.kinematics import RobotKinematics
import argparse
import logging
import time
import torch
import numpy as np
import argparse
import numpy as np
import torch
from lerobot.common.robot_devices.robots.factory import make_robot
from lerobot.common.robot_devices.utils import busy_wait
from lerobot.common.utils.utils import init_hydra_config
from lerobot.scripts.server.kinematics import RobotKinematics
logging.basicConfig(level=logging.INFO)
@@ -187,9 +188,7 @@ class KeyboardController(InputController):
class GamepadController(InputController):
"""Generate motion deltas from gamepad input."""
def __init__(
self, x_step_size=0.01, y_step_size=0.01, z_step_size=0.01, deadzone=0.1
):
def __init__(self, x_step_size=0.01, y_step_size=0.01, z_step_size=0.01, deadzone=0.1):
super().__init__(x_step_size, y_step_size, z_step_size)
self.deadzone = deadzone
self.joystick = None
@@ -203,9 +202,7 @@ class GamepadController(InputController):
pygame.joystick.init()
if pygame.joystick.get_count() == 0:
logging.error(
"No gamepad detected. Please connect a gamepad and try again."
)
logging.error("No gamepad detected. Please connect a gamepad and try again.")
self.running = False
return
@@ -338,18 +335,12 @@ class GamepadControllerHID(InputController):
devices = hid.enumerate()
for device in devices:
if (
device["vendor_id"] == self.vendor_id
and device["product_id"] == self.product_id
):
logging.info(
f"Found gamepad: {device.get('product_string', 'Unknown')}"
)
if device["vendor_id"] == self.vendor_id and device["product_id"] == self.product_id:
logging.info(f"Found gamepad: {device.get('product_string', 'Unknown')}")
return device
logging.error(
f"No gamepad with vendor ID 0x{self.vendor_id:04X} and "
f"product ID 0x{self.product_id:04X} found"
f"No gamepad with vendor ID 0x{self.vendor_id:04X} and product ID 0x{self.product_id:04X} found"
)
return None
@@ -381,9 +372,7 @@ class GamepadControllerHID(InputController):
except OSError as e:
logging.error(f"Error opening gamepad: {e}")
logging.error(
"You might need to run this with sudo/admin privileges on some systems"
)
logging.error("You might need to run this with sudo/admin privileges on some systems")
self.running = False
def stop(self):
@@ -421,12 +410,8 @@ class GamepadControllerHID(InputController):
# Apply deadzone
self.left_x = 0 if abs(self.left_x) < self.deadzone else self.left_x
self.left_y = 0 if abs(self.left_y) < self.deadzone else self.left_y
self.right_x = (
0 if abs(self.right_x) < self.deadzone else self.right_x
)
self.right_y = (
0 if abs(self.right_y) < self.deadzone else self.right_y
)
self.right_x = 0 if abs(self.right_x) < self.deadzone else self.right_x
self.right_y = 0 if abs(self.right_y) < self.deadzone else self.right_y
# Parse button states (byte 5 in the Logitech RumblePad 2)
buttons = data[5]
@@ -493,9 +478,7 @@ def test_inverse_kinematics(robot, fps=10):
joint_positions = obs["observation.state"].cpu().numpy()
ee_pos = RobotKinematics.fk_gripper_tip(joint_positions)
desired_ee_pos = ee_pos
target_joint_state = RobotKinematics.ik(
joint_positions, desired_ee_pos, position_only=True
)
target_joint_state = RobotKinematics.ik(joint_positions, desired_ee_pos, position_only=True)
robot.send_action(torch.from_numpy(target_joint_state))
logging.info(f"Target Joint State: {target_joint_state}")
busy_wait(1 / fps - (time.perf_counter() - loop_start_time))
@@ -573,17 +556,13 @@ def teleoperate_delta_inverse_kinematics_with_leader(robot, fps=10):
robot.send_action(torch.from_numpy(target_joint_state))
# Logging
logging.info(
f"Current EE: {current_ee_pos[:3, 3]}, Desired EE: {desired_ee_pos[:3, 3]}"
)
logging.info(f"Current EE: {current_ee_pos[:3, 3]}, Desired EE: {desired_ee_pos[:3, 3]}")
logging.info(f"Delta EE: {ee_delta[:3, 3]}")
busy_wait(1 / fps - (time.perf_counter() - loop_start_time))
def teleoperate_delta_inverse_kinematics(
robot, controller, fps=10, bounds=None, fk_func=None
):
def teleoperate_delta_inverse_kinematics(robot, controller, fps=10, bounds=None, fk_func=None):
"""
Control a robot using delta end-effector movements from any input controller.
@@ -597,9 +576,7 @@ def teleoperate_delta_inverse_kinematics(
if fk_func is None:
fk_func = RobotKinematics.fk_gripper_tip
logging.info(
f"Testing Delta End-Effector Control with {controller.__class__.__name__}"
)
logging.info(f"Testing Delta End-Effector Control with {controller.__class__.__name__}")
# Initial position capture
obs = robot.capture_observation()
@@ -631,9 +608,7 @@ def teleoperate_delta_inverse_kinematics(
# Apply bounds if provided
if bounds is not None:
desired_ee_pos[:3, 3] = np.clip(
desired_ee_pos[:3, 3], bounds["min"], bounds["max"]
)
desired_ee_pos[:3, 3] = np.clip(desired_ee_pos[:3, 3], bounds["min"], bounds["max"])
# Only send commands if there's actual movement
if any([abs(v) > 0.001 for v in [delta_x, delta_y, delta_z]]):
@@ -684,14 +659,10 @@ def teleoperate_gym_env(env, controller, fps: int = 30):
if any([abs(v) > 0.001 for v in [delta_x, delta_y, delta_z]]):
# Step the environment - pass action as a tensor with intervention flag
action_tensor = torch.from_numpy(action.astype(np.float32))
obs, reward, terminated, truncated, info = env.step(
(action_tensor, False)
)
obs, reward, terminated, truncated, info = env.step((action_tensor, False))
# Log information
logging.info(
f"Action: [{delta_x:.4f}, {delta_y:.4f}, {delta_z:.4f}]"
)
logging.info(f"Action: [{delta_x:.4f}, {delta_y:.4f}, {delta_z:.4f}]")
logging.info(f"Reward: {reward}")
# Reset if episode ended
@@ -761,20 +732,14 @@ if __name__ == "__main__":
# Determine controller type based on mode prefix
controller = None
if args.mode.startswith("keyboard"):
controller = KeyboardController(
x_step_size=0.01, y_step_size=0.01, z_step_size=0.05
)
controller = KeyboardController(x_step_size=0.01, y_step_size=0.01, z_step_size=0.05)
elif args.mode.startswith("gamepad"):
controller = GamepadController(
x_step_size=0.02, y_step_size=0.02, z_step_size=0.05
)
controller = GamepadController(x_step_size=0.02, y_step_size=0.02, z_step_size=0.05)
# Handle mode categories
if args.mode in ["keyboard", "gamepad"]:
# Direct robot control modes
teleoperate_delta_inverse_kinematics(
robot, controller, bounds=bounds, fps=10
)
teleoperate_delta_inverse_kinematics(robot, controller, bounds=bounds, fps=10)
elif args.mode in ["keyboard_gym", "gamepad_gym"]:
# Gym environment control modes