Files
lerobot-clone/src/lerobot/utils/pedal.py
Steven Palma ca87ccd941 feat(rollout): decouple policy deployment from data recording with new lerobot-rollout CLI (#3413)
* feat(scripts): lerobot-rollout

* fix(rollout) require dataset in dagger + use duration too

* fix(docs): dagger num_episodes

* test(rollout): fix expectations

* fix(rollout): features check

* fix(rollout): device and task propagation + feature pos + warn fps + move rename_map config

* docs(rollout): edit rename_map instructions

* chore(rollout): multiple minor improvements

* chore(rollout): address comments + minor improvements

* fix(rollout): enable default

* fix(tests): default value RTCConfig

* fix(rollout): robot_observation_processor and notify_observation at policy frequency instead of interpolator rate

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* fix(rollout): prevent relative actions with sync inference engine

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* fix(rollout): rtc reanchor to non normalized state

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>

* fix(rollout): fixing the episode length to use hwc (#3469)

also reducing default length to 5 minutes

* feat(rollout): go back to initial position is now a config

* fix(rollout): properly propagating video_files_size_in_mb to lerobot_dataset (#3470)

* chore(rollout): note about dagger correction stage

* chore(docs): update comments and docstring

* fix(test): move rtc relative out of rollout module

* fix(rollout): address the review comments

---------

Co-authored-by: Pepijn <138571049+pkooij@users.noreply.github.com>
Co-authored-by: Maxime Ellerbach <maxime.ellerbach@huggingface.co>
2026-04-28 00:57:35 +02:00

84 lines
2.9 KiB
Python

# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generic foot pedal listener using evdev.
Callers supply a callback receiving the pressed key code (e.g. ``"KEY_A"``)
and an optional device path. The listener runs in a daemon thread and
silently no-ops when :mod:`evdev` is not installed or the device is
unavailable. Strategy-specific key mapping logic lives in the caller.
"""
from __future__ import annotations
import logging
import threading
from collections.abc import Callable
logger = logging.getLogger(__name__)

# Input device path used when the caller does not pass ``device_path``.
DEFAULT_PEDAL_DEVICE = "/dev/input/by-id/usb-PCsensor_FootSwitch-event-kbd"


def start_pedal_listener(
    on_press: Callable[[str], None],
    device_path: str = DEFAULT_PEDAL_DEVICE,
) -> threading.Thread | None:
    """Spawn a daemon thread that forwards pedal key-press codes to ``on_press``.

    Parameters
    ----------
    on_press:
        Callback invoked with the pressed key code string (e.g. ``"KEY_A"``)
        on each pedal press event. The callback runs in the listener thread
        and must be thread-safe. Exceptions raised by the callback are logged
        and do not stop the listener.
    device_path:
        Linux input device path (e.g. ``/dev/input/by-id/...``).

    Returns
    -------
    The started daemon :class:`threading.Thread`, or ``None`` when
    :mod:`evdev` is not installed (optional dependency; no-op apart from a
    DEBUG log record). A thread is returned even when the device cannot be
    opened; in that case the thread logs at DEBUG level and exits.
    """
    try:
        # Imported lazily so that evdev stays an optional dependency.
        from evdev import InputDevice, categorize, ecodes
    except ImportError:
        logger.debug("evdev is not installed; pedal listener disabled")
        return None

    # evdev key states: 0 = key up, 1 = key down, 2 = key hold (auto-repeat).
    key_down = 1

    def pedal_reader() -> None:
        """Read events from the device until the read loop ends or fails."""
        dev = None
        try:
            dev = InputDevice(device_path)
            logger.info("Pedal connected: %s", dev.name)
            for ev in dev.read_loop():
                if ev.type != ecodes.EV_KEY:
                    continue
                key = categorize(ev)
                if key.keystate != key_down:  # only key-down events
                    continue
                code = key.keycode
                # A key with several aliases is reported as a sequence of
                # names; forward the first one.
                if isinstance(code, (list, tuple)):
                    code = code[0]
                try:
                    on_press(code)
                except Exception as cb_err:  # pragma: no cover - defensive
                    # Keep listening: one bad callback invocation must not
                    # disable the pedal for the rest of the session.
                    logger.warning("Pedal callback error: %s", cb_err, exc_info=True)
        except (FileNotFoundError, PermissionError) as err:
            # A missing or inaccessible pedal is an expected setup; stay quiet
            # at default log levels but leave a trace for debugging.
            logger.debug("Pedal device %s unavailable: %s", device_path, err)
        except Exception as e:
            logger.warning("Pedal error: %s", e)
        finally:
            # Release the device file descriptor however the loop ended.
            if dev is not None:
                try:
                    dev.close()
                except OSError as close_err:
                    logger.debug("Pedal close error: %s", close_err)

    thread = threading.Thread(target=pedal_reader, daemon=True, name="PedalListener")
    thread.start()
    return thread