remove globals use

This commit is contained in:
Michel Aractingi
2025-11-28 13:26:34 +01:00
parent 942eec9332
commit b697e767ac
3 changed files with 51 additions and 135 deletions

View File

@@ -2,44 +2,6 @@ from enum import IntEnum
# ruff: noqa: N801, N815
# =============================================================================
# Serialization Keys for ZMQ Communication Protocol
# =============================================================================
# These constants define the JSON keys used for robot state/command serialization.
# They are shared between robot_server.py (robot-side) and unitree_sdk2_socket.py (client-side).
# Top-level message keys
TOPIC = "topic"
DATA = "data"
# Motor state keys
MOTOR_STATE = "motor_state"
MOTOR_Q = "q" # position
MOTOR_DQ = "dq" # velocity
MOTOR_TAU_EST = "tau_est" # estimated torque
MOTOR_TEMPERATURE = "temperature"
# Motor command keys
MOTOR_CMD = "motor_cmd"
MOTOR_MODE = "mode"
MOTOR_KP = "kp"
MOTOR_KD = "kd"
MOTOR_TAU = "tau"
# IMU state keys
IMU_STATE = "imu_state"
IMU_QUATERNION = "quaternion"
IMU_GYROSCOPE = "gyroscope"
IMU_ACCELEROMETER = "accelerometer"
IMU_RPY = "rpy"
IMU_TEMPERATURE = "temperature"
# Other state keys
WIRELESS_REMOTE = "wireless_remote"
MODE_MACHINE = "mode_machine"
MODE_PR = "mode_pr"
# Number of motors
NUM_MOTORS = 35

View File

@@ -32,30 +32,6 @@ import time
from typing import Any
import zmq
from g1_utils import (
DATA,
IMU_ACCELEROMETER,
IMU_GYROSCOPE,
IMU_QUATERNION,
IMU_RPY,
IMU_STATE,
IMU_TEMPERATURE,
MODE_MACHINE,
MODE_PR,
MOTOR_CMD,
MOTOR_DQ,
MOTOR_KD,
MOTOR_KP,
MOTOR_MODE,
MOTOR_Q,
MOTOR_STATE,
MOTOR_TAU,
MOTOR_TAU_EST,
MOTOR_TEMPERATURE,
NUM_MOTORS,
TOPIC,
WIRELESS_REMOTE,
)
from unitree_sdk2py.comm.motion_switcher.motion_switcher_client import MotionSwitcherClient
from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelPublisher, ChannelSubscriber
from unitree_sdk2py.idl.default import unitree_hg_msg_dds__LowCmd_
@@ -69,6 +45,7 @@ kTopicLowState = "rt/lowstate" # observation from robot
LOWCMD_PORT = 6000
LOWSTATE_PORT = 6001
NUM_MOTORS = 35
def lowstate_to_dict(msg: hg_LowState) -> dict[str, Any]:
@@ -77,41 +54,41 @@ def lowstate_to_dict(msg: hg_LowState) -> dict[str, Any]:
for i in range(NUM_MOTORS):
motor_states.append(
{
MOTOR_Q: msg.motor_state[i].q,
MOTOR_DQ: msg.motor_state[i].dq,
MOTOR_TAU_EST: msg.motor_state[i].tau_est,
MOTOR_TEMPERATURE: msg.motor_state[i].temperature,
"q": msg.motor_state[i].q,
"dq": msg.motor_state[i].dq,
"tau_est": msg.motor_state[i].tau_est,
"temperature": msg.motor_state[i].temperature,
}
)
return {
MOTOR_STATE: motor_states,
IMU_STATE: {
IMU_QUATERNION: list(msg.imu_state.quaternion),
IMU_GYROSCOPE: list(msg.imu_state.gyroscope),
IMU_ACCELEROMETER: list(msg.imu_state.accelerometer),
IMU_RPY: list(msg.imu_state.rpy),
IMU_TEMPERATURE: msg.imu_state.temperature,
"motor_state": motor_states,
"imu_state": {
"quaternion": list(msg.imu_state.quaternion),
"gyroscope": list(msg.imu_state.gyroscope),
"accelerometer": list(msg.imu_state.accelerometer),
"rpy": list(msg.imu_state.rpy),
"temperature": msg.imu_state.temperature,
},
# Encode bytes as base64 for JSON compatibility
WIRELESS_REMOTE: base64.b64encode(bytes(msg.wireless_remote)).decode("ascii"),
MODE_MACHINE: msg.mode_machine,
"wireless_remote": base64.b64encode(bytes(msg.wireless_remote)).decode("ascii"),
"mode_machine": msg.mode_machine,
}
def dict_to_lowcmd(data: dict[str, Any]) -> hg_LowCmd:
"""Convert dictionary back to LowCmd SDK message."""
cmd = unitree_hg_msg_dds__LowCmd_()
cmd.mode_pr = data.get(MODE_PR, 0)
cmd.mode_machine = data.get(MODE_MACHINE, 0)
cmd.mode_pr = data.get("mode_pr", 0)
cmd.mode_machine = data.get("mode_machine", 0)
for i, motor_data in enumerate(data.get(MOTOR_CMD, [])):
cmd.motor_cmd[i].mode = motor_data.get(MOTOR_MODE, 0)
cmd.motor_cmd[i].q = motor_data.get(MOTOR_Q, 0.0)
cmd.motor_cmd[i].dq = motor_data.get(MOTOR_DQ, 0.0)
cmd.motor_cmd[i].kp = motor_data.get(MOTOR_KP, 0.0)
cmd.motor_cmd[i].kd = motor_data.get(MOTOR_KD, 0.0)
cmd.motor_cmd[i].tau = motor_data.get(MOTOR_TAU, 0.0)
for i, motor_data in enumerate(data.get("motor_cmd", [])):
cmd.motor_cmd[i].mode = motor_data.get("mode", 0)
cmd.motor_cmd[i].q = motor_data.get("q", 0.0)
cmd.motor_cmd[i].dq = motor_data.get("dq", 0.0)
cmd.motor_cmd[i].kp = motor_data.get("kp", 0.0)
cmd.motor_cmd[i].kd = motor_data.get("kd", 0.0)
cmd.motor_cmd[i].tau = motor_data.get("tau", 0.0)
return cmd
@@ -135,7 +112,7 @@ def state_forward_loop(
if now - last_state_time >= state_period:
# Convert to dict and serialize with JSON
state_dict = lowstate_to_dict(msg)
payload = json.dumps({TOPIC: kTopicLowState, DATA: state_dict}).encode("utf-8")
payload = json.dumps({"topic": kTopicLowState, "data": state_dict}).encode("utf-8")
# if no subscribers / tx buffer full, just drop
with contextlib.suppress(zmq.Again):
lowstate_sock.send(payload, zmq.NOBLOCK)
@@ -152,8 +129,8 @@ def cmd_forward_loop(
payload = lowcmd_sock.recv()
msg_dict = json.loads(payload.decode("utf-8"))
topic = msg_dict.get(TOPIC, "")
cmd_data = msg_dict.get(DATA, {})
topic = msg_dict.get("topic", "")
cmd_data = msg_dict.get("data", {})
# Reconstruct LowCmd object from dict
cmd = dict_to_lowcmd(cmd_data)

View File

@@ -15,29 +15,6 @@ from typing import Any
import zmq
from lerobot.robots.unitree_g1.config_unitree_g1 import UnitreeG1Config
from lerobot.robots.unitree_g1.g1_utils import (
DATA,
IMU_ACCELEROMETER,
IMU_GYROSCOPE,
IMU_QUATERNION,
IMU_RPY,
IMU_STATE,
IMU_TEMPERATURE,
MODE_MACHINE,
MODE_PR,
MOTOR_CMD,
MOTOR_DQ,
MOTOR_KD,
MOTOR_KP,
MOTOR_MODE,
MOTOR_Q,
MOTOR_STATE,
MOTOR_TAU,
MOTOR_TAU_EST,
MOTOR_TEMPERATURE,
TOPIC,
WIRELESS_REMOTE,
)
_ctx: zmq.Context | None = None
_lowcmd_sock: zmq.Socket | None = None
@@ -63,29 +40,29 @@ class LowStateMsg:
"""Motor state data for a single joint."""
def __init__(self, data: dict[str, Any]) -> None:
self.q: float = data.get(MOTOR_Q, 0.0)
self.dq: float = data.get(MOTOR_DQ, 0.0)
self.tau_est: float = data.get(MOTOR_TAU_EST, 0.0)
self.temperature: float = data.get(MOTOR_TEMPERATURE, 0.0)
self.q: float = data.get("q", 0.0)
self.dq: float = data.get("dq", 0.0)
self.tau_est: float = data.get("tau_est", 0.0)
self.temperature: float = data.get("temperature", 0.0)
class IMUState:
"""IMU sensor data."""
def __init__(self, data: dict[str, Any]) -> None:
self.quaternion: list[float] = data.get(IMU_QUATERNION, [1.0, 0.0, 0.0, 0.0])
self.gyroscope: list[float] = data.get(IMU_GYROSCOPE, [0.0, 0.0, 0.0])
self.accelerometer: list[float] = data.get(IMU_ACCELEROMETER, [0.0, 0.0, 0.0])
self.rpy: list[float] = data.get(IMU_RPY, [0.0, 0.0, 0.0])
self.temperature: float = data.get(IMU_TEMPERATURE, 0.0)
self.quaternion: list[float] = data.get("quaternion", [1.0, 0.0, 0.0, 0.0])
self.gyroscope: list[float] = data.get("gyroscope", [0.0, 0.0, 0.0])
self.accelerometer: list[float] = data.get("accelerometer", [0.0, 0.0, 0.0])
self.rpy: list[float] = data.get("rpy", [0.0, 0.0, 0.0])
self.temperature: float = data.get("temperature", 0.0)
def __init__(self, data: dict[str, Any]) -> None:
"""Initialize from deserialized JSON data."""
self.motor_state = [self.MotorState(m) for m in data.get(MOTOR_STATE, [])]
self.imu_state = self.IMUState(data.get(IMU_STATE, {}))
self.motor_state = [self.MotorState(m) for m in data.get("motor_state", [])]
self.imu_state = self.IMUState(data.get("imu_state", {}))
# Decode base64-encoded wireless_remote bytes
wireless_b64 = data.get(WIRELESS_REMOTE, "")
wireless_b64 = data.get("wireless_remote", "")
self.wireless_remote: bytes = base64.b64decode(wireless_b64) if wireless_b64 else b""
self.mode_machine: int = data.get(MODE_MACHINE, 0)
self.mode_machine: int = data.get("mode_machine", 0)
def lowcmd_to_dict(topic: str, msg: Any) -> dict[str, Any]:
@@ -95,21 +72,21 @@ def lowcmd_to_dict(topic: str, msg: Any) -> dict[str, Any]:
for i in range(len(msg.motor_cmd)):
motor_cmds.append(
{
MOTOR_MODE: msg.motor_cmd[i].mode,
MOTOR_Q: msg.motor_cmd[i].q,
MOTOR_DQ: msg.motor_cmd[i].dq,
MOTOR_KP: msg.motor_cmd[i].kp,
MOTOR_KD: msg.motor_cmd[i].kd,
MOTOR_TAU: msg.motor_cmd[i].tau,
"mode": msg.motor_cmd[i].mode,
"q": msg.motor_cmd[i].q,
"dq": msg.motor_cmd[i].dq,
"kp": msg.motor_cmd[i].kp,
"kd": msg.motor_cmd[i].kd,
"tau": msg.motor_cmd[i].tau,
}
)
return {
TOPIC: topic,
DATA: {
MODE_PR: msg.mode_pr,
MODE_MACHINE: msg.mode_machine,
MOTOR_CMD: motor_cmds,
"topic": topic,
"data": {
"mode_pr": msg.mode_pr,
"mode_machine": msg.mode_machine,
"motor_cmd": motor_cmds,
},
}
@@ -182,4 +159,4 @@ class ChannelSubscriber:
payload = _lowstate_sock.recv()
msg_dict = json.loads(payload.decode("utf-8"))
return LowStateMsg(msg_dict.get(DATA, {}))
return LowStateMsg(msg_dict.get("data", {}))