diff --git a/src/lerobot/robots/unitree_g1/g1_utils.py b/src/lerobot/robots/unitree_g1/g1_utils.py index d42dbbb1c..b4bce860b 100644 --- a/src/lerobot/robots/unitree_g1/g1_utils.py +++ b/src/lerobot/robots/unitree_g1/g1_utils.py @@ -2,44 +2,6 @@ from enum import IntEnum # ruff: noqa: N801, N815 -# ============================================================================= -# Serialization Keys for ZMQ Communication Protocol -# ============================================================================= -# These constants define the JSON keys used for robot state/command serialization. -# They are shared between robot_server.py (robot-side) and unitree_sdk2_socket.py (client-side). - -# Top-level message keys -TOPIC = "topic" -DATA = "data" - -# Motor state keys -MOTOR_STATE = "motor_state" -MOTOR_Q = "q" # position -MOTOR_DQ = "dq" # velocity -MOTOR_TAU_EST = "tau_est" # estimated torque -MOTOR_TEMPERATURE = "temperature" - -# Motor command keys -MOTOR_CMD = "motor_cmd" -MOTOR_MODE = "mode" -MOTOR_KP = "kp" -MOTOR_KD = "kd" -MOTOR_TAU = "tau" - -# IMU state keys -IMU_STATE = "imu_state" -IMU_QUATERNION = "quaternion" -IMU_GYROSCOPE = "gyroscope" -IMU_ACCELEROMETER = "accelerometer" -IMU_RPY = "rpy" -IMU_TEMPERATURE = "temperature" - -# Other state keys -WIRELESS_REMOTE = "wireless_remote" -MODE_MACHINE = "mode_machine" -MODE_PR = "mode_pr" - -# Number of motors NUM_MOTORS = 35 diff --git a/src/lerobot/robots/unitree_g1/run_g1_server.py b/src/lerobot/robots/unitree_g1/run_g1_server.py index 0498fa899..50b1e1929 100644 --- a/src/lerobot/robots/unitree_g1/run_g1_server.py +++ b/src/lerobot/robots/unitree_g1/run_g1_server.py @@ -32,30 +32,6 @@ import time from typing import Any import zmq -from g1_utils import ( - DATA, - IMU_ACCELEROMETER, - IMU_GYROSCOPE, - IMU_QUATERNION, - IMU_RPY, - IMU_STATE, - IMU_TEMPERATURE, - MODE_MACHINE, - MODE_PR, - MOTOR_CMD, - MOTOR_DQ, - MOTOR_KD, - MOTOR_KP, - MOTOR_MODE, - MOTOR_Q, - MOTOR_STATE, - MOTOR_TAU, - MOTOR_TAU_EST, - MOTOR_TEMPERATURE, - NUM_MOTORS, - TOPIC, - WIRELESS_REMOTE, -) from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelPublisher, ChannelSubscriber from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_ @@ -69,6 +45,7 @@ kTopicLowState = "rt/lowstate" # observation from robot LOWCMD_PORT = 6000 LOWSTATE_PORT = 6001 +NUM_MOTORS = 35 def lowstate_to_dict(msg: hg_LowState) -> dict[str, Any]: @@ -77,41 +54,41 @@ def lowstate_to_dict(msg: hg_LowState) -> dict[str, Any]: for i in range(NUM_MOTORS): motor_states.append( { - MOTOR_Q: msg.motor_state[i].q, - MOTOR_DQ: msg.motor_state[i].dq, - MOTOR_TAU_EST: msg.motor_state[i].tau_est, - MOTOR_TEMPERATURE: msg.motor_state[i].temperature, + "q": msg.motor_state[i].q, + "dq": msg.motor_state[i].dq, + "tau_est": msg.motor_state[i].tau_est, + "temperature": msg.motor_state[i].temperature, } ) return { - MOTOR_STATE: motor_states, - IMU_STATE: { - IMU_QUATERNION: list(msg.imu_state.quaternion), - IMU_GYROSCOPE: list(msg.imu_state.gyroscope), - IMU_ACCELEROMETER: list(msg.imu_state.accelerometer), - IMU_RPY: list(msg.imu_state.rpy), - IMU_TEMPERATURE: msg.imu_state.temperature, + "motor_state": motor_states, + "imu_state": { + "quaternion": list(msg.imu_state.quaternion), + "gyroscope": list(msg.imu_state.gyroscope), + "accelerometer": list(msg.imu_state.accelerometer), + "rpy": list(msg.imu_state.rpy), + "temperature": msg.imu_state.temperature, }, # Encode bytes as base64 for JSON compatibility - WIRELESS_REMOTE: base64.b64encode(bytes(msg.wireless_remote)).decode("ascii"), - MODE_MACHINE: msg.mode_machine, + "wireless_remote": base64.b64encode(bytes(msg.wireless_remote)).decode("ascii"), + "mode_machine": msg.mode_machine, } def dict_to_lowcmd(data: dict[str, Any]) -> hg_LowCmd: """Convert dictionary back to LowCmd SDK message.""" cmd = unitree_hg_msg_dds__LowCmd_() - cmd.mode_pr = data.get(MODE_PR, 0) - cmd.mode_machine = data.get(MODE_MACHINE, 0) + cmd.mode_pr = data.get("mode_pr", 0) + cmd.mode_machine = data.get("mode_machine", 0) - for i, motor_data in enumerate(data.get(MOTOR_CMD, [])): - cmd.motor_cmd[i].mode = motor_data.get(MOTOR_MODE, 0) - cmd.motor_cmd[i].q = motor_data.get(MOTOR_Q, 0.0) - cmd.motor_cmd[i].dq = motor_data.get(MOTOR_DQ, 0.0) - cmd.motor_cmd[i].kp = motor_data.get(MOTOR_KP, 0.0) - cmd.motor_cmd[i].kd = motor_data.get(MOTOR_KD, 0.0) - cmd.motor_cmd[i].tau = motor_data.get(MOTOR_TAU, 0.0) + for i, motor_data in enumerate(data.get("motor_cmd", [])): + cmd.motor_cmd[i].mode = motor_data.get("mode", 0) + cmd.motor_cmd[i].q = motor_data.get("q", 0.0) + cmd.motor_cmd[i].dq = motor_data.get("dq", 0.0) + cmd.motor_cmd[i].kp = motor_data.get("kp", 0.0) + cmd.motor_cmd[i].kd = motor_data.get("kd", 0.0) + cmd.motor_cmd[i].tau = motor_data.get("tau", 0.0) return cmd @@ -135,7 +112,7 @@ def state_forward_loop( if now - last_state_time >= state_period: # Convert to dict and serialize with JSON state_dict = lowstate_to_dict(msg) - payload = json.dumps({TOPIC: kTopicLowState, DATA: state_dict}).encode("utf-8") + payload = json.dumps({"topic": kTopicLowState, "data": state_dict}).encode("utf-8") # if no subscribers / tx buffer full, just drop with contextlib.suppress(zmq.Again): lowstate_sock.send(payload, zmq.NOBLOCK) @@ -152,8 +129,8 @@ def cmd_forward_loop( payload = lowcmd_sock.recv() msg_dict = json.loads(payload.decode("utf-8")) - topic = msg_dict.get(TOPIC, "") - cmd_data = msg_dict.get(DATA, {}) + topic = msg_dict.get("topic", "") + cmd_data = msg_dict.get("data", {}) # Reconstruct LowCmd object from dict cmd = dict_to_lowcmd(cmd_data) diff --git a/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py b/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py index 876096c2e..9ba027e56 100644 --- a/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py +++ b/src/lerobot/robots/unitree_g1/unitree_sdk2_socket.py @@ -15,29 +15,6 @@ from typing import Any import zmq from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config -from lerobot.robots.unitree_g1.g1_utils import ( - DATA, - IMU_ACCELEROMETER, - IMU_GYROSCOPE, - IMU_QUATERNION, - IMU_RPY, - IMU_STATE, - IMU_TEMPERATURE, - MODE_MACHINE, - MODE_PR, - MOTOR_CMD, - MOTOR_DQ, - MOTOR_KD, - MOTOR_KP, - MOTOR_MODE, - MOTOR_Q, - MOTOR_STATE, - MOTOR_TAU, - MOTOR_TAU_EST, - MOTOR_TEMPERATURE, - TOPIC, - WIRELESS_REMOTE, -) _ctx: zmq.Context | None = None _lowcmd_sock: zmq.Socket | None = None @@ -63,29 +40,29 @@ class LowStateMsg: """Motor state data for a single joint.""" def __init__(self, data: dict[str, Any]) -> None: - self.q: float = data.get(MOTOR_Q, 0.0) - self.dq: float = data.get(MOTOR_DQ, 0.0) - self.tau_est: float = data.get(MOTOR_TAU_EST, 0.0) - self.temperature: float = data.get(MOTOR_TEMPERATURE, 0.0) + self.q: float = data.get("q", 0.0) + self.dq: float = data.get("dq", 0.0) + self.tau_est: float = data.get("tau_est", 0.0) + self.temperature: float = data.get("temperature", 0.0) class IMUState: """IMU sensor data.""" def __init__(self, data: dict[str, Any]) -> None: - self.quaternion: list[float] = data.get(IMU_QUATERNION, [1.0, 0.0, 0.0, 0.0]) - self.gyroscope: list[float] = data.get(IMU_GYROSCOPE, [0.0, 0.0, 0.0]) - self.accelerometer: list[float] = data.get(IMU_ACCELEROMETER, [0.0, 0.0, 0.0]) - self.rpy: list[float] = data.get(IMU_RPY, [0.0, 0.0, 0.0]) - self.temperature: float = data.get(IMU_TEMPERATURE, 0.0) + self.quaternion: list[float] = data.get("quaternion", [1.0, 0.0, 0.0, 0.0]) + self.gyroscope: list[float] = data.get("gyroscope", [0.0, 0.0, 0.0]) + self.accelerometer: list[float] = data.get("accelerometer", [0.0, 0.0, 0.0]) + self.rpy: list[float] = data.get("rpy", [0.0, 0.0, 0.0]) + self.temperature: float = data.get("temperature", 0.0) def __init__(self, data: dict[str, Any]) -> None: """Initialize from deserialized JSON data.""" - self.motor_state = [self.MotorState(m) for m in data.get(MOTOR_STATE, [])] - self.imu_state = self.IMUState(data.get(IMU_STATE, {})) + self.motor_state = [self.MotorState(m) for m in data.get("motor_state", [])] + self.imu_state = self.IMUState(data.get("imu_state", {})) # Decode base64-encoded wireless_remote bytes - wireless_b64 = data.get(WIRELESS_REMOTE, "") + wireless_b64 = data.get("wireless_remote", "") self.wireless_remote: bytes = base64.b64decode(wireless_b64) if wireless_b64 else b"" - self.mode_machine: int = data.get(MODE_MACHINE, 0) + self.mode_machine: int = data.get("mode_machine", 0) def lowcmd_to_dict(topic: str, msg: Any) -> dict[str, Any]: @@ -95,21 +72,21 @@ def lowcmd_to_dict(topic: str, msg: Any) -> dict[str, Any]: for i in range(len(msg.motor_cmd)): motor_cmds.append( { - MOTOR_MODE: msg.motor_cmd[i].mode, - MOTOR_Q: msg.motor_cmd[i].q, - MOTOR_DQ: msg.motor_cmd[i].dq, - MOTOR_KP: msg.motor_cmd[i].kp, - MOTOR_KD: msg.motor_cmd[i].kd, - MOTOR_TAU: msg.motor_cmd[i].tau, + "mode": msg.motor_cmd[i].mode, + "q": msg.motor_cmd[i].q, + "dq": msg.motor_cmd[i].dq, + "kp": msg.motor_cmd[i].kp, + "kd": msg.motor_cmd[i].kd, + "tau": msg.motor_cmd[i].tau, } ) return { - TOPIC: topic, - DATA: { - MODE_PR: msg.mode_pr, - MODE_MACHINE: msg.mode_machine, - MOTOR_CMD: motor_cmds, + "topic": topic, + "data": { + "mode_pr": msg.mode_pr, + "mode_machine": msg.mode_machine, + "motor_cmd": motor_cmds, }, } @@ -182,4 +159,4 @@ class ChannelSubscriber: payload = _lowstate_sock.recv() msg_dict = json.loads(payload.decode("utf-8")) - return LowStateMsg(msg_dict.get(DATA, {})) + return LowStateMsg(msg_dict.get("data", {}))