2025-12-06 22:47:57 +08:00
|
|
|
|
import gradio as gr
|
|
|
|
|
|
from balance import BalanceController
|
|
|
|
|
|
import logging
|
|
|
|
|
|
import threading
|
|
|
|
|
|
import time
|
|
|
|
|
|
import os
|
|
|
|
|
|
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
# 全局配置 & 日志系统(写入磁盘文件 ui.log)
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
LOG_PATH = "ui.log"
|
|
|
|
|
|
logging.basicConfig(
|
|
|
|
|
|
filename=LOG_PATH,
|
|
|
|
|
|
level=logging.INFO,
|
|
|
|
|
|
format="%(asctime)s - %(levelname)s - %(message)s",
|
|
|
|
|
|
)
|
|
|
|
|
|
logger = logging.getLogger("ui")
|
|
|
|
|
|
|
|
|
|
|
|
def log(msg: str) -> None:
|
|
|
|
|
|
"""统一写入日志的函数,同时打印到控制台(方便调试)。"""
|
|
|
|
|
|
logger.info(msg)
|
|
|
|
|
|
for h in logger.handlers:
|
|
|
|
|
|
h.flush()
|
|
|
|
|
|
print(msg) # optional, 可删除
|
|
|
|
|
|
|
|
|
|
|
|
controller: BalanceController | None = None # 单例
|
2025-12-09 22:55:35 +08:00
|
|
|
|
motors_enabled = False # 电机使能状态
|
2025-12-11 12:12:12 +08:00
|
|
|
|
port_opened: bool = False # 是否已打开串口
|
2025-12-06 22:47:57 +08:00
|
|
|
|
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
# 安全创建 BalanceController(带重试)
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
def create_controller(retries: int = 3, delay: float = 1.0) -> BalanceController | None:
|
|
|
|
|
|
"""尝试实例化 BalanceController,失败则记录日志并返回 None。"""
|
|
|
|
|
|
for attempt in range(1, retries + 1):
|
|
|
|
|
|
try:
|
|
|
|
|
|
log(f"尝试创建 BalanceController(第 {attempt} 次)")
|
|
|
|
|
|
return BalanceController()
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
log(f"创建 BalanceController 失败: {e}")
|
|
|
|
|
|
if attempt < retries:
|
|
|
|
|
|
time.sleep(delay)
|
|
|
|
|
|
log("全部重试结束,仍未能创建 BalanceController")
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
2025-12-11 12:12:12 +08:00
|
|
|
|
# ---------------------------------------印度洋15
|
|
|
|
|
|
# ----------
|
2025-12-06 22:47:57 +08:00
|
|
|
|
# UI 操作函数(统一异常捕获、日志记录、返回状态文字)
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
def open_port() -> tuple:
|
|
|
|
|
|
"""创建 BalanceController 实例并打开串口(不自动使能)。"""
|
2025-12-11 12:12:12 +08:00
|
|
|
|
global controller, port_opened
|
2025-12-06 22:47:57 +08:00
|
|
|
|
if controller is None:
|
|
|
|
|
|
controller = create_controller()
|
|
|
|
|
|
if controller is None:
|
|
|
|
|
|
msg = "打开串口失败:无法创建 BalanceController"
|
|
|
|
|
|
log(msg)
|
2025-12-11 12:12:12 +08:00
|
|
|
|
port_opened = False
|
2025-12-06 22:47:57 +08:00
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
msg = "串口已打开"
|
|
|
|
|
|
log(msg)
|
2025-12-11 12:12:12 +08:00
|
|
|
|
port_opened = True
|
2025-12-06 22:47:57 +08:00
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
msg = "串口已打开(已存在实例)"
|
|
|
|
|
|
log(msg)
|
2025-12-11 12:12:12 +08:00
|
|
|
|
port_opened = True
|
2025-12-06 22:47:57 +08:00
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
|
|
|
|
|
|
def enable_all() -> tuple:
|
|
|
|
|
|
"""使能所有电机(腿部+轮子)。"""
|
|
|
|
|
|
global motors_enabled
|
2025-12-11 12:12:12 +08:00
|
|
|
|
if not port_opened:
|
|
|
|
|
|
msg = "请先打开串口"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
2025-12-06 22:47:57 +08:00
|
|
|
|
if controller is None:
|
|
|
|
|
|
msg = "请先打开串口"
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
try:
|
|
|
|
|
|
controller.enable_all()
|
|
|
|
|
|
motors_enabled = True
|
|
|
|
|
|
msg = "电机已全部使能"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
msg = f"使能电机异常: {e}"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
|
|
|
|
|
|
def disable_all() -> tuple:
|
|
|
|
|
|
"""失能所有电机。"""
|
|
|
|
|
|
global motors_enabled
|
2025-12-11 12:12:12 +08:00
|
|
|
|
if not port_opened:
|
|
|
|
|
|
msg = "请先打开串口"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
2025-12-06 22:47:57 +08:00
|
|
|
|
if controller is None:
|
|
|
|
|
|
msg = "请先打开串口"
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
try:
|
|
|
|
|
|
controller.disable_all()
|
|
|
|
|
|
motors_enabled = False
|
|
|
|
|
|
msg = "所有电机已失能"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
msg = f"失能电机异常: {e}"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
|
|
|
|
|
|
def set_position(pos1, pos2, pos3, pos4, vel) -> tuple:
|
|
|
|
|
|
"""设置四条腿的位置与速度比例。"""
|
2025-12-11 12:12:12 +08:00
|
|
|
|
if not port_opened:
|
|
|
|
|
|
msg = "请先打开串口"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
2025-12-06 22:47:57 +08:00
|
|
|
|
if controller is None:
|
|
|
|
|
|
msg = "请先打开串口"
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
try:
|
|
|
|
|
|
controller.control_legs_pos(pos1, pos2, pos3, pos4, vel)
|
|
|
|
|
|
msg = f"已设置位置: {pos1}, {pos2}, {pos3}, {pos4}(速度比例 {vel})"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
msg = f"设置位置异常: {e}"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
|
|
|
|
|
|
def get_torque() -> tuple:
|
|
|
|
|
|
"""读取四条腿的扭矩,返回字符串列表。"""
|
2025-12-11 12:12:12 +08:00
|
|
|
|
if not port_opened:
|
|
|
|
|
|
msg = "未打开串口"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (["未打开串口"] * 4, msg)
|
2025-12-06 22:47:57 +08:00
|
|
|
|
if controller is None:
|
|
|
|
|
|
msg = "未打开串口"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (["未打开串口"] * 4, msg)
|
|
|
|
|
|
try:
|
|
|
|
|
|
torques = controller.get_legs_torque()
|
|
|
|
|
|
msg = f"读取扭矩: {torques}"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return ([f"{t:.2f}" if t is not None else "N/A" for t in torques], msg)
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
msg = f"读取扭矩异常: {e}"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (["错误"] * 4, msg)
|
|
|
|
|
|
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
# 后台平衡循环(守护线程)
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
def _balance_thread(ctrl: BalanceController) -> None:
|
|
|
|
|
|
"""后台线程入口,运行平衡循环并确保资源安全释放。"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
ctrl.enable_all()
|
|
|
|
|
|
log("平衡循环启动")
|
|
|
|
|
|
ctrl.run_balance_loop()
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
log(f"平衡循环异常: {e}")
|
|
|
|
|
|
finally:
|
|
|
|
|
|
ctrl.shutdown()
|
|
|
|
|
|
log("平衡循环已结束,资源已清理")
|
|
|
|
|
|
|
|
|
|
|
|
def start_balance_thread() -> None:
|
|
|
|
|
|
"""在守护线程中启动平衡循环。"""
|
|
|
|
|
|
global controller
|
|
|
|
|
|
if controller is None:
|
|
|
|
|
|
controller = create_controller()
|
|
|
|
|
|
if controller is None:
|
|
|
|
|
|
log("启动平衡控制失败:无法创建 BalanceController")
|
|
|
|
|
|
return
|
|
|
|
|
|
thread = threading.Thread(target=_balance_thread, args=(controller,), daemon=True)
|
|
|
|
|
|
thread.start()
|
|
|
|
|
|
log("平衡控制线程已启动")
|
|
|
|
|
|
|
2025-12-21 11:23:46 +08:00
|
|
|
|
def start_spd_thread(nomspd,offspd) -> None:
|
|
|
|
|
|
"""在守护线程中启动平衡循环。"""
|
|
|
|
|
|
if controller is None:
|
|
|
|
|
|
log("启动speed控制失败:无法创建 BalanceController")
|
|
|
|
|
|
return
|
|
|
|
|
|
controller.legs.control_wheels_vel(nomspd,offspd)
|
|
|
|
|
|
log("speed线程已启动")
|
|
|
|
|
|
|
2025-12-06 22:47:57 +08:00
|
|
|
|
def start_balance() -> tuple:
|
|
|
|
|
|
"""检查电机是否已使能后启动平衡控制。"""
|
2025-12-11 12:12:12 +08:00
|
|
|
|
if not port_opened:
|
|
|
|
|
|
msg = "请先打开串口"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
2025-12-06 22:47:57 +08:00
|
|
|
|
if not motors_enabled:
|
|
|
|
|
|
msg = "启动平衡控制失败:电机未使能"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
start_balance_thread()
|
|
|
|
|
|
msg = "平衡控制已启动"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
# 日志刷新(用于 UI 按钮)
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
def refresh_log() -> str:
|
|
|
|
|
|
"""读取最新的日志文件内容返回给 UI。"""
|
|
|
|
|
|
if not os.path.exists(LOG_PATH):
|
|
|
|
|
|
return ""
|
|
|
|
|
|
with open(LOG_PATH, "r", encoding="utf-8") as f:
|
|
|
|
|
|
return f.read()
|
|
|
|
|
|
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
# 自动打开串口并初始化 UI
|
|
|
|
|
|
# -------------------------------------------------
|
2025-12-11 12:12:12 +08:00
|
|
|
|
init_status = "未打开串口"
|
2025-12-06 22:47:57 +08:00
|
|
|
|
|
2025-12-21 11:23:46 +08:00
|
|
|
|
def control_speed(spd,off_spd):
|
|
|
|
|
|
"""检查电机是否已使能后启动平衡控制。"""
|
|
|
|
|
|
if not port_opened:
|
|
|
|
|
|
msg = "请先打开串口"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
if not motors_enabled:
|
|
|
|
|
|
msg = "启动平衡控制失败:电机未使能"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
start_spd_thread(spd,off_spd)
|
|
|
|
|
|
msg = "平衡控制已启动"
|
|
|
|
|
|
log(msg)
|
|
|
|
|
|
return (msg, msg)
|
|
|
|
|
|
|
2025-12-06 22:47:57 +08:00
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
# Gradio UI
|
|
|
|
|
|
# -------------------------------------------------
|
|
|
|
|
|
with gr.Blocks() as demo:
|
|
|
|
|
|
# 日志显示区
|
|
|
|
|
|
log_box = gr.Textbox(label="运行日志", lines=15, interactive=False)
|
|
|
|
|
|
refresh_btn = gr.Button("刷新日志")
|
|
|
|
|
|
refresh_btn.click(fn=refresh_log, inputs=None, outputs=log_box)
|
|
|
|
|
|
|
|
|
|
|
|
gr.Markdown("# 🤖 四足机器人控制面板")
|
|
|
|
|
|
|
|
|
|
|
|
with gr.Row():
|
|
|
|
|
|
# 左侧:电机控制
|
|
|
|
|
|
with gr.Column():
|
|
|
|
|
|
gr.Markdown("## 电机控制")
|
|
|
|
|
|
# 已移除 “打开串口” 按钮,串口在启动时已自动打开
|
2025-12-21 11:23:46 +08:00
|
|
|
|
open_btn = gr.Button("🔌 打开串口")
|
2025-12-06 22:47:57 +08:00
|
|
|
|
enable_btn = gr.Button("✅ 使能全部")
|
|
|
|
|
|
disable_btn = gr.Button("❌ 失能全部")
|
|
|
|
|
|
start_btn = gr.Button("▶️ 启动平衡控制")
|
|
|
|
|
|
status_box = gr.Textbox(label="状态", value=init_status, interactive=False)
|
|
|
|
|
|
|
2025-12-11 12:12:12 +08:00
|
|
|
|
open_btn.click(fn=open_port, inputs=None, outputs=[status_box, log_box])
|
|
|
|
|
|
enable_btn.click(fn=enable_all, inputs=None, outputs=[status_box, log_box])
|
|
|
|
|
|
disable_btn.click(fn=disable_all, inputs=None, outputs=[status_box, log_box])
|
|
|
|
|
|
start_btn.click(fn=start_balance, inputs=None, outputs=[status_box, log_box])
|
2025-12-21 11:23:46 +08:00
|
|
|
|
|
2025-12-06 22:47:57 +08:00
|
|
|
|
with gr.Column():
|
2025-12-21 11:23:46 +08:00
|
|
|
|
gr.Markdown("## speed控制")
|
|
|
|
|
|
normal_speed = gr.Slider(label="spd",minimum=-1,maximum=1,value=0.0,step=0.01)
|
|
|
|
|
|
off_speed = gr.Slider(label="off",minimum=-1,maximum=1,value=0.0,step=0.01)
|
|
|
|
|
|
normal_speed.change(fn= control_speed,inputs=[normal_speed,off_speed], outputs=[status_box, log_box])
|
|
|
|
|
|
off_speed.change(fn= control_speed,inputs=[normal_speed,off_speed], outputs=[status_box, log_box])
|
2025-12-06 22:47:57 +08:00
|
|
|
|
# 右侧:扭矩读取
|
|
|
|
|
|
with gr.Column():
|
|
|
|
|
|
gr.Markdown("## 扭矩读取")
|
|
|
|
|
|
torque_output = gr.Textbox(label="腿部扭矩 (N/m)", interactive=False)
|
2025-12-21 11:23:46 +08:00
|
|
|
|
read_btn = gr.Button("<EFBFBD> 读取扭矩")
|
2025-12-06 22:47:57 +08:00
|
|
|
|
read_btn.click(fn=get_torque, inputs=None, outputs=[torque_output, log_box])
|
|
|
|
|
|
|
|
|
|
|
|
demo.launch(server_name="0.0.0.0", server_port=7860, debug=True)
|