mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-30 10:21:24 +00:00
Compare commits
16 Commits
feature/ge
...
user/miche
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
73fab32c26 | ||
|
|
f76a108b08 | ||
|
|
fe7c368630 | ||
|
|
f65e74af9c | ||
|
|
62f716dbc9 | ||
|
|
e8b8d57191 | ||
|
|
53ace28c42 | ||
|
|
f58796a112 | ||
|
|
e4db65a127 | ||
|
|
0053defa2e | ||
|
|
fd5d8b3d5f | ||
|
|
5bf82f8229 | ||
|
|
5ca3920611 | ||
|
|
8bde9d0ab7 | ||
|
|
abcbc16126 | ||
|
|
e4fd30a8d4 |
@@ -4,7 +4,13 @@ In this tutorial you will go through the full Human-in-the-Loop Sample-Efficient
|
||||
|
||||
HIL-SERL is a sample-efficient reinforcement learning algorithm that combines human demonstrations with online learning and human interventions. The approach starts from a small set of human demonstrations, uses them to train a reward classifier, and then employs an actor-learner architecture where humans can intervene during policy execution to guide exploration and correct unsafe behaviors. In this tutorial, you'll use a gamepad to provide interventions and control the robot during the learning process.
|
||||
|
||||
It combines three key ingredients: 1. **Offline demonstrations & reward classifier:** a handful of human-teleop episodes plus a vision-based success detector give the policy a shaped starting point. 2. **On-robot actor / learner loop with human interventions:** a distributed Soft Actor Critic (SAC) learner updates the policy while an actor explores on the physical robot; the human can jump in at any time to correct dangerous or unproductive behaviour. 3. **Safety & efficiency tools:** joint/end-effector (EE) bounds, crop region of interest (ROI) preprocessing and WandB monitoring keep the data useful and the hardware safe.
|
||||
It combines three key ingredients:
|
||||
|
||||
1. **Offline demonstrations & reward classifier:** a handful of human-teleop episodes plus a vision-based success detector give the policy a shaped starting point.
|
||||
|
||||
2. **On-robot actor / learner loop with human interventions:** a distributed Soft Actor Critic (SAC) learner updates the policy while an actor explores on the physical robot; the human can jump in at any time to correct dangerous or unproductive behaviour.
|
||||
|
||||
3. **Safety & efficiency tools:** joint/end-effector (EE) bounds, crop region of interest (ROI) preprocessing and WandB monitoring keep the data useful and the hardware safe.
|
||||
|
||||
Together these elements let HIL-SERL reach near-perfect task success and faster cycle times than imitation-only baselines.
|
||||
|
||||
@@ -56,30 +62,243 @@ pip install -e ".[hilserl]"
|
||||
|
||||
### Understanding Configuration
|
||||
|
||||
The training process begins with proper configuration for the HILSerl environment. The configuration class of interest is `HILSerlRobotEnvConfig` in `lerobot/envs/configs.py`. Which is defined as:
|
||||
The training process begins with proper configuration for the HILSerl environment. The main configuration class is `GymManipulatorConfig` in `lerobot/scripts/rl/gym_manipulator.py`, which contains nested `HILSerlRobotEnvConfig` and `DatasetConfig`. The configuration is organized into focused, nested sub-configs:
|
||||
|
||||
<!-- prettier-ignore-start -->
|
||||
```python
|
||||
class GymManipulatorConfig:
|
||||
env: HILSerlRobotEnvConfig # Environment configuration (nested)
|
||||
dataset: DatasetConfig # Dataset recording/replay configuration (nested)
|
||||
mode: str | None = None # "record", "replay", or None (for training)
|
||||
device: str = "cpu" # Compute device
|
||||
|
||||
class HILSerlRobotEnvConfig(EnvConfig):
|
||||
robot: RobotConfig | None = None # Main robot agent (defined in `lerobot/robots`)
|
||||
teleop: TeleoperatorConfig | None = None # Teleoperator agent, e.g., gamepad or leader arm, (defined in `lerobot/teleoperators`)
|
||||
wrapper: EnvTransformConfig | None = None # Environment wrapper settings; check `lerobot/scripts/server/gym_manipulator.py`
|
||||
fps: int = 10 # Control frequency
|
||||
teleop: TeleoperatorConfig | None = None # Teleoperator agent, e.g., gamepad or leader arm
|
||||
processor: HILSerlProcessorConfig # Processing pipeline configuration (nested)
|
||||
name: str = "real_robot" # Environment name
|
||||
mode: str = None # "record", "replay", or None (for training)
|
||||
repo_id: str | None = None # LeRobot dataset repository ID
|
||||
dataset_root: str | None = None # Local dataset root (optional)
|
||||
task: str = "" # Task identifier
|
||||
num_episodes: int = 10 # Number of episodes for recording
|
||||
episode: int = 0 # episode index for replay
|
||||
device: str = "cuda" # Compute device
|
||||
push_to_hub: bool = True # Whether to push the recorded datasets to Hub
|
||||
pretrained_policy_name_or_path: str | None = None # For policy loading
|
||||
reward_classifier_pretrained_path: str | None = None # For reward model
|
||||
number_of_steps_after_success: int = 0 # For reward classifier, collect more positive examples after a success to train a classifier
|
||||
task: str | None = None # Task identifier
|
||||
fps: int = 10 # Control frequency
|
||||
|
||||
# Nested processor configuration
|
||||
class HILSerlProcessorConfig:
|
||||
control_mode: str = "gamepad" # Control mode
|
||||
observation: ObservationConfig | None = None # Observation processing settings
|
||||
image_preprocessing: ImagePreprocessingConfig | None = None # Image crop/resize settings
|
||||
gripper: GripperConfig | None = None # Gripper control and penalty settings
|
||||
reset: ResetConfig | None = None # Environment reset and timing settings
|
||||
inverse_kinematics: InverseKinematicsConfig | None = None # IK processing settings
|
||||
reward_classifier: RewardClassifierConfig | None = None # Reward classifier settings
|
||||
max_gripper_pos: float | None = 100.0 # Maximum gripper position
|
||||
|
||||
# Sub-configuration classes
|
||||
class ObservationConfig:
|
||||
add_joint_velocity_to_observation: bool = False # Add joint velocities to state
|
||||
add_current_to_observation: bool = False # Add motor currents to state
|
||||
add_ee_pose_to_observation: bool = False # Add end-effector pose to state
|
||||
display_cameras: bool = False # Display camera feeds during execution
|
||||
|
||||
class ImagePreprocessingConfig:
|
||||
crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None # Image cropping parameters
|
||||
resize_size: tuple[int, int] | None = None # Target image size
|
||||
|
||||
class GripperConfig:
|
||||
use_gripper: bool = True # Enable gripper control
|
||||
gripper_penalty: float = 0.0 # Penalty for inappropriate gripper usage
|
||||
gripper_penalty_in_reward: bool = False # Include gripper penalty in reward
|
||||
|
||||
class ResetConfig:
|
||||
fixed_reset_joint_positions: Any | None = None # Joint positions for reset
|
||||
reset_time_s: float = 5.0 # Time to wait during reset
|
||||
control_time_s: float = 20.0 # Maximum episode duration
|
||||
terminate_on_success: bool = True # Whether to terminate episodes on success detection
|
||||
|
||||
class InverseKinematicsConfig:
|
||||
urdf_path: str | None = None # Path to robot URDF file
|
||||
target_frame_name: str | None = None # End-effector frame name
|
||||
end_effector_bounds: dict[str, list[float]] | None = None # EE workspace bounds
|
||||
end_effector_step_sizes: dict[str, float] | None = None # EE step sizes per axis
|
||||
|
||||
class RewardClassifierConfig:
|
||||
pretrained_path: str | None = None # Path to pretrained reward classifier
|
||||
success_threshold: float = 0.5 # Success detection threshold
|
||||
success_reward: float = 1.0 # Reward value for successful episodes
|
||||
|
||||
# Dataset configuration
|
||||
class DatasetConfig:
|
||||
repo_id: str # LeRobot dataset repository ID
|
||||
task: str # Task identifier
|
||||
root: str | None = None # Local dataset root directory
|
||||
num_episodes_to_record: int = 5 # Number of episodes for recording
|
||||
replay_episode: int | None = None # Episode index for replay
|
||||
push_to_hub: bool = False # Whether to push datasets to Hub
|
||||
```
|
||||
<!-- prettier-ignore-end -->
|
||||
|
||||
### Processor Pipeline Architecture
|
||||
|
||||
HIL-SERL uses a modular processor pipeline architecture that processes robot observations and actions through a series of composable steps. The pipeline is divided into two main components:
|
||||
|
||||
#### Environment Processor Pipeline
|
||||
|
||||
The environment processor (`env_processor`) handles incoming observations and environment state:
|
||||
|
||||
1. **VanillaObservationProcessor**: Converts raw robot observations into standardized format
|
||||
2. **JointVelocityProcessor** (optional): Adds joint velocity information to observations
|
||||
3. **MotorCurrentProcessor** (optional): Adds motor current readings to observations
|
||||
4. **ForwardKinematicsJointsToEE** (optional): Computes end-effector pose from joint positions
|
||||
5. **ImageCropResizeProcessor** (optional): Crops and resizes camera images
|
||||
6. **TimeLimitProcessor** (optional): Enforces episode time limits
|
||||
7. **GripperPenaltyProcessor** (optional): Applies penalties for inappropriate gripper usage
|
||||
8. **RewardClassifierProcessor** (optional): Automated reward detection using vision models
|
||||
9. **ToBatchProcessor**: Converts data to batch format for neural network processing
|
||||
10. **DeviceProcessor**: Moves data to the specified compute device (CPU/GPU)
|
||||
|
||||
#### Action Processor Pipeline
|
||||
|
||||
The action processor (`action_processor`) handles outgoing actions and human interventions:
|
||||
|
||||
1. **AddTeleopActionAsComplimentaryData**: Captures teleoperator actions for logging
|
||||
2. **AddTeleopEventsAsInfo**: Records intervention events and episode control signals
|
||||
3. **AddRobotObservationAsComplimentaryData**: Stores raw robot state for processing
|
||||
4. **InterventionActionProcessor**: Handles human interventions and episode termination
|
||||
5. **Inverse Kinematics Pipeline** (when enabled):
|
||||
- **MapDeltaActionToRobotAction**: Converts delta actions to robot action format
|
||||
- **EEReferenceAndDelta**: Computes end-effector reference and delta movements
|
||||
- **EEBoundsAndSafety**: Enforces workspace safety bounds
|
||||
- **InverseKinematicsEEToJoints**: Converts end-effector actions to joint targets
|
||||
- **GripperVelocityToJoint**: Handles gripper control commands
|
||||
|
||||
#### Configuration Examples
|
||||
|
||||
**Basic Observation Processing**:
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"processor": {
|
||||
"observation": {
|
||||
"add_joint_velocity_to_observation": true,
|
||||
"add_current_to_observation": false,
|
||||
"display_cameras": false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Image Processing**:
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"processor": {
|
||||
"image_preprocessing": {
|
||||
"crop_params_dict": {
|
||||
"observation.images.front": [180, 250, 120, 150],
|
||||
"observation.images.side": [180, 207, 180, 200]
|
||||
},
|
||||
"resize_size": [128, 128]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Inverse Kinematics Setup**:
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"processor": {
|
||||
"inverse_kinematics": {
|
||||
"urdf_path": "path/to/robot.urdf",
|
||||
"target_frame_name": "end_effector",
|
||||
"end_effector_bounds": {
|
||||
"min": [0.16, -0.08, 0.03],
|
||||
"max": [0.24, 0.2, 0.1]
|
||||
},
|
||||
"end_effector_step_sizes": {
|
||||
"x": 0.02,
|
||||
"y": 0.02,
|
||||
"z": 0.02
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Advanced Observation Processing
|
||||
|
||||
The HIL-SERL framework supports additional observation processing features that can improve policy learning:
|
||||
|
||||
#### Joint Velocity Processing
|
||||
|
||||
Enable joint velocity estimation to provide the policy with motion information:
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"processor": {
|
||||
"observation": {
|
||||
"add_joint_velocity_to_observation": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
This processor:
|
||||
|
||||
- Estimates joint velocities using finite differences between consecutive joint position readings
|
||||
- Adds velocity information to the observation state vector
|
||||
- Useful for policies that need motion awareness for dynamic tasks
|
||||
|
||||
#### Motor Current Processing
|
||||
|
||||
Monitor motor currents to detect contact forces and load conditions:
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"processor": {
|
||||
"observation": {
|
||||
"add_current_to_observation": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
This processor:
|
||||
|
||||
- Reads motor current values from the robot's control system
|
||||
- Adds current measurements to the observation state vector
|
||||
- Helps detect contact events, object weights, and mechanical resistance
|
||||
- Useful for contact-rich manipulation tasks
|
||||
|
||||
#### Combined Observation Processing
|
||||
|
||||
You can enable multiple observation processing features simultaneously:
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"processor": {
|
||||
"observation": {
|
||||
"add_joint_velocity_to_observation": true,
|
||||
"add_current_to_observation": true,
|
||||
"add_ee_pose_to_observation": false,
|
||||
"display_cameras": false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Note**: Enabling additional observation features increases the state space dimensionality, which may require adjusting your policy network architecture and potentially collecting more training data.
|
||||
|
||||
### Finding Robot Workspace Bounds
|
||||
|
||||
Before collecting demonstrations, you need to determine the appropriate operational bounds for your robot.
|
||||
@@ -101,6 +320,8 @@ python -m lerobot.scripts.find_joint_limits \
|
||||
--teleop.id=blue
|
||||
```
|
||||
|
||||
Note: You can also use `so101_leader` as the teleop type if you have the SO101 leader arm with reduced gears for smoother teleoperation.
|
||||
|
||||
**Workflow**
|
||||
|
||||
1. Run the script and move the robot through the space that solves the task
|
||||
@@ -130,22 +351,56 @@ With the bounds defined, you can safely collect demonstrations for training. Tra
|
||||
|
||||
Create a configuration file for recording demonstrations (or edit an existing one like [env_config_so100.json](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/env_config_so100.json)):
|
||||
|
||||
1. Set `mode` to `"record"`
|
||||
2. Specify a unique `repo_id` for your dataset (e.g., "username/task_name")
|
||||
3. Set `num_episodes` to the number of demonstrations you want to collect
|
||||
4. Set `crop_params_dict` to `null` initially (we'll determine crops later)
|
||||
5. Configure `robot`, `cameras`, and other hardware settings
|
||||
1. Set `mode` to `"record"` at the root level
|
||||
2. Specify a unique `repo_id` for your dataset in the `dataset` section (e.g., "username/task_name")
|
||||
3. Set `num_episodes_to_record` in the `dataset` section to the number of demonstrations you want to collect
|
||||
4. Set `env.processor.image_preprocessing.crop_params_dict` to `{}` initially (we'll determine crops later)
|
||||
5. Configure `env.robot`, `env.teleop`, and other hardware settings in the `env` section
|
||||
|
||||
Example configuration section:
|
||||
|
||||
```json
|
||||
"mode": "record",
|
||||
"repo_id": "username/pick_lift_cube",
|
||||
"dataset_root": null,
|
||||
"task": "pick_and_lift",
|
||||
"num_episodes": 15,
|
||||
"episode": 0,
|
||||
"push_to_hub": true
|
||||
{
|
||||
"env": {
|
||||
"type": "gym_manipulator",
|
||||
"name": "real_robot",
|
||||
"fps": 10,
|
||||
"processor": {
|
||||
"control_mode": "gamepad",
|
||||
"observation": {
|
||||
"display_cameras": false
|
||||
},
|
||||
"image_preprocessing": {
|
||||
"crop_params_dict": {},
|
||||
"resize_size": [128, 128]
|
||||
},
|
||||
"gripper": {
|
||||
"use_gripper": true,
|
||||
"gripper_penalty": 0.0
|
||||
},
|
||||
"reset": {
|
||||
"reset_time_s": 5.0,
|
||||
"control_time_s": 20.0
|
||||
}
|
||||
},
|
||||
"robot": {
|
||||
// ... robot configuration ...
|
||||
},
|
||||
"teleop": {
|
||||
// ... teleoperator configuration ...
|
||||
}
|
||||
},
|
||||
"dataset": {
|
||||
"repo_id": "username/pick_lift_cube",
|
||||
"root": null,
|
||||
"task": "pick_and_lift",
|
||||
"num_episodes_to_record": 15,
|
||||
"replay_episode": 0,
|
||||
"push_to_hub": true
|
||||
},
|
||||
"mode": "record",
|
||||
"device": "cpu"
|
||||
}
|
||||
```
|
||||
|
||||
### Using a Teleoperation Device
|
||||
@@ -191,10 +446,20 @@ The gamepad provides a very convenient way to control the robot and the episode
|
||||
To setup the gamepad, you need to set the `control_mode` to `"gamepad"` and define the `teleop` section in the configuration file.
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"teleop": {
|
||||
"type": "gamepad",
|
||||
"use_gripper": true
|
||||
"type": "gamepad",
|
||||
"use_gripper": true
|
||||
},
|
||||
"processor": {
|
||||
"control_mode": "gamepad",
|
||||
"gripper": {
|
||||
"use_gripper": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
<p align="center">
|
||||
@@ -213,16 +478,32 @@ To setup the gamepad, you need to set the `control_mode` to `"gamepad"` and defi
|
||||
|
||||
The SO101 leader arm has reduced gears that allows it to move and track the follower arm during exploration. Therefore, taking over is much smoother than the gearless SO100.
|
||||
|
||||
To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and define the `teleop` section in the configuration file.
|
||||
To setup the SO101 leader, you need to set the `control_mode` to `"leader"` and define the `teleop` section in the configuration file with `leader_follower_mode` enabled:
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"teleop": {
|
||||
"type": "so101_leader",
|
||||
"port": "/dev/tty.usbmodem585A0077921", # check your port number
|
||||
"use_degrees": true
|
||||
"type": "so101_leader",
|
||||
"port": "/dev/tty.usbmodem585A0077921",
|
||||
"use_degrees": true,
|
||||
"leader_follower_mode": true
|
||||
},
|
||||
"processor": {
|
||||
"control_mode": "leader",
|
||||
"gripper": {
|
||||
"use_gripper": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `leader_follower_mode` enables the leader arm to automatically track the follower's position when you're not intervening. This creates a seamless teleoperation experience where:
|
||||
|
||||
- When not intervening: the leader arm follows the follower arm's position
|
||||
- When intervening (press `space`): you control the leader arm, and the follower tracks it in end-effector space
|
||||
|
||||
In order to annotate the success/failure of the episode, **you will need** to use a keyboard to press `s` for success, `esc` for failure.
|
||||
During the online training, press `space` to take over the policy and `space` again to give the control back to the policy.
|
||||
|
||||
@@ -251,7 +532,7 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/e
|
||||
|
||||
During recording:
|
||||
|
||||
1. The robot will reset to the initial position defined in the configuration file `fixed_reset_joint_positions`
|
||||
1. The robot will reset to the initial position defined in the configuration file `env.processor.reset.fixed_reset_joint_positions`
|
||||
2. Complete the task successfully
|
||||
3. The episode ends with a reward of 1 when you press the "success" button
|
||||
4. If the time limit is reached, or the fail button is pressed, the episode ends with a reward of 0
|
||||
@@ -310,11 +591,19 @@ observation.images.front: [180, 250, 120, 150]
|
||||
Add these crop parameters to your training configuration:
|
||||
|
||||
```json
|
||||
"crop_params_dict": {
|
||||
"observation.images.side": [180, 207, 180, 200],
|
||||
"observation.images.front": [180, 250, 120, 150]
|
||||
},
|
||||
"resize_size": [128, 128]
|
||||
{
|
||||
"env": {
|
||||
"processor": {
|
||||
"image_preprocessing": {
|
||||
"crop_params_dict": {
|
||||
"observation.images.side": [180, 207, 180, 200],
|
||||
"observation.images.front": [180, 250, 120, 150]
|
||||
},
|
||||
"resize_size": [128, 128]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Recommended image resolution**
|
||||
@@ -343,26 +632,52 @@ python -m lerobot.scripts.rl.gym_manipulator --config_path src/lerobot/configs/r
|
||||
|
||||
**Key Parameters for Data Collection**
|
||||
|
||||
- **mode**: set it to `"record"` to collect a dataset
|
||||
- **repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub
|
||||
- **num_episodes**: Number of episodes to record
|
||||
- **number_of_steps_after_success**: Number of additional frames to record after a success (reward=1) is detected
|
||||
- **fps**: Number of frames per second to record
|
||||
- **push_to_hub**: Whether to push the dataset to the hub
|
||||
- **mode**: set it to `"record"` to collect a dataset (at root level)
|
||||
- **dataset.repo_id**: `"hf_username/dataset_name"`, name of the dataset and repo on the hub
|
||||
- **dataset.num_episodes_to_record**: Number of episodes to record
|
||||
- **env.processor.reset.terminate_on_success**: Whether to automatically terminate episodes when success is detected (default: `true`)
|
||||
- **env.fps**: Number of frames per second to record
|
||||
- **dataset.push_to_hub**: Whether to push the dataset to the hub
|
||||
|
||||
The `number_of_steps_after_success` parameter is crucial as it allows you to collect more positive examples. When a success is detected, the system will continue recording for the specified number of steps while maintaining the reward=1 label. Otherwise, there won't be enough states in the dataset labeled to 1 to train a good classifier.
|
||||
The `env.processor.reset.terminate_on_success` parameter allows you to control episode termination behavior. When set to `false`, episodes will continue even after success is detected, allowing you to collect more positive examples with the reward=1 label. This is crucial for training reward classifiers as it provides more success state examples in your dataset. When set to `true` (default), episodes terminate immediately upon success detection.
|
||||
|
||||
**Important**: For reward classifier training, set `terminate_on_success: false` to collect sufficient positive examples. For regular HIL-SERL training, keep it as `true` to enable automatic episode termination when the task is completed successfully.
|
||||
|
||||
Example configuration section for data collection:
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"type": "gym_manipulator",
|
||||
"name": "real_robot",
|
||||
"fps": 10,
|
||||
"processor": {
|
||||
"reset": {
|
||||
"reset_time_s": 5.0,
|
||||
"control_time_s": 20.0,
|
||||
"terminate_on_success": false
|
||||
},
|
||||
"gripper": {
|
||||
"use_gripper": true
|
||||
}
|
||||
},
|
||||
"robot": {
|
||||
// ... robot configuration ...
|
||||
},
|
||||
"teleop": {
|
||||
// ... teleoperator configuration ...
|
||||
}
|
||||
},
|
||||
"dataset": {
|
||||
"repo_id": "hf_username/dataset_name",
|
||||
"dataset_root": "data/your_dataset",
|
||||
"task": "reward_classifier_task",
|
||||
"num_episodes_to_record": 20,
|
||||
"replay_episode": null,
|
||||
"push_to_hub": true
|
||||
},
|
||||
"mode": "record",
|
||||
"repo_id": "hf_username/dataset_name",
|
||||
"dataset_root": "data/your_dataset",
|
||||
"num_episodes": 20,
|
||||
"push_to_hub": true,
|
||||
"fps": 10,
|
||||
"number_of_steps_after_success": 15
|
||||
"device": "cpu"
|
||||
}
|
||||
```
|
||||
|
||||
@@ -421,9 +736,17 @@ To use your trained reward classifier, configure the `HILSerlRobotEnvConfig` to
|
||||
|
||||
<!-- prettier-ignore-start -->
|
||||
```python
|
||||
env_config = HILSerlRobotEnvConfig(
|
||||
reward_classifier_pretrained_path="path_to_your_pretrained_trained_model",
|
||||
# Other environment parameters
|
||||
config = GymManipulatorConfig(
|
||||
env=HILSerlRobotEnvConfig(
|
||||
processor=HILSerlProcessorConfig(
|
||||
reward_classifier=RewardClassifierConfig(
|
||||
pretrained_path="path_to_your_pretrained_trained_model"
|
||||
)
|
||||
),
|
||||
# Other environment parameters
|
||||
),
|
||||
dataset=DatasetConfig(...),
|
||||
mode=None # For training
|
||||
)
|
||||
```
|
||||
<!-- prettier-ignore-end -->
|
||||
@@ -432,7 +755,18 @@ or set the argument in the json config file.
|
||||
|
||||
```json
|
||||
{
|
||||
"reward_classifier_pretrained_path": "path_to_your_pretrained_model"
|
||||
"env": {
|
||||
"processor": {
|
||||
"reward_classifier": {
|
||||
"pretrained_path": "path_to_your_pretrained_model",
|
||||
"success_threshold": 0.7,
|
||||
"success_reward": 1.0
|
||||
},
|
||||
"reset": {
|
||||
"terminate_on_success": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
@@ -32,9 +32,12 @@ To use `gym_hil` with LeRobot, you need to create a configuration file. An examp
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "hil",
|
||||
"name": "franka_sim",
|
||||
"task": "PandaPickCubeGamepad-v0",
|
||||
"env": {
|
||||
"type": "gym_manipulator",
|
||||
"name": "gym_hil",
|
||||
"task": "PandaPickCubeGamepad-v0",
|
||||
"fps": 10
|
||||
},
|
||||
"device": "cuda"
|
||||
}
|
||||
```
|
||||
@@ -45,28 +48,40 @@ Available tasks:
|
||||
- `PandaPickCubeGamepad-v0`: With gamepad control
|
||||
- `PandaPickCubeKeyboard-v0`: With keyboard control
|
||||
|
||||
### Gym Wrappers Configuration
|
||||
### Processor Configuration
|
||||
|
||||
```json
|
||||
"wrapper": {
|
||||
"gripper_penalty": -0.02,
|
||||
"control_time_s": 15.0,
|
||||
"use_gripper": true,
|
||||
"fixed_reset_joint_positions": [0.0, 0.195, 0.0, -2.43, 0.0, 2.62, 0.785],
|
||||
"end_effector_step_sizes": {
|
||||
"x": 0.025,
|
||||
"y": 0.025,
|
||||
"z": 0.025
|
||||
},
|
||||
"control_mode": "gamepad"
|
||||
{
|
||||
"env": {
|
||||
"processor": {
|
||||
"control_mode": "gamepad",
|
||||
"gripper": {
|
||||
"use_gripper": true,
|
||||
"gripper_penalty": -0.02
|
||||
},
|
||||
"reset": {
|
||||
"control_time_s": 15.0,
|
||||
"fixed_reset_joint_positions": [
|
||||
0.0, 0.195, 0.0, -2.43, 0.0, 2.62, 0.785
|
||||
]
|
||||
},
|
||||
"inverse_kinematics": {
|
||||
"end_effector_step_sizes": {
|
||||
"x": 0.025,
|
||||
"y": 0.025,
|
||||
"z": 0.025
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Important parameters:
|
||||
|
||||
- `gripper_penalty`: Penalty for excessive gripper movement
|
||||
- `use_gripper`: Whether to enable gripper control
|
||||
- `end_effector_step_sizes`: Size of the steps in the x,y,z axes of the end-effector
|
||||
- `gripper.gripper_penalty`: Penalty for excessive gripper movement
|
||||
- `gripper.use_gripper`: Whether to enable gripper control
|
||||
- `inverse_kinematics.end_effector_step_sizes`: Size of the steps in the x,y,z axes of the end-effector
|
||||
- `control_mode`: Set to `"gamepad"` to use a gamepad controller
|
||||
|
||||
## Running with HIL RL of LeRobot
|
||||
@@ -75,39 +90,50 @@ Important parameters:
|
||||
|
||||
To run the environment, set mode to null:
|
||||
|
||||
<!-- prettier-ignore-start -->
|
||||
```python
|
||||
```bash
|
||||
python -m lerobot.scripts.rl.gym_manipulator --config_path path/to/gym_hil_env.json
|
||||
```
|
||||
<!-- prettier-ignore-end -->
|
||||
|
||||
### Recording a Dataset
|
||||
|
||||
To collect a dataset, set the mode to `record` whilst defining the repo_id and number of episodes to record:
|
||||
|
||||
<!-- prettier-ignore-start -->
|
||||
```python
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"type": "gym_manipulator",
|
||||
"name": "gym_hil",
|
||||
"task": "PandaPickCubeGamepad-v0"
|
||||
},
|
||||
"dataset": {
|
||||
"repo_id": "username/sim_dataset",
|
||||
"root": null,
|
||||
"task": "pick_cube",
|
||||
"num_episodes_to_record": 10,
|
||||
"replay_episode": null,
|
||||
"push_to_hub": true
|
||||
},
|
||||
"mode": "record"
|
||||
}
|
||||
```
|
||||
|
||||
```bash
|
||||
python -m lerobot.scripts.rl.gym_manipulator --config_path path/to/gym_hil_env.json
|
||||
```
|
||||
<!-- prettier-ignore-end -->
|
||||
|
||||
### Training a Policy
|
||||
|
||||
To train a policy, checkout the configuration example available [here](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/train_gym_hil_env.json) and run the actor and learner servers:
|
||||
|
||||
<!-- prettier-ignore-start -->
|
||||
```python
|
||||
```bash
|
||||
python -m lerobot.scripts.rl.actor --config_path path/to/train_gym_hil_env.json
|
||||
```
|
||||
<!-- prettier-ignore-end -->
|
||||
|
||||
In a different terminal, run the learner server:
|
||||
|
||||
<!-- prettier-ignore-start -->
|
||||
```python
|
||||
```bash
|
||||
python -m lerobot.scripts.rl.learner --config_path path/to/train_gym_hil_env.json
|
||||
```
|
||||
<!-- prettier-ignore-end -->
|
||||
|
||||
The simulation environment provides a safe and repeatable way to develop and test your Human-In-the-Loop reinforcement learning components before deploying to real robots.
|
||||
|
||||
|
||||
@@ -24,11 +24,36 @@ pip install -e ".[hilserl]"
|
||||
|
||||
To use `gym_hil` with LeRobot, you need to use a configuration file. An example config file can be found [here](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/env_config_gym_hil_il.json).
|
||||
|
||||
To teleoperate and collect a dataset, we need to modify this config file and you should add your `repo_id` here: `"repo_id": "il_gym",` and `"num_episodes": 30,` and make sure you set `mode` to `record`, "mode": "record".
|
||||
To teleoperate and collect a dataset, we need to modify this config file. Here's an example configuration for imitation learning data collection:
|
||||
|
||||
If you do not have a Nvidia GPU also change `"device": "cuda"` parameter in the config file (for example to `mps` for MacOS).
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"type": "gym_manipulator",
|
||||
"name": "gym_hil",
|
||||
"task": "PandaPickCubeGamepad-v0",
|
||||
"fps": 10
|
||||
},
|
||||
"dataset": {
|
||||
"repo_id": "your_username/il_gym",
|
||||
"root": null,
|
||||
"task": "pick_cube",
|
||||
"num_episodes_to_record": 30,
|
||||
"replay_episode": null,
|
||||
"push_to_hub": true
|
||||
},
|
||||
"mode": "record",
|
||||
"device": "cuda"
|
||||
}
|
||||
```
|
||||
|
||||
By default the config file assumes you use a controller. To use your keyboard please change the envoirment specified at `"task"` in the config file and set it to `"PandaPickCubeKeyboard-v0"`.
|
||||
Key configuration points:
|
||||
|
||||
- Set your `repo_id` in the `dataset` section: `"repo_id": "your_username/il_gym"`
|
||||
- Set `num_episodes_to_record: 30` to collect 30 demonstration episodes
|
||||
- Ensure `mode` is set to `"record"`
|
||||
- If you don't have an NVIDIA GPU, change `"device": "cuda"` to `"mps"` for macOS or `"cpu"`
|
||||
- To use keyboard instead of gamepad, change `"task"` to `"PandaPickCubeKeyboard-v0"`
|
||||
|
||||
Then we can run this command to start:
|
||||
|
||||
@@ -140,9 +165,32 @@ huggingface-cli upload ${HF_USER}/il_sim_test${CKPT} \
|
||||
|
||||
## Evaluate your policy in Sim
|
||||
|
||||
To evaluate your policy we have to use the config file that can be found [here](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/eval_config_gym_hil.json).
|
||||
To evaluate your policy we have to use a configuration file. An example can be found [here](https://huggingface.co/datasets/aractingi/lerobot-example-config-files/blob/main/eval_config_gym_hil.json).
|
||||
|
||||
Make sure to replace the `repo_id` with the dataset you trained on, for example `pepijn223/il_sim_dataset` and replace the `pretrained_policy_name_or_path` with your model id, for example `pepijn223/il_sim_model`
|
||||
Here's an example evaluation configuration:
|
||||
|
||||
```json
|
||||
{
|
||||
"env": {
|
||||
"type": "gym_manipulator",
|
||||
"name": "gym_hil",
|
||||
"task": "PandaPickCubeGamepad-v0",
|
||||
"fps": 10
|
||||
},
|
||||
"dataset": {
|
||||
"repo_id": "your_username/il_sim_dataset",
|
||||
"dataset_root": null,
|
||||
"task": "pick_cube"
|
||||
},
|
||||
"pretrained_policy_name_or_path": "your_username/il_sim_model",
|
||||
"device": "cuda"
|
||||
}
|
||||
```
|
||||
|
||||
Make sure to replace:
|
||||
|
||||
- `repo_id` with the dataset you trained on (e.g., `your_username/il_sim_dataset`)
|
||||
- `pretrained_policy_name_or_path` with your model ID (e.g., `your_username/il_sim_model`)
|
||||
|
||||
Then you can run this command to visualize your trained policy
|
||||
|
||||
|
||||
@@ -161,35 +161,73 @@ class XarmEnv(EnvConfig):
|
||||
|
||||
|
||||
@dataclass
|
||||
class VideoRecordConfig:
|
||||
"""Configuration for video recording in ManiSkill environments."""
|
||||
|
||||
enabled: bool = False
|
||||
record_dir: str = "videos"
|
||||
trajectory_name: str = "trajectory"
|
||||
class ImagePreprocessingConfig:
|
||||
crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None
|
||||
resize_size: tuple[int, int] | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class EnvTransformConfig:
|
||||
"""Configuration for environment wrappers."""
|
||||
class RewardClassifierConfig:
|
||||
"""Configuration for reward classification."""
|
||||
|
||||
pretrained_path: str | None = None
|
||||
success_threshold: float = 0.5
|
||||
success_reward: float = 1.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class InverseKinematicsConfig:
|
||||
"""Configuration for inverse kinematics processing."""
|
||||
|
||||
urdf_path: str | None = None
|
||||
target_frame_name: str | None = None
|
||||
end_effector_bounds: dict[str, list[float]] | None = None
|
||||
end_effector_step_sizes: dict[str, float] | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class ObservationConfig:
|
||||
"""Configuration for observation processing."""
|
||||
|
||||
# ee_action_space_params: EEActionSpaceConfig = field(default_factory=EEActionSpaceConfig)
|
||||
control_mode: str = "gamepad"
|
||||
display_cameras: bool = False
|
||||
add_joint_velocity_to_observation: bool = False
|
||||
add_current_to_observation: bool = False
|
||||
add_ee_pose_to_observation: bool = False
|
||||
crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None
|
||||
resize_size: tuple[int, int] | None = None
|
||||
control_time_s: float = 20.0
|
||||
fixed_reset_joint_positions: Any | None = None
|
||||
reset_time_s: float = 5.0
|
||||
display_cameras: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class GripperConfig:
|
||||
"""Configuration for gripper control and penalties."""
|
||||
|
||||
use_gripper: bool = True
|
||||
gripper_quantization_threshold: float | None = 0.8
|
||||
gripper_penalty: float = 0.0
|
||||
gripper_penalty_in_reward: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class ResetConfig:
|
||||
"""Configuration for environment reset behavior."""
|
||||
|
||||
fixed_reset_joint_positions: Any | None = None
|
||||
reset_time_s: float = 5.0
|
||||
control_time_s: float = 20.0
|
||||
terminate_on_success: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class HILSerlProcessorConfig:
|
||||
"""Configuration for environment processing pipeline."""
|
||||
|
||||
control_mode: str = "gamepad"
|
||||
observation: ObservationConfig | None = None
|
||||
image_preprocessing: ImagePreprocessingConfig | None = None
|
||||
gripper: GripperConfig | None = None
|
||||
reset: ResetConfig | None = None
|
||||
inverse_kinematics: InverseKinematicsConfig | None = None
|
||||
reward_classifier: RewardClassifierConfig | None = None
|
||||
max_gripper_pos: float | None = 100.0
|
||||
|
||||
|
||||
@EnvConfig.register_subclass(name="gym_manipulator")
|
||||
@dataclass
|
||||
class HILSerlRobotEnvConfig(EnvConfig):
|
||||
@@ -197,77 +235,10 @@ class HILSerlRobotEnvConfig(EnvConfig):
|
||||
|
||||
robot: RobotConfig | None = None
|
||||
teleop: TeleoperatorConfig | None = None
|
||||
wrapper: EnvTransformConfig | None = None
|
||||
fps: int = 10
|
||||
processor: HILSerlProcessorConfig = field(default_factory=HILSerlProcessorConfig)
|
||||
|
||||
name: str = "real_robot"
|
||||
mode: str | None = None # Either "record", "replay", None
|
||||
repo_id: str | None = None
|
||||
dataset_root: str | None = None
|
||||
task: str | None = ""
|
||||
num_episodes: int = 10 # only for record mode
|
||||
episode: int = 0
|
||||
device: str = "cuda"
|
||||
push_to_hub: bool = True
|
||||
pretrained_policy_name_or_path: str | None = None
|
||||
reward_classifier_pretrained_path: str | None = None
|
||||
# For the reward classifier, to record more positive examples after a success
|
||||
number_of_steps_after_success: int = 0
|
||||
|
||||
@property
|
||||
def gym_kwargs(self) -> dict:
|
||||
return {}
|
||||
|
||||
|
||||
@EnvConfig.register_subclass("hil")
|
||||
@dataclass
|
||||
class HILEnvConfig(EnvConfig):
|
||||
"""Configuration for the HIL environment."""
|
||||
|
||||
name: str = "PandaPickCube"
|
||||
task: str | None = "PandaPickCubeKeyboard-v0"
|
||||
use_viewer: bool = True
|
||||
gripper_penalty: float = 0.0
|
||||
use_gamepad: bool = True
|
||||
state_dim: int = 18
|
||||
action_dim: int = 4
|
||||
fps: int = 100
|
||||
episode_length: int = 100
|
||||
video_record: VideoRecordConfig = field(default_factory=VideoRecordConfig)
|
||||
features: dict[str, PolicyFeature] = field(
|
||||
default_factory=lambda: {
|
||||
"action": PolicyFeature(type=FeatureType.ACTION, shape=(4,)),
|
||||
"observation.image": PolicyFeature(type=FeatureType.VISUAL, shape=(3, 128, 128)),
|
||||
"observation.state": PolicyFeature(type=FeatureType.STATE, shape=(18,)),
|
||||
}
|
||||
)
|
||||
features_map: dict[str, str] = field(
|
||||
default_factory=lambda: {
|
||||
"action": ACTION,
|
||||
"observation.image": OBS_IMAGE,
|
||||
"observation.state": OBS_STATE,
|
||||
}
|
||||
)
|
||||
################# args from hilserlrobotenv
|
||||
reward_classifier_pretrained_path: str | None = None
|
||||
robot_config: RobotConfig | None = None
|
||||
teleop_config: TeleoperatorConfig | None = None
|
||||
wrapper: EnvTransformConfig | None = None
|
||||
mode: str | None = None # Either "record", "replay", None
|
||||
repo_id: str | None = None
|
||||
dataset_root: str | None = None
|
||||
num_episodes: int = 10 # only for record mode
|
||||
episode: int = 0
|
||||
device: str = "cuda"
|
||||
push_to_hub: bool = True
|
||||
pretrained_policy_name_or_path: str | None = None
|
||||
# For the reward classifier, to record more positive examples after a success
|
||||
number_of_steps_after_success: int = 0
|
||||
############################
|
||||
|
||||
@property
|
||||
def gym_kwargs(self) -> dict:
|
||||
return {
|
||||
"use_viewer": self.use_viewer,
|
||||
"use_gamepad": self.use_gamepad,
|
||||
"gripper_penalty": self.gripper_penalty,
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ import importlib
|
||||
|
||||
import gymnasium as gym
|
||||
|
||||
from lerobot.envs.configs import AlohaEnv, EnvConfig, HILEnvConfig, PushtEnv, XarmEnv
|
||||
from lerobot.envs.configs import AlohaEnv, EnvConfig, PushtEnv, XarmEnv
|
||||
|
||||
|
||||
def make_env_config(env_type: str, **kwargs) -> EnvConfig:
|
||||
@@ -27,8 +27,6 @@ def make_env_config(env_type: str, **kwargs) -> EnvConfig:
|
||||
return PushtEnv(**kwargs)
|
||||
elif env_type == "xarm":
|
||||
return XarmEnv(**kwargs)
|
||||
elif env_type == "hil":
|
||||
return HILEnvConfig(**kwargs)
|
||||
else:
|
||||
raise ValueError(f"Policy type '{env_type}' is not available.")
|
||||
|
||||
|
||||
@@ -140,8 +140,6 @@ def make_processor(
|
||||
NotImplementedError: If the policy type doesn't have a processor implemented.
|
||||
"""
|
||||
if pretrained_path:
|
||||
# Load a pretrained processor
|
||||
# TODO(azouitine): Handle this case.
|
||||
return (
|
||||
RobotProcessor.from_pretrained(
|
||||
pretrained_model_name_or_path=pretrained_path,
|
||||
|
||||
@@ -1,420 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import numpy as np
|
||||
import torch
|
||||
from torch import Tensor, nn
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
|
||||
|
||||
def create_stats_buffers(
|
||||
features: dict[str, PolicyFeature],
|
||||
norm_map: dict[str, NormalizationMode],
|
||||
stats: dict[str, dict[str, Tensor]] | None = None,
|
||||
) -> dict[str, dict[str, nn.ParameterDict]]:
|
||||
"""
|
||||
Create buffers per modality (e.g. "observation.image", "action") containing their mean, std, min, max
|
||||
statistics.
|
||||
|
||||
Args: (see Normalize and Unnormalize)
|
||||
|
||||
Returns:
|
||||
dict: A dictionary where keys are modalities and values are `nn.ParameterDict` containing
|
||||
`nn.Parameters` set to `requires_grad=False`, suitable to not be updated during backpropagation.
|
||||
"""
|
||||
stats_buffers = {}
|
||||
|
||||
for key, ft in features.items():
|
||||
norm_mode = norm_map.get(ft.type, NormalizationMode.IDENTITY)
|
||||
if norm_mode is NormalizationMode.IDENTITY:
|
||||
continue
|
||||
|
||||
assert isinstance(norm_mode, NormalizationMode)
|
||||
|
||||
shape = tuple(ft.shape)
|
||||
|
||||
if ft.type is FeatureType.VISUAL:
|
||||
# sanity checks
|
||||
assert len(shape) == 3, f"number of dimensions of {key} != 3 ({shape=}"
|
||||
c, h, w = shape
|
||||
assert c < h and c < w, f"{key} is not channel first ({shape=})"
|
||||
# override image shape to be invariant to height and width
|
||||
shape = (c, 1, 1)
|
||||
|
||||
# Note: we initialize mean, std, min, max to infinity. They should be overwritten
|
||||
# downstream by `stats` or `policy.load_state_dict`, as expected. During forward,
|
||||
# we assert they are not infinity anymore.
|
||||
|
||||
buffer = {}
|
||||
if norm_mode is NormalizationMode.MEAN_STD:
|
||||
mean = torch.ones(shape, dtype=torch.float32) * torch.inf
|
||||
std = torch.ones(shape, dtype=torch.float32) * torch.inf
|
||||
buffer = nn.ParameterDict(
|
||||
{
|
||||
"mean": nn.Parameter(mean, requires_grad=False),
|
||||
"std": nn.Parameter(std, requires_grad=False),
|
||||
}
|
||||
)
|
||||
elif norm_mode is NormalizationMode.MIN_MAX:
|
||||
min = torch.ones(shape, dtype=torch.float32) * torch.inf
|
||||
max = torch.ones(shape, dtype=torch.float32) * torch.inf
|
||||
buffer = nn.ParameterDict(
|
||||
{
|
||||
"min": nn.Parameter(min, requires_grad=False),
|
||||
"max": nn.Parameter(max, requires_grad=False),
|
||||
}
|
||||
)
|
||||
|
||||
# TODO(aliberts, rcadene): harmonize this to only use one framework (np or torch)
|
||||
if stats:
|
||||
if isinstance(stats[key]["mean"], np.ndarray):
|
||||
if norm_mode is NormalizationMode.MEAN_STD:
|
||||
buffer["mean"].data = torch.from_numpy(stats[key]["mean"]).to(dtype=torch.float32)
|
||||
buffer["std"].data = torch.from_numpy(stats[key]["std"]).to(dtype=torch.float32)
|
||||
elif norm_mode is NormalizationMode.MIN_MAX:
|
||||
buffer["min"].data = torch.from_numpy(stats[key]["min"]).to(dtype=torch.float32)
|
||||
buffer["max"].data = torch.from_numpy(stats[key]["max"]).to(dtype=torch.float32)
|
||||
elif isinstance(stats[key]["mean"], torch.Tensor):
|
||||
# Note: The clone is needed to make sure that the logic in save_pretrained doesn't see duplicated
|
||||
# tensors anywhere (for example, when we use the same stats for normalization and
|
||||
# unnormalization). See the logic here
|
||||
# https://github.com/huggingface/safetensors/blob/079781fd0dc455ba0fe851e2b4507c33d0c0d407/bindings/python/py_src/safetensors/torch.py#L97.
|
||||
if norm_mode is NormalizationMode.MEAN_STD:
|
||||
buffer["mean"].data = stats[key]["mean"].clone().to(dtype=torch.float32)
|
||||
buffer["std"].data = stats[key]["std"].clone().to(dtype=torch.float32)
|
||||
elif norm_mode is NormalizationMode.MIN_MAX:
|
||||
buffer["min"].data = stats[key]["min"].clone().to(dtype=torch.float32)
|
||||
buffer["max"].data = stats[key]["max"].clone().to(dtype=torch.float32)
|
||||
else:
|
||||
type_ = type(stats[key]["mean"])
|
||||
raise ValueError(f"np.ndarray or torch.Tensor expected, but type is '{type_}' instead.")
|
||||
|
||||
stats_buffers[key] = buffer
|
||||
return stats_buffers
|
||||
|
||||
|
||||
def _no_stats_error_str(name: str) -> str:
|
||||
return (
|
||||
f"`{name}` is infinity. You should either initialize with `stats` as an argument, or use a "
|
||||
"pretrained model."
|
||||
)
|
||||
|
||||
|
||||
class Normalize(nn.Module):
|
||||
"""Normalizes data (e.g. "observation.image") for more stable and faster convergence during training."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
features: dict[str, PolicyFeature],
|
||||
norm_map: dict[str, NormalizationMode],
|
||||
stats: dict[str, dict[str, Tensor]] | None = None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
shapes (dict): A dictionary where keys are input modalities (e.g. "observation.image") and values
|
||||
are their shapes (e.g. `[3,96,96]`]). These shapes are used to create the tensor buffer containing
|
||||
mean, std, min, max statistics. If the provided `shapes` contain keys related to images, the shape
|
||||
is adjusted to be invariant to height and width, assuming a channel-first (c, h, w) format.
|
||||
modes (dict): A dictionary where keys are output modalities (e.g. "observation.image") and values
|
||||
are their normalization modes among:
|
||||
- "mean_std": subtract the mean and divide by standard deviation.
|
||||
- "min_max": map to [-1, 1] range.
|
||||
stats (dict, optional): A dictionary where keys are output modalities (e.g. "observation.image")
|
||||
and values are dictionaries of statistic types and their values (e.g.
|
||||
`{"mean": torch.randn(3,1,1)}, "std": torch.randn(3,1,1)}`). If provided, as expected for
|
||||
training the model for the first time, these statistics will overwrite the default buffers. If
|
||||
not provided, as expected for finetuning or evaluation, the default buffers should to be
|
||||
overwritten by a call to `policy.load_state_dict(state_dict)`. That way, initializing the
|
||||
dataset is not needed to get the stats, since they are already in the policy state_dict.
|
||||
"""
|
||||
super().__init__()
|
||||
self.features = features
|
||||
self.norm_map = norm_map
|
||||
self.stats = stats
|
||||
stats_buffers = create_stats_buffers(features, norm_map, stats)
|
||||
for key, buffer in stats_buffers.items():
|
||||
setattr(self, "buffer_" + key.replace(".", "_"), buffer)
|
||||
|
||||
# TODO(rcadene): should we remove torch.no_grad?
|
||||
@torch.no_grad()
|
||||
def forward(self, batch: dict[str, Tensor]) -> dict[str, Tensor]:
|
||||
# TODO: Remove this shallow copy
|
||||
batch = dict(batch) # shallow copy avoids mutating the input batch
|
||||
for key, ft in self.features.items():
|
||||
if key not in batch:
|
||||
# FIXME(aliberts, rcadene): This might lead to silent fail!
|
||||
continue
|
||||
|
||||
norm_mode = self.norm_map.get(ft.type, NormalizationMode.IDENTITY)
|
||||
if norm_mode is NormalizationMode.IDENTITY:
|
||||
continue
|
||||
|
||||
buffer = getattr(self, "buffer_" + key.replace(".", "_"))
|
||||
|
||||
if norm_mode is NormalizationMode.MEAN_STD:
|
||||
mean = buffer["mean"]
|
||||
std = buffer["std"]
|
||||
assert not torch.isinf(mean).any(), _no_stats_error_str("mean")
|
||||
assert not torch.isinf(std).any(), _no_stats_error_str("std")
|
||||
batch[key] = (batch[key] - mean) / (std + 1e-8)
|
||||
elif norm_mode is NormalizationMode.MIN_MAX:
|
||||
min = buffer["min"]
|
||||
max = buffer["max"]
|
||||
assert not torch.isinf(min).any(), _no_stats_error_str("min")
|
||||
assert not torch.isinf(max).any(), _no_stats_error_str("max")
|
||||
# normalize to [0,1]
|
||||
batch[key] = (batch[key] - min) / (max - min + 1e-8)
|
||||
# normalize to [-1, 1]
|
||||
batch[key] = batch[key] * 2 - 1
|
||||
else:
|
||||
raise ValueError(norm_mode)
|
||||
return batch
|
||||
|
||||
|
||||
class Unnormalize(nn.Module):
|
||||
"""
|
||||
Similar to `Normalize` but unnormalizes output data (e.g. `{"action": torch.randn(b,c)}`) in their
|
||||
original range used by the environment.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
features: dict[str, PolicyFeature],
|
||||
norm_map: dict[str, NormalizationMode],
|
||||
stats: dict[str, dict[str, Tensor]] | None = None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
shapes (dict): A dictionary where keys are input modalities (e.g. "observation.image") and values
|
||||
are their shapes (e.g. `[3,96,96]`]). These shapes are used to create the tensor buffer containing
|
||||
mean, std, min, max statistics. If the provided `shapes` contain keys related to images, the shape
|
||||
is adjusted to be invariant to height and width, assuming a channel-first (c, h, w) format.
|
||||
modes (dict): A dictionary where keys are output modalities (e.g. "observation.image") and values
|
||||
are their normalization modes among:
|
||||
- "mean_std": subtract the mean and divide by standard deviation.
|
||||
- "min_max": map to [-1, 1] range.
|
||||
stats (dict, optional): A dictionary where keys are output modalities (e.g. "observation.image")
|
||||
and values are dictionaries of statistic types and their values (e.g.
|
||||
`{"mean": torch.randn(3,1,1)}, "std": torch.randn(3,1,1)}`). If provided, as expected for
|
||||
training the model for the first time, these statistics will overwrite the default buffers. If
|
||||
not provided, as expected for finetuning or evaluation, the default buffers should to be
|
||||
overwritten by a call to `policy.load_state_dict(state_dict)`. That way, initializing the
|
||||
dataset is not needed to get the stats, since they are already in the policy state_dict.
|
||||
"""
|
||||
super().__init__()
|
||||
self.features = features
|
||||
self.norm_map = norm_map
|
||||
self.stats = stats
|
||||
# `self.buffer_observation_state["mean"]` contains `torch.tensor(state_dim)`
|
||||
stats_buffers = create_stats_buffers(features, norm_map, stats)
|
||||
for key, buffer in stats_buffers.items():
|
||||
setattr(self, "buffer_" + key.replace(".", "_"), buffer)
|
||||
|
||||
# TODO(rcadene): should we remove torch.no_grad?
|
||||
@torch.no_grad()
|
||||
def forward(self, batch: dict[str, Tensor]) -> dict[str, Tensor]:
|
||||
batch = dict(batch) # shallow copy avoids mutating the input batch
|
||||
for key, ft in self.features.items():
|
||||
if key not in batch:
|
||||
continue
|
||||
|
||||
norm_mode = self.norm_map.get(ft.type, NormalizationMode.IDENTITY)
|
||||
if norm_mode is NormalizationMode.IDENTITY:
|
||||
continue
|
||||
|
||||
buffer = getattr(self, "buffer_" + key.replace(".", "_"))
|
||||
|
||||
if norm_mode is NormalizationMode.MEAN_STD:
|
||||
mean = buffer["mean"]
|
||||
std = buffer["std"]
|
||||
assert not torch.isinf(mean).any(), _no_stats_error_str("mean")
|
||||
assert not torch.isinf(std).any(), _no_stats_error_str("std")
|
||||
batch[key] = batch[key] * std + mean
|
||||
elif norm_mode is NormalizationMode.MIN_MAX:
|
||||
min = buffer["min"]
|
||||
max = buffer["max"]
|
||||
assert not torch.isinf(min).any(), _no_stats_error_str("min")
|
||||
assert not torch.isinf(max).any(), _no_stats_error_str("max")
|
||||
batch[key] = (batch[key] + 1) / 2
|
||||
batch[key] = batch[key] * (max - min) + min
|
||||
else:
|
||||
raise ValueError(norm_mode)
|
||||
return batch
|
||||
|
||||
|
||||
# TODO (azouitine): We should replace all normalization on the policies with register_buffer normalization
|
||||
# and remove the `Normalize` and `Unnormalize` classes.
|
||||
def _initialize_stats_buffers(
|
||||
module: nn.Module,
|
||||
features: dict[str, PolicyFeature],
|
||||
norm_map: dict[str, NormalizationMode],
|
||||
stats: dict[str, dict[str, Tensor]] | None = None,
|
||||
) -> None:
|
||||
"""Register statistics buffers (mean/std or min/max) on the given *module*.
|
||||
|
||||
The logic matches the previous constructors of `NormalizeBuffer` and `UnnormalizeBuffer`,
|
||||
but is factored out so it can be reused by both classes and stay in sync.
|
||||
"""
|
||||
for key, ft in features.items():
|
||||
norm_mode = norm_map.get(ft.type, NormalizationMode.IDENTITY)
|
||||
if norm_mode is NormalizationMode.IDENTITY:
|
||||
continue
|
||||
|
||||
shape: tuple[int, ...] = tuple(ft.shape)
|
||||
if ft.type is FeatureType.VISUAL:
|
||||
# reduce spatial dimensions, keep channel dimension only
|
||||
c, *_ = shape
|
||||
shape = (c, 1, 1)
|
||||
|
||||
prefix = key.replace(".", "_")
|
||||
|
||||
if norm_mode is NormalizationMode.MEAN_STD:
|
||||
mean = torch.full(shape, torch.inf, dtype=torch.float32)
|
||||
std = torch.full(shape, torch.inf, dtype=torch.float32)
|
||||
|
||||
if stats and key in stats and "mean" in stats[key] and "std" in stats[key]:
|
||||
mean_data = stats[key]["mean"]
|
||||
std_data = stats[key]["std"]
|
||||
if isinstance(mean_data, torch.Tensor):
|
||||
# Note: The clone is needed to make sure that the logic in save_pretrained doesn't see duplicated
|
||||
# tensors anywhere (for example, when we use the same stats for normalization and
|
||||
# unnormalization). See the logic here
|
||||
# https://github.com/huggingface/safetensors/blob/079781fd0dc455ba0fe851e2b4507c33d0c0d407/bindings/python/py_src/safetensors/torch.py#L97.
|
||||
mean = mean_data.clone().to(dtype=torch.float32)
|
||||
std = std_data.clone().to(dtype=torch.float32)
|
||||
else:
|
||||
raise ValueError(f"Unsupported stats type for key '{key}' (expected ndarray or Tensor).")
|
||||
|
||||
module.register_buffer(f"{prefix}_mean", mean)
|
||||
module.register_buffer(f"{prefix}_std", std)
|
||||
continue
|
||||
|
||||
if norm_mode is NormalizationMode.MIN_MAX:
|
||||
min_val = torch.full(shape, torch.inf, dtype=torch.float32)
|
||||
max_val = torch.full(shape, torch.inf, dtype=torch.float32)
|
||||
|
||||
if stats and key in stats and "min" in stats[key] and "max" in stats[key]:
|
||||
min_data = stats[key]["min"]
|
||||
max_data = stats[key]["max"]
|
||||
if isinstance(min_data, torch.Tensor):
|
||||
min_val = min_data.clone().to(dtype=torch.float32)
|
||||
max_val = max_data.clone().to(dtype=torch.float32)
|
||||
else:
|
||||
raise ValueError(f"Unsupported stats type for key '{key}' (expected ndarray or Tensor).")
|
||||
|
||||
module.register_buffer(f"{prefix}_min", min_val)
|
||||
module.register_buffer(f"{prefix}_max", max_val)
|
||||
continue
|
||||
|
||||
raise ValueError(norm_mode)
|
||||
|
||||
|
||||
class NormalizeBuffer(nn.Module):
|
||||
"""Same as `Normalize` but statistics are stored as registered buffers rather than parameters."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
features: dict[str, PolicyFeature],
|
||||
norm_map: dict[str, NormalizationMode],
|
||||
stats: dict[str, dict[str, Tensor]] | None = None,
|
||||
):
|
||||
super().__init__()
|
||||
self.features = features
|
||||
self.norm_map = norm_map
|
||||
|
||||
_initialize_stats_buffers(self, features, norm_map, stats)
|
||||
|
||||
def forward(self, batch: dict[str, Tensor]) -> dict[str, Tensor]:
|
||||
batch = dict(batch)
|
||||
for key, ft in self.features.items():
|
||||
if key not in batch:
|
||||
continue
|
||||
|
||||
norm_mode = self.norm_map.get(ft.type, NormalizationMode.IDENTITY)
|
||||
if norm_mode is NormalizationMode.IDENTITY:
|
||||
continue
|
||||
|
||||
prefix = key.replace(".", "_")
|
||||
|
||||
if norm_mode is NormalizationMode.MEAN_STD:
|
||||
mean = getattr(self, f"{prefix}_mean")
|
||||
std = getattr(self, f"{prefix}_std")
|
||||
assert not torch.isinf(mean).any(), _no_stats_error_str("mean")
|
||||
assert not torch.isinf(std).any(), _no_stats_error_str("std")
|
||||
batch[key] = (batch[key] - mean) / (std + 1e-8)
|
||||
continue
|
||||
|
||||
if norm_mode is NormalizationMode.MIN_MAX:
|
||||
min_val = getattr(self, f"{prefix}_min")
|
||||
max_val = getattr(self, f"{prefix}_max")
|
||||
assert not torch.isinf(min_val).any(), _no_stats_error_str("min")
|
||||
assert not torch.isinf(max_val).any(), _no_stats_error_str("max")
|
||||
batch[key] = (batch[key] - min_val) / (max_val - min_val + 1e-8)
|
||||
batch[key] = batch[key] * 2 - 1
|
||||
continue
|
||||
|
||||
raise ValueError(norm_mode)
|
||||
|
||||
return batch
|
||||
|
||||
|
||||
class UnnormalizeBuffer(nn.Module):
|
||||
"""Inverse operation of `NormalizeBuffer`. Uses registered buffers for statistics."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
features: dict[str, PolicyFeature],
|
||||
norm_map: dict[str, NormalizationMode],
|
||||
stats: dict[str, dict[str, Tensor]] | None = None,
|
||||
):
|
||||
super().__init__()
|
||||
self.features = features
|
||||
self.norm_map = norm_map
|
||||
|
||||
_initialize_stats_buffers(self, features, norm_map, stats)
|
||||
|
||||
def forward(self, batch: dict[str, Tensor]) -> dict[str, Tensor]:
|
||||
# batch = dict(batch)
|
||||
for key, ft in self.features.items():
|
||||
if key not in batch:
|
||||
continue
|
||||
|
||||
norm_mode = self.norm_map.get(ft.type, NormalizationMode.IDENTITY)
|
||||
if norm_mode is NormalizationMode.IDENTITY:
|
||||
continue
|
||||
|
||||
prefix = key.replace(".", "_")
|
||||
|
||||
if norm_mode is NormalizationMode.MEAN_STD:
|
||||
mean = getattr(self, f"{prefix}_mean")
|
||||
std = getattr(self, f"{prefix}_std")
|
||||
assert not torch.isinf(mean).any(), _no_stats_error_str("mean")
|
||||
assert not torch.isinf(std).any(), _no_stats_error_str("std")
|
||||
batch[key] = batch[key] * std + mean
|
||||
continue
|
||||
|
||||
if norm_mode is NormalizationMode.MIN_MAX:
|
||||
min_val = getattr(self, f"{prefix}_min")
|
||||
max_val = getattr(self, f"{prefix}_max")
|
||||
assert not torch.isinf(min_val).any(), _no_stats_error_str("min")
|
||||
assert not torch.isinf(max_val).any(), _no_stats_error_str("max")
|
||||
batch[key] = (batch[key] + 1) / 2
|
||||
batch[key] = batch[key] * (max_val - min_val) + min_val
|
||||
continue
|
||||
|
||||
raise ValueError(norm_mode)
|
||||
|
||||
return batch
|
||||
@@ -15,7 +15,20 @@
|
||||
# limitations under the License.
|
||||
|
||||
from .batch_processor import ToBatchProcessor
|
||||
from .delta_action_processor import MapDeltaActionToRobotAction, MapTensorToDeltaActionDict
|
||||
from .device_processor import DeviceProcessor
|
||||
from .gym_action_processor import Numpy2TorchActionProcessor, Torch2NumpyActionProcessor
|
||||
from .hil_processor import (
|
||||
AddTeleopActionAsComplimentaryData,
|
||||
AddTeleopEventsAsInfo,
|
||||
GripperPenaltyProcessor,
|
||||
ImageCropResizeProcessor,
|
||||
InterventionActionProcessor,
|
||||
RewardClassifierProcessor,
|
||||
TimeLimitProcessor,
|
||||
)
|
||||
from .joint_observations_processor import JointVelocityProcessor, MotorCurrentProcessor
|
||||
from .leader_follower_processor import LeaderFollowerProcessor
|
||||
from .normalize_processor import NormalizerProcessor, UnnormalizerProcessor, hotswap_stats
|
||||
from .observation_processor import VanillaObservationProcessor
|
||||
from .pipeline import (
|
||||
@@ -37,11 +50,22 @@ from .tokenizer_processor import TokenizerProcessor
|
||||
|
||||
__all__ = [
|
||||
"ActionProcessor",
|
||||
"AddTeleopActionAsComplimentaryData",
|
||||
"AddTeleopEventsAsInfo",
|
||||
"DeviceProcessor",
|
||||
"DoneProcessor",
|
||||
"MapDeltaActionToRobotAction",
|
||||
"MapTensorToDeltaActionDict",
|
||||
"EnvTransition",
|
||||
"GripperPenaltyProcessor",
|
||||
"IdentityProcessor",
|
||||
"ImageCropResizeProcessor",
|
||||
"InfoProcessor",
|
||||
"InterventionActionProcessor",
|
||||
"JointVelocityProcessor",
|
||||
"LeaderFollowerProcessor",
|
||||
"MapDeltaActionToRobotAction",
|
||||
"MotorCurrentProcessor",
|
||||
"NormalizerProcessor",
|
||||
"UnnormalizerProcessor",
|
||||
"hotswap_stats",
|
||||
@@ -49,10 +73,14 @@ __all__ = [
|
||||
"ProcessorStep",
|
||||
"ProcessorStepRegistry",
|
||||
"RenameProcessor",
|
||||
"RewardClassifierProcessor",
|
||||
"RewardProcessor",
|
||||
"RobotProcessor",
|
||||
"ToBatchProcessor",
|
||||
"TokenizerProcessor",
|
||||
"TimeLimitProcessor",
|
||||
"Numpy2TorchActionProcessor",
|
||||
"Torch2NumpyActionProcessor",
|
||||
"TransitionKey",
|
||||
"TruncatedProcessor",
|
||||
"VanillaObservationProcessor",
|
||||
|
||||
137
src/lerobot/processor/delta_action_processor.py
Normal file
137
src/lerobot/processor/delta_action_processor.py
Normal file
@@ -0,0 +1,137 @@
|
||||
# !/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from torch import Tensor
|
||||
|
||||
from lerobot.configs.types import FeatureType, PolicyFeature
|
||||
from lerobot.processor.pipeline import ActionProcessor, ProcessorStepRegistry
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("map_tensor_to_delta_action_dict")
|
||||
@dataclass
|
||||
class MapTensorToDeltaActionDict(ActionProcessor):
|
||||
"""
|
||||
Map a tensor to a delta action dictionary.
|
||||
"""
|
||||
|
||||
def action(self, action: Tensor) -> dict:
|
||||
if isinstance(action, dict):
|
||||
return action
|
||||
if action.dim() > 1:
|
||||
action = action.squeeze(0)
|
||||
|
||||
# TODO (maractingi): add rotation
|
||||
delta_action = {
|
||||
"action.delta_x": action[0],
|
||||
"action.delta_y": action[1],
|
||||
"action.delta_z": action[2],
|
||||
}
|
||||
if action.shape[0] > 3:
|
||||
delta_action["action.gripper"] = action[3]
|
||||
return delta_action
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("map_delta_action_to_robot_action")
|
||||
@dataclass
|
||||
class MapDeltaActionToRobotAction(ActionProcessor):
|
||||
"""
|
||||
Map delta actions from teleoperators (gamepad, keyboard) to robot target actions
|
||||
for use with inverse kinematics processors.
|
||||
|
||||
Expected input ACTION keys:
|
||||
{
|
||||
"action.delta_x": float,
|
||||
"action.delta_y": float,
|
||||
"action.delta_z": float,
|
||||
"action.gripper": float (optional),
|
||||
}
|
||||
|
||||
Output ACTION keys:
|
||||
{
|
||||
"action.enabled": bool,
|
||||
"action.target_x": float,
|
||||
"action.target_y": float,
|
||||
"action.target_z": float,
|
||||
"action.target_wx": float,
|
||||
"action.target_wy": float,
|
||||
"action.target_wz": float,
|
||||
"action.gripper": float,
|
||||
}
|
||||
"""
|
||||
|
||||
# Scale factors for delta movements
|
||||
position_scale: float = 1.0
|
||||
rotation_scale: float = 0.0 # No rotation deltas for gamepad/keyboard
|
||||
|
||||
def action(self, action: dict | None) -> dict:
|
||||
if action is None:
|
||||
return {}
|
||||
|
||||
# NOTE (maractingi): Action can be a dict from the teleop_devices or a tensor from the policy
|
||||
# TODO (maractingi): changing this target_xyz naming convention from the teleop_devices
|
||||
delta_x = action.pop("action.delta_x", 0.0)
|
||||
delta_y = action.pop("action.delta_y", 0.0)
|
||||
delta_z = action.pop("action.delta_z", 0.0)
|
||||
gripper = action.pop("action.gripper", 1.0) # Default to "stay" (1.0)
|
||||
|
||||
# Determine if the teleoperator is actively providing input
|
||||
# Consider enabled if any significant movement delta is detected
|
||||
position_magnitude = abs(delta_x) + abs(delta_y) + abs(delta_z)
|
||||
enabled = position_magnitude > 1e-6 # Small threshold to avoid noise
|
||||
|
||||
# Scale the deltas appropriately
|
||||
scaled_delta_x = float(delta_x) * self.position_scale
|
||||
scaled_delta_y = float(delta_y) * self.position_scale
|
||||
scaled_delta_z = float(delta_z) * self.position_scale
|
||||
|
||||
# For gamepad/keyboard, we don't have rotation input, so set to 0
|
||||
# These could be extended in the future for more sophisticated teleoperators
|
||||
target_wx = 0.0
|
||||
target_wy = 0.0
|
||||
target_wz = 0.0
|
||||
|
||||
# Update action with robot target format
|
||||
action = {
|
||||
"action.enabled": enabled,
|
||||
"action.target_x": scaled_delta_x,
|
||||
"action.target_y": scaled_delta_y,
|
||||
"action.target_z": scaled_delta_z,
|
||||
"action.target_wx": target_wx,
|
||||
"action.target_wy": target_wy,
|
||||
"action.target_wz": target_wz,
|
||||
"action.gripper": float(gripper),
|
||||
}
|
||||
|
||||
return action
|
||||
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
"""Transform features to match output format."""
|
||||
# Update features to reflect the new action format
|
||||
features.update(
|
||||
{
|
||||
"action.enabled": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
|
||||
"action.target_x": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
|
||||
"action.target_y": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
|
||||
"action.target_z": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
|
||||
"action.target_wx": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
|
||||
"action.target_wy": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
|
||||
"action.target_wz": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
|
||||
"action.gripper": PolicyFeature(type=FeatureType.ACTION, shape=(1,)),
|
||||
}
|
||||
)
|
||||
return features
|
||||
@@ -66,9 +66,26 @@ class DeviceProcessor:
|
||||
self._target_float_dtype = None
|
||||
|
||||
def _process_tensor(self, tensor: torch.Tensor) -> torch.Tensor:
|
||||
"""Process a tensor by moving to device and optionally converting float dtype."""
|
||||
# Move to device first
|
||||
tensor = tensor.to(self.device, non_blocking=self.non_blocking)
|
||||
"""Process a tensor by moving to device and optionally converting float dtype.
|
||||
|
||||
If the tensor is already on a GPU and we're configured for a GPU, it preserves
|
||||
that GPU placement (useful for multi-GPU training with Accelerate).
|
||||
Otherwise, it moves to the configured device.
|
||||
"""
|
||||
# Determine target device
|
||||
if tensor.is_cuda and self._device.type == "cuda":
|
||||
# Both tensor and target are on GPU - preserve tensor's GPU placement
|
||||
# This handles multi-GPU scenarios where Accelerate has already placed
|
||||
# tensors on the correct GPU for each process
|
||||
target_device = tensor.device
|
||||
else:
|
||||
# Either tensor is on CPU, or we're configured for CPU
|
||||
# In both cases, use the configured device
|
||||
target_device = self._device
|
||||
|
||||
# Only move if necessary
|
||||
if tensor.device != target_device:
|
||||
tensor = tensor.to(target_device, non_blocking=self.non_blocking)
|
||||
|
||||
# Convert float dtype if specified and tensor is floating point
|
||||
if self._target_float_dtype is not None and tensor.is_floating_point():
|
||||
|
||||
68
src/lerobot/processor/gym_action_processor.py
Normal file
68
src/lerobot/processor/gym_action_processor.py
Normal file
@@ -0,0 +1,68 @@
|
||||
#! /usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from lerobot.processor.pipeline import ActionProcessor, ProcessorStepRegistry
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("torch2numpy_action_processor")
|
||||
@dataclass
|
||||
class Torch2NumpyActionProcessor(ActionProcessor):
|
||||
"""Convert PyTorch tensor actions to NumPy arrays."""
|
||||
|
||||
squeeze_batch_dim: bool = True
|
||||
|
||||
def action(self, action: torch.Tensor | None) -> np.ndarray | None:
|
||||
if action is None:
|
||||
return None
|
||||
|
||||
if not isinstance(action, torch.Tensor):
|
||||
raise TypeError(
|
||||
f"Expected torch.Tensor or None, got {type(action).__name__}. "
|
||||
"Use appropriate processor for non-tensor actions."
|
||||
)
|
||||
|
||||
numpy_action = action.detach().cpu().numpy()
|
||||
|
||||
# Remove batch dimensions but preserve action dimensions
|
||||
# Only squeeze if there's a batch dimension (first dim == 1)
|
||||
if (
|
||||
self.squeeze_batch_dim
|
||||
and numpy_action.shape
|
||||
and len(numpy_action.shape) > 1
|
||||
and numpy_action.shape[0] == 1
|
||||
):
|
||||
numpy_action = numpy_action.squeeze(0)
|
||||
|
||||
return numpy_action
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("numpy2torch_action_processor")
|
||||
@dataclass
|
||||
class Numpy2TorchActionProcessor(ActionProcessor):
|
||||
"""Convert NumPy array action to PyTorch tensor."""
|
||||
|
||||
def action(self, action: np.ndarray | None) -> torch.Tensor | None:
|
||||
if action is None:
|
||||
return None
|
||||
if not isinstance(action, np.ndarray):
|
||||
raise TypeError(
|
||||
f"Expected np.ndarray or None, got {type(action).__name__}. "
|
||||
"Use appropriate processor for non-tensor actions."
|
||||
)
|
||||
torch_action = torch.from_numpy(action)
|
||||
return torch_action
|
||||
369
src/lerobot/processor/hil_processor.py
Normal file
369
src/lerobot/processor/hil_processor.py
Normal file
@@ -0,0 +1,369 @@
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
import torchvision.transforms.functional as F # noqa: N812
|
||||
|
||||
from lerobot.configs.types import PolicyFeature
|
||||
from lerobot.processor.pipeline import (
|
||||
ComplementaryDataProcessor,
|
||||
EnvTransition,
|
||||
InfoProcessor,
|
||||
ObservationProcessor,
|
||||
ProcessorStepRegistry,
|
||||
TransitionKey,
|
||||
)
|
||||
from lerobot.teleoperators.teleoperator import Teleoperator
|
||||
from lerobot.teleoperators.utils import TeleopEvents
|
||||
|
||||
GRIPPER_KEY = "gripper"
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("add_teleop_action_as_complementary_data")
|
||||
@dataclass
|
||||
class AddTeleopActionAsComplimentaryData(ComplementaryDataProcessor):
|
||||
"""Add teleoperator action to transition complementary data."""
|
||||
|
||||
teleop_device: Teleoperator
|
||||
|
||||
def complementary_data(self, complementary_data: dict | None) -> dict:
|
||||
complementary_data = {} if complementary_data is None else dict(complementary_data)
|
||||
complementary_data["teleop_action"] = self.teleop_device.get_action()
|
||||
return complementary_data
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("add_teleop_action_as_info")
|
||||
@dataclass
|
||||
class AddTeleopEventsAsInfo(InfoProcessor):
|
||||
"""Add teleoperator control events to transition info."""
|
||||
|
||||
teleop_device: Teleoperator
|
||||
|
||||
def info(self, info: dict | None) -> dict:
|
||||
info = {} if info is None else dict(info)
|
||||
teleop_events = getattr(self.teleop_device, "get_teleop_events", lambda: {})()
|
||||
info.update(teleop_events)
|
||||
return info
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("image_crop_resize_processor")
|
||||
@dataclass
|
||||
class ImageCropResizeProcessor(ObservationProcessor):
|
||||
"""Crop and resize image observations."""
|
||||
|
||||
crop_params_dict: dict[str, tuple[int, int, int, int]] | None = None
|
||||
resize_size: tuple[int, int] | None = None
|
||||
|
||||
def observation(self, observation: dict | None) -> dict | None:
|
||||
if observation is None:
|
||||
return None
|
||||
|
||||
if self.resize_size is None and not self.crop_params_dict:
|
||||
return observation
|
||||
|
||||
new_observation = dict(observation)
|
||||
|
||||
# Process all image keys in the observation
|
||||
for key in observation:
|
||||
if "image" not in key:
|
||||
continue
|
||||
|
||||
image = observation[key]
|
||||
device = image.device
|
||||
# NOTE (maractingi): No mps kernel for crop and resize, so we need to move to cpu
|
||||
if device.type == "mps":
|
||||
image = image.cpu()
|
||||
# Crop if crop params are provided for this key
|
||||
if self.crop_params_dict is not None and key in self.crop_params_dict:
|
||||
crop_params = self.crop_params_dict[key]
|
||||
image = F.crop(image, *crop_params)
|
||||
if self.resize_size is not None:
|
||||
image = F.resize(image, self.resize_size)
|
||||
image = image.clamp(0.0, 1.0)
|
||||
new_observation[key] = image.to(device)
|
||||
|
||||
return new_observation
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
return {
|
||||
"crop_params_dict": self.crop_params_dict,
|
||||
"resize_size": self.resize_size,
|
||||
}
|
||||
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
if self.resize_size is None:
|
||||
return features
|
||||
for key in features:
|
||||
if "image" in key:
|
||||
features[key] = PolicyFeature(type=features[key].type, shape=self.resize_size)
|
||||
return features
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register("time_limit_processor")
|
||||
class TimeLimitProcessor:
|
||||
"""Track episode steps and enforce time limits."""
|
||||
|
||||
max_episode_steps: int
|
||||
current_step: int = 0
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
truncated = transition.get(TransitionKey.TRUNCATED)
|
||||
if truncated is None:
|
||||
return transition
|
||||
|
||||
self.current_step += 1
|
||||
if self.current_step >= self.max_episode_steps:
|
||||
truncated = True
|
||||
new_transition = transition.copy()
|
||||
new_transition[TransitionKey.TRUNCATED] = truncated
|
||||
return new_transition
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
return {
|
||||
"max_episode_steps": self.max_episode_steps,
|
||||
}
|
||||
|
||||
def state_dict(self) -> dict[str, torch.Tensor]:
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
|
||||
pass
|
||||
|
||||
def reset(self) -> None:
|
||||
self.current_step = 0
|
||||
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
return features
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register("gripper_penalty_processor")
|
||||
class GripperPenaltyProcessor:
|
||||
"""Apply penalty for inappropriate gripper usage."""
|
||||
|
||||
penalty: float = -0.01
|
||||
max_gripper_pos: float = 30.0
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
"""Calculate gripper penalty and add to complementary data."""
|
||||
action = transition.get(TransitionKey.ACTION)
|
||||
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA)
|
||||
|
||||
if complementary_data is None or action is None:
|
||||
return transition
|
||||
|
||||
current_gripper_pos = complementary_data.get("raw_joint_positions", None).get(GRIPPER_KEY, None)
|
||||
if current_gripper_pos is None:
|
||||
return transition
|
||||
|
||||
gripper_action = action[f"action.{GRIPPER_KEY}.pos"]
|
||||
gripper_action_normalized = gripper_action / self.max_gripper_pos
|
||||
|
||||
# Normalize gripper state and action
|
||||
gripper_state_normalized = current_gripper_pos / self.max_gripper_pos
|
||||
|
||||
# Calculate penalty boolean as in original
|
||||
gripper_penalty_bool = (gripper_state_normalized < 0.5 and gripper_action_normalized > 0.5) or (
|
||||
gripper_state_normalized > 0.75 and gripper_action_normalized < 0.5
|
||||
)
|
||||
|
||||
gripper_penalty = self.penalty * int(gripper_penalty_bool)
|
||||
|
||||
# Add penalty information to complementary data
|
||||
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
|
||||
|
||||
# Create new complementary data with penalty info
|
||||
new_complementary_data = dict(complementary_data)
|
||||
new_complementary_data["discrete_penalty"] = gripper_penalty
|
||||
|
||||
# Create new transition with updated complementary data
|
||||
new_transition = transition.copy()
|
||||
existing_comp_data = new_transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
|
||||
existing_comp_data.update(new_complementary_data)
|
||||
new_transition[TransitionKey.COMPLEMENTARY_DATA] = existing_comp_data # type: ignore[misc]
|
||||
return new_transition
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
return {
|
||||
"penalty": self.penalty,
|
||||
"max_gripper_pos": self.max_gripper_pos,
|
||||
}
|
||||
|
||||
def state_dict(self) -> dict[str, torch.Tensor]:
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
|
||||
pass
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset the processor state."""
|
||||
self.last_gripper_state = None
|
||||
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
return features
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register("intervention_action_processor")
|
||||
class InterventionActionProcessor:
|
||||
"""Handle human intervention actions and episode termination."""
|
||||
|
||||
use_gripper: bool = False
|
||||
terminate_on_success: bool = True
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
action = transition.get(TransitionKey.ACTION)
|
||||
if action is None:
|
||||
return transition
|
||||
|
||||
# Get intervention signals from complementary data
|
||||
info = transition.get(TransitionKey.INFO, {})
|
||||
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
|
||||
teleop_action = complementary_data.get("teleop_action", {})
|
||||
is_intervention = info.get(TeleopEvents.IS_INTERVENTION, False)
|
||||
terminate_episode = info.get(TeleopEvents.TERMINATE_EPISODE, False)
|
||||
success = info.get(TeleopEvents.SUCCESS, False)
|
||||
rerecord_episode = info.get(TeleopEvents.RERECORD_EPISODE, False)
|
||||
|
||||
new_transition = transition.copy()
|
||||
|
||||
# Override action if intervention is active
|
||||
if is_intervention and teleop_action is not None:
|
||||
if isinstance(teleop_action, dict):
|
||||
# Convert teleop_action dict to tensor format
|
||||
action_list = [
|
||||
teleop_action.get("action.delta_x", 0.0),
|
||||
teleop_action.get("action.delta_y", 0.0),
|
||||
teleop_action.get("action.delta_z", 0.0),
|
||||
]
|
||||
if self.use_gripper:
|
||||
action_list.append(teleop_action.get("gripper", 1.0))
|
||||
elif isinstance(teleop_action, np.ndarray):
|
||||
action_list = teleop_action.tolist()
|
||||
else:
|
||||
action_list = teleop_action
|
||||
|
||||
teleop_action_tensor = torch.tensor(action_list, dtype=action.dtype, device=action.device)
|
||||
new_transition[TransitionKey.ACTION] = teleop_action_tensor
|
||||
|
||||
# Handle episode termination
|
||||
new_transition[TransitionKey.DONE] = bool(terminate_episode) or (
|
||||
self.terminate_on_success and success
|
||||
)
|
||||
new_transition[TransitionKey.REWARD] = float(success)
|
||||
|
||||
# Update info with intervention metadata
|
||||
info = new_transition.get(TransitionKey.INFO, {})
|
||||
info[TeleopEvents.IS_INTERVENTION] = is_intervention
|
||||
info[TeleopEvents.RERECORD_EPISODE] = rerecord_episode
|
||||
info[TeleopEvents.SUCCESS] = success
|
||||
new_transition[TransitionKey.INFO] = info
|
||||
|
||||
# Update complementary data with teleop action
|
||||
complementary_data = new_transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
|
||||
complementary_data["teleop_action"] = new_transition.get(TransitionKey.ACTION)
|
||||
new_transition[TransitionKey.COMPLEMENTARY_DATA] = complementary_data
|
||||
|
||||
return new_transition
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
return {
|
||||
"use_gripper": self.use_gripper,
|
||||
}
|
||||
|
||||
def state_dict(self) -> dict[str, torch.Tensor]:
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
|
||||
pass
|
||||
|
||||
def reset(self) -> None:
|
||||
pass
|
||||
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
return features
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register("reward_classifier_processor")
|
||||
class RewardClassifierProcessor:
|
||||
"""Apply reward classification to image observations."""
|
||||
|
||||
pretrained_path: str | None = None
|
||||
device: str = "cpu"
|
||||
success_threshold: float = 0.5
|
||||
success_reward: float = 1.0
|
||||
terminate_on_success: bool = True
|
||||
|
||||
reward_classifier: Any = None
|
||||
|
||||
def __post_init__(self):
|
||||
"""Initialize the reward classifier after dataclass initialization."""
|
||||
if self.pretrained_path is not None:
|
||||
from lerobot.policies.sac.reward_model.modeling_classifier import Classifier
|
||||
|
||||
self.reward_classifier = Classifier.from_pretrained(self.pretrained_path)
|
||||
self.reward_classifier.to(self.device)
|
||||
self.reward_classifier.eval()
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
observation = transition.get(TransitionKey.OBSERVATION)
|
||||
if observation is None or self.reward_classifier is None:
|
||||
return transition
|
||||
|
||||
# Extract images from observation
|
||||
images = {key: value for key, value in observation.items() if "image" in key}
|
||||
|
||||
if not images:
|
||||
return transition
|
||||
|
||||
# Run reward classifier
|
||||
start_time = time.perf_counter()
|
||||
with torch.inference_mode():
|
||||
success = self.reward_classifier.predict_reward(images, threshold=self.success_threshold)
|
||||
|
||||
classifier_frequency = 1 / (time.perf_counter() - start_time)
|
||||
|
||||
# Calculate reward and termination
|
||||
reward = transition.get(TransitionKey.REWARD, 0.0)
|
||||
terminated = transition.get(TransitionKey.DONE, False)
|
||||
|
||||
if success == 1.0:
|
||||
reward = self.success_reward
|
||||
if self.terminate_on_success:
|
||||
terminated = True
|
||||
|
||||
# Update transition
|
||||
new_transition = transition.copy()
|
||||
new_transition[TransitionKey.REWARD] = reward
|
||||
new_transition[TransitionKey.DONE] = terminated
|
||||
|
||||
# Update info with classifier frequency
|
||||
info = new_transition.get(TransitionKey.INFO, {})
|
||||
info["reward_classifier_frequency"] = classifier_frequency
|
||||
new_transition[TransitionKey.INFO] = info
|
||||
|
||||
return new_transition
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
return {
|
||||
"device": self.device,
|
||||
"success_threshold": self.success_threshold,
|
||||
"success_reward": self.success_reward,
|
||||
"terminate_on_success": self.terminate_on_success,
|
||||
}
|
||||
|
||||
def state_dict(self) -> dict[str, torch.Tensor]:
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state: dict[str, torch.Tensor]) -> None:
|
||||
pass
|
||||
|
||||
def reset(self) -> None:
|
||||
pass
|
||||
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
return features
|
||||
116
src/lerobot/processor/joint_observations_processor.py
Normal file
116
src/lerobot/processor/joint_observations_processor.py
Normal file
@@ -0,0 +1,116 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import PolicyFeature
|
||||
from lerobot.processor.pipeline import (
|
||||
ObservationProcessor,
|
||||
ProcessorStepRegistry,
|
||||
)
|
||||
from lerobot.robots import Robot
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register("joint_velocity_processor")
|
||||
class JointVelocityProcessor(ObservationProcessor):
|
||||
"""Add joint velocity information to observations."""
|
||||
|
||||
joint_velocity_limits: float = 100.0
|
||||
dt: float = 1.0 / 10
|
||||
num_dof: int | None = None
|
||||
|
||||
last_joint_positions: torch.Tensor | None = None
|
||||
|
||||
def observation(self, observation: dict | None) -> dict | None:
|
||||
if observation is None:
|
||||
return None
|
||||
|
||||
# Get current joint positions (assuming they're in observation.state)
|
||||
current_positions = observation.get("observation.state")
|
||||
if current_positions is None:
|
||||
return observation
|
||||
|
||||
# Initialize last joint positions if not already set
|
||||
if self.last_joint_positions is None:
|
||||
self.last_joint_positions = current_positions.clone()
|
||||
|
||||
# Compute velocities
|
||||
joint_velocities = (current_positions - self.last_joint_positions) / self.dt
|
||||
self.last_joint_positions = current_positions.clone()
|
||||
|
||||
# Extend observation with velocities
|
||||
extended_state = torch.cat([current_positions, joint_velocities], dim=-1)
|
||||
|
||||
# Create new observation dict
|
||||
new_observation = dict(observation)
|
||||
new_observation["observation.state"] = extended_state
|
||||
|
||||
return new_observation
|
||||
|
||||
def get_config(self) -> dict[str, Any]:
|
||||
return {
|
||||
"joint_velocity_limits": self.joint_velocity_limits,
|
||||
"dt": self.dt,
|
||||
}
|
||||
|
||||
def reset(self) -> None:
|
||||
self.last_joint_positions = None
|
||||
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
if "observation.state" in features and self.num_dof is not None:
|
||||
from lerobot.configs.types import PolicyFeature
|
||||
|
||||
original_feature = features["observation.state"]
|
||||
# Double the shape to account for positions + velocities
|
||||
new_shape = (original_feature.shape[0] + self.num_dof,) + original_feature.shape[1:]
|
||||
features["observation.state"] = PolicyFeature(type=original_feature.type, shape=new_shape)
|
||||
return features
|
||||
|
||||
|
||||
@dataclass
|
||||
@ProcessorStepRegistry.register("current_processor")
|
||||
class MotorCurrentProcessor(ObservationProcessor):
|
||||
"""Add motor current information to observations."""
|
||||
|
||||
robot: Robot | None = None
|
||||
|
||||
def observation(self, observation: dict | None) -> dict | None:
|
||||
if observation is None:
|
||||
return None
|
||||
|
||||
# Get current values from robot state
|
||||
if self.robot is None:
|
||||
return observation
|
||||
present_current_dict = self.robot.bus.sync_read("Present_Current") # type: ignore[attr-defined]
|
||||
motor_currents = torch.tensor(
|
||||
[present_current_dict[name] for name in self.robot.bus.motors], # type: ignore[attr-defined]
|
||||
dtype=torch.float32,
|
||||
).unsqueeze(0)
|
||||
|
||||
current_state = observation.get("observation.state")
|
||||
if current_state is None:
|
||||
return observation
|
||||
|
||||
extended_state = torch.cat([current_state, motor_currents], dim=-1)
|
||||
|
||||
# Create new observation dict
|
||||
new_observation = dict(observation)
|
||||
new_observation["observation.state"] = extended_state
|
||||
|
||||
return new_observation
|
||||
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
if "observation.state" in features and self.robot is not None:
|
||||
from lerobot.configs.types import PolicyFeature
|
||||
|
||||
original_feature = features["observation.state"]
|
||||
# Add motor current dimensions to the original state shape
|
||||
num_motors = 0
|
||||
if hasattr(self.robot, "bus") and hasattr(self.robot.bus, "motors"): # type: ignore[attr-defined]
|
||||
num_motors = len(self.robot.bus.motors) # type: ignore[attr-defined]
|
||||
|
||||
if num_motors > 0:
|
||||
new_shape = (original_feature.shape[0] + num_motors,) + original_feature.shape[1:]
|
||||
features["observation.state"] = PolicyFeature(type=original_feature.type, shape=new_shape)
|
||||
return features
|
||||
120
src/lerobot/processor/leader_follower_processor.py
Normal file
120
src/lerobot/processor/leader_follower_processor.py
Normal file
@@ -0,0 +1,120 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
|
||||
from lerobot.model.kinematics import RobotKinematics
|
||||
from lerobot.processor.pipeline import EnvTransition, ProcessorStepRegistry, TransitionKey
|
||||
from lerobot.robots import Robot
|
||||
from lerobot.teleoperators import Teleoperator
|
||||
from lerobot.teleoperators.utils import TeleopEvents
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("leader_follower_processor")
|
||||
@dataclass
|
||||
class LeaderFollowerProcessor:
|
||||
"""
|
||||
Processor for leader-follower teleoperation mode.
|
||||
|
||||
This processor:
|
||||
1. Sends follower positions to leader arm when not intervening
|
||||
2. Computes EE delta actions from leader when intervening
|
||||
3. Handles teleop events from the leader device
|
||||
"""
|
||||
|
||||
leader_device: Teleoperator
|
||||
motor_names: list[str]
|
||||
robot: Robot
|
||||
kinematics: RobotKinematics
|
||||
end_effector_step_sizes: np.ndarray | None = None
|
||||
use_gripper: bool = True
|
||||
prev_leader_gripper: float | None = None
|
||||
max_gripper_pos: float = 100.0
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
"""Process transition with leader-follower logic."""
|
||||
# Get current follower position from complementary data
|
||||
raw_joint_pos = transition.get(TransitionKey.COMPLEMENTARY_DATA, {}).get("raw_joint_positions")
|
||||
if raw_joint_pos is not None:
|
||||
# Send follower position to leader (for follow mode)
|
||||
follower_action = {f"{motor}.pos": float(raw_joint_pos[motor]) for motor in self.motor_names}
|
||||
self.leader_device.send_action(follower_action)
|
||||
|
||||
# Only compute EE action if intervention is active
|
||||
# (AddTeleopEventsAsInfo already added IS_INTERVENTION to info)
|
||||
info = transition.get(TransitionKey.INFO, {})
|
||||
if info.get(TeleopEvents.IS_INTERVENTION, False):
|
||||
# Get leader joint positions from teleop_action
|
||||
# (AddTeleopActionAsComplimentaryData already got the action)
|
||||
complementary = transition.get(TransitionKey.COMPLEMENTARY_DATA, {})
|
||||
teleop_action = complementary.get("teleop_action", {})
|
||||
|
||||
if isinstance(teleop_action, dict) and raw_joint_pos is not None:
|
||||
# Extract leader positions from teleop action dict
|
||||
leader_pos = np.array([teleop_action.get(f"{motor}.pos", 0) for motor in self.motor_names])
|
||||
follower_pos = np.array([raw_joint_pos[motor] for motor in self.motor_names])
|
||||
|
||||
# Compute EE positions
|
||||
leader_ee = self.kinematics.forward_kinematics(leader_pos)[:3, 3]
|
||||
follower_ee = self.kinematics.forward_kinematics(follower_pos)[:3, 3]
|
||||
|
||||
# Compute normalized EE delta
|
||||
if self.end_effector_step_sizes is not None:
|
||||
ee_delta = np.clip(
|
||||
leader_ee - follower_ee, -self.end_effector_step_sizes, self.end_effector_step_sizes
|
||||
)
|
||||
ee_delta_normalized = ee_delta / self.end_effector_step_sizes
|
||||
else:
|
||||
ee_delta_normalized = leader_ee - follower_ee
|
||||
|
||||
# Handle gripper
|
||||
if self.use_gripper and len(leader_pos) > 3:
|
||||
if self.prev_leader_gripper is None:
|
||||
self.prev_leader_gripper = np.clip(leader_pos[-1], 0, self.max_gripper_pos)
|
||||
|
||||
leader_gripper = leader_pos[-1]
|
||||
gripper_delta = leader_gripper - self.prev_leader_gripper
|
||||
normalized_delta = gripper_delta / self.max_gripper_pos
|
||||
|
||||
# Quantize gripper action
|
||||
if normalized_delta >= 0.3:
|
||||
gripper_action = 2
|
||||
elif normalized_delta <= -0.1:
|
||||
gripper_action = 0
|
||||
else:
|
||||
gripper_action = 1
|
||||
|
||||
self.prev_leader_gripper = leader_gripper
|
||||
|
||||
# Create intervention action
|
||||
intervention_action = np.append(ee_delta_normalized, gripper_action)
|
||||
else:
|
||||
intervention_action = ee_delta_normalized
|
||||
|
||||
# Override teleop_action with computed EE action
|
||||
complementary["teleop_action"] = torch.from_numpy(intervention_action).float()
|
||||
transition[TransitionKey.COMPLEMENTARY_DATA] = complementary # type: ignore[misc]
|
||||
|
||||
return transition
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset leader-follower state."""
|
||||
self.prev_leader_gripper = None
|
||||
if hasattr(self.leader_device, "reset"):
|
||||
self.leader_device.reset()
|
||||
@@ -134,9 +134,19 @@ class TokenizerProcessor:
|
||||
if task is None:
|
||||
return transition
|
||||
|
||||
# Tokenize the task
|
||||
# Tokenize the task (creates CPU tensors)
|
||||
tokenized_prompt = self._tokenize_text(task)
|
||||
|
||||
# Detect device from existing tensors in the transition
|
||||
target_device = self._detect_device(transition)
|
||||
|
||||
# Move tokenized tensors to match the device of other data
|
||||
if target_device is not None:
|
||||
tokenized_prompt = {
|
||||
k: v.to(target_device) if isinstance(v, torch.Tensor) else v
|
||||
for k, v in tokenized_prompt.items()
|
||||
}
|
||||
|
||||
# Get or create observation dict
|
||||
observation = transition.get(TransitionKey.OBSERVATION)
|
||||
if observation is None:
|
||||
@@ -153,6 +163,45 @@ class TokenizerProcessor:
|
||||
transition[TransitionKey.OBSERVATION.value] = observation # type: ignore[misc]
|
||||
return transition
|
||||
|
||||
def _detect_device(self, transition: EnvTransition) -> torch.device | None:
|
||||
"""Detect device from existing tensors in the transition.
|
||||
|
||||
This allows the tokenized tensors to match the device of other data,
|
||||
which is especially important for multi-GPU training with Accelerate.
|
||||
|
||||
Args:
|
||||
transition: The transition to search for existing tensors.
|
||||
|
||||
Returns:
|
||||
The device of the first tensor found, or None if no tensors exist.
|
||||
"""
|
||||
# Check observation tensors first (most likely to exist)
|
||||
observation = transition.get(TransitionKey.OBSERVATION)
|
||||
if observation:
|
||||
for value in observation.values():
|
||||
if isinstance(value, torch.Tensor):
|
||||
return value.device
|
||||
|
||||
# Check action tensor
|
||||
action = transition.get(TransitionKey.ACTION)
|
||||
if isinstance(action, torch.Tensor):
|
||||
return action.device
|
||||
|
||||
# Check other tensor fields
|
||||
for key in [TransitionKey.REWARD, TransitionKey.DONE, TransitionKey.TRUNCATED]:
|
||||
value = transition.get(key)
|
||||
if isinstance(value, torch.Tensor):
|
||||
return value.device
|
||||
|
||||
# Check complementary data for tensors
|
||||
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA)
|
||||
if complementary_data:
|
||||
for value in complementary_data.values():
|
||||
if isinstance(value, torch.Tensor):
|
||||
return value.device
|
||||
|
||||
return None # No tensors found, keep on CPU
|
||||
|
||||
def _tokenize_text(self, text: str | list[str]) -> dict[str, torch.Tensor]:
|
||||
"""Tokenize text using the configured tokenizer.
|
||||
|
||||
|
||||
@@ -45,9 +45,11 @@ from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
|
||||
import draccus
|
||||
|
||||
from lerobot.configs import parser
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset
|
||||
from lerobot.processor import RobotProcessor
|
||||
from lerobot.processor.converters import to_output_robot_action, to_transition_teleop_action
|
||||
from lerobot.processor.pipeline import IdentityProcessor
|
||||
from lerobot.robots import ( # noqa: F401
|
||||
Robot,
|
||||
RobotConfig,
|
||||
@@ -83,13 +85,25 @@ class ReplayConfig:
|
||||
dataset: DatasetReplayConfig
|
||||
# Use vocal synthesis to read events.
|
||||
play_sounds: bool = True
|
||||
# Optional processor for actions before sending to robot
|
||||
robot_action_processor: RobotProcessor | None = None
|
||||
|
||||
|
||||
@draccus.wrap()
|
||||
@parser.wrap()
|
||||
def replay(cfg: ReplayConfig):
|
||||
init_logging()
|
||||
logging.info(pformat(asdict(cfg)))
|
||||
|
||||
# Initialize robot action processor with default if not provided
|
||||
robot_action_processor = cfg.robot_action_processor or RobotProcessor(
|
||||
steps=[IdentityProcessor()],
|
||||
to_transition=lambda tr: tr,
|
||||
to_output=to_output_robot_action, # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
# Reset processor
|
||||
robot_action_processor.reset()
|
||||
|
||||
robot = make_robot_from_config(cfg.robot)
|
||||
dataset = LeRobotDataset(cfg.dataset.repo_id, root=cfg.dataset.root, episodes=[cfg.dataset.episode])
|
||||
actions = dataset.hf_dataset.select_columns("action")
|
||||
@@ -104,7 +118,12 @@ def replay(cfg: ReplayConfig):
|
||||
for i, name in enumerate(dataset.features["action"]["names"]):
|
||||
action[name] = action_array[i]
|
||||
|
||||
robot.send_action(action)
|
||||
# Process action through robot action processor
|
||||
# Note: We need to convert the action dict to a transition format first
|
||||
action_transition = to_transition_teleop_action(action)
|
||||
processed_action = robot_action_processor(action_transition)
|
||||
|
||||
robot.send_action(processed_action) # type: ignore[arg-type]
|
||||
|
||||
dt_s = time.perf_counter() - start_episode_t
|
||||
busy_wait(1 / dataset.fps - dt_s)
|
||||
|
||||
@@ -53,6 +53,9 @@ class EEReferenceAndDelta:
|
||||
kinematics: RobotKinematics
|
||||
end_effector_step_sizes: dict
|
||||
motor_names: list[str]
|
||||
use_latched_reference: bool = (
|
||||
True # If True, latch reference on enable; if False, always use current pose
|
||||
)
|
||||
|
||||
reference_ee_pose: np.ndarray | None = field(default=None, init=False, repr=False)
|
||||
_prev_enabled: bool = field(default=False, init=False, repr=False)
|
||||
@@ -69,7 +72,10 @@ class EEReferenceAndDelta:
|
||||
"raw_joint_positions is not in complementary data and is required for EEReferenceAndDelta"
|
||||
)
|
||||
|
||||
q = np.array([float(raw[n]) for n in self.motor_names], dtype=float)
|
||||
if "reference_joint_positions" in comp:
|
||||
q = comp["reference_joint_positions"]
|
||||
else:
|
||||
q = np.array([float(raw[n]) for n in self.motor_names], dtype=float)
|
||||
|
||||
# Current pose from FK on measured joints
|
||||
t_curr = self.kinematics.forward_kinematics(q)
|
||||
@@ -85,11 +91,12 @@ class EEReferenceAndDelta:
|
||||
desired = None
|
||||
|
||||
if enabled:
|
||||
# Latch a reference at the rising edge; also be defensive if None
|
||||
if not self._prev_enabled or self.reference_ee_pose is None:
|
||||
self.reference_ee_pose = t_curr.copy()
|
||||
|
||||
ref = self.reference_ee_pose if self.reference_ee_pose is not None else t_curr
|
||||
ref = t_curr
|
||||
if self.use_latched_reference:
|
||||
# Latched reference mode: latch reference at the rising edge
|
||||
if not self._prev_enabled or self.reference_ee_pose is None:
|
||||
self.reference_ee_pose = t_curr.copy()
|
||||
ref = self.reference_ee_pose if self.reference_ee_pose is not None else t_curr
|
||||
|
||||
delta_p = np.array(
|
||||
[
|
||||
@@ -100,7 +107,6 @@ class EEReferenceAndDelta:
|
||||
dtype=float,
|
||||
)
|
||||
r_abs = Rotation.from_rotvec([wx, wy, wz]).as_matrix()
|
||||
|
||||
desired = np.eye(4, dtype=float)
|
||||
desired[:3, :3] = ref[:3, :3] @ r_abs
|
||||
desired[:3, 3] = ref[:3, 3] + delta_p
|
||||
@@ -134,6 +140,11 @@ class EEReferenceAndDelta:
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
return features
|
||||
|
||||
def reset(self):
|
||||
self._prev_enabled = False
|
||||
self.reference_ee_pose = None
|
||||
self._command_when_disabled = None
|
||||
|
||||
|
||||
@ProcessorStepRegistry.register("ee_bounds_and_safety")
|
||||
@dataclass
|
||||
@@ -156,6 +167,7 @@ class EEBoundsAndSafety(ActionProcessor):
|
||||
max_ee_step_m: float = 0.05
|
||||
max_ee_twist_step_rad: float = 0.20
|
||||
_last_pos: np.ndarray | None = field(default=None, init=False, repr=False)
|
||||
_last_twist: np.ndarray | None = field(default=None, init=False, repr=False)
|
||||
|
||||
def action(self, act: dict | None) -> dict:
|
||||
x = act.pop("action.ee.x", None)
|
||||
@@ -199,6 +211,7 @@ class EEBoundsAndSafety(ActionProcessor):
|
||||
|
||||
def reset(self):
|
||||
self._last_pos = None
|
||||
self._last_twist = None
|
||||
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
# Because this is last step we specify the dataset features of this step that we want to be stored in the dataset
|
||||
@@ -292,6 +305,8 @@ class InverseKinematicsEEToJoints:
|
||||
else:
|
||||
new_act[f"action.{name}.pos"] = float(q_target[i])
|
||||
transition[TransitionKey.ACTION] = new_act
|
||||
if not self.initial_guess_current_joints:
|
||||
transition[TransitionKey.COMPLEMENTARY_DATA]["reference_joint_positions"] = q_target
|
||||
return transition
|
||||
|
||||
def transform_features(self, features: dict[str, PolicyFeature]) -> dict[str, PolicyFeature]:
|
||||
@@ -332,6 +347,7 @@ class GripperVelocityToJoint:
|
||||
speed_factor: float = 20.0
|
||||
clip_min: float = 0.0
|
||||
clip_max: float = 100.0
|
||||
discrete_gripper: bool = False
|
||||
|
||||
def __call__(self, transition: EnvTransition) -> EnvTransition:
|
||||
obs = transition.get(TransitionKey.OBSERVATION) or {}
|
||||
@@ -347,6 +363,15 @@ class GripperVelocityToJoint:
|
||||
transition[TransitionKey.ACTION] = new_act
|
||||
return transition
|
||||
|
||||
if self.discrete_gripper:
|
||||
# Discrete gripper actions are in [0, 1, 2]
|
||||
# 0: open, 1: close, 2: stay
|
||||
# We need to shift them to [-1, 0, 1] and then scale them to clip_max
|
||||
gripper_action = act.get("action.gripper", 1.0)
|
||||
gripper_action = gripper_action - 1.0
|
||||
gripper_action *= self.clip_max
|
||||
act["action.gripper"] = gripper_action
|
||||
|
||||
# Get current gripper position from complementary data
|
||||
raw = comp.get("raw_joint_positions") or {}
|
||||
curr_pos = float(raw.get("gripper"))
|
||||
|
||||
@@ -62,9 +62,16 @@ from lerobot.configs import parser
|
||||
from lerobot.configs.train import TrainRLServerPipelineConfig
|
||||
from lerobot.policies.factory import make_policy
|
||||
from lerobot.policies.sac.modeling_sac import SACPolicy
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
from lerobot.robots import so100_follower # noqa: F401
|
||||
from lerobot.scripts.rl.gym_manipulator import make_robot_env
|
||||
from lerobot.scripts.rl.gym_manipulator import (
|
||||
create_transition,
|
||||
make_processors,
|
||||
make_robot_env,
|
||||
step_env_and_process_transition,
|
||||
)
|
||||
from lerobot.teleoperators import gamepad, so101_leader # noqa: F401
|
||||
from lerobot.teleoperators.utils import TeleopEvents
|
||||
from lerobot.transport import services_pb2, services_pb2_grpc
|
||||
from lerobot.transport.utils import (
|
||||
bytes_to_state_dict,
|
||||
@@ -91,7 +98,6 @@ from lerobot.utils.utils import (
|
||||
|
||||
ACTOR_SHUTDOWN_TIMEOUT = 30
|
||||
|
||||
|
||||
#################################################
|
||||
# Main entry point #
|
||||
#################################################
|
||||
@@ -236,7 +242,8 @@ def act_with_policy(
|
||||
|
||||
logging.info("make_env online")
|
||||
|
||||
online_env = make_robot_env(cfg=cfg.env)
|
||||
online_env, teleop_device = make_robot_env(cfg=cfg.env)
|
||||
env_processor, action_processor = make_processors(online_env, teleop_device, cfg.env, cfg.policy.device)
|
||||
|
||||
set_seed(cfg.seed)
|
||||
device = get_safe_torch_device(cfg.policy.device, log=True)
|
||||
@@ -257,6 +264,12 @@ def act_with_policy(
|
||||
assert isinstance(policy, nn.Module)
|
||||
|
||||
obs, info = online_env.reset()
|
||||
env_processor.reset()
|
||||
action_processor.reset()
|
||||
|
||||
# Process initial observation
|
||||
transition = create_transition(observation=obs, info=info)
|
||||
transition = env_processor(transition)
|
||||
|
||||
# NOTE: For the moment we will solely handle the case of a single environment
|
||||
sum_reward_episode = 0
|
||||
@@ -274,45 +287,71 @@ def act_with_policy(
|
||||
logging.info("[ACTOR] Shutting down act_with_policy")
|
||||
return
|
||||
|
||||
if interaction_step >= cfg.policy.online_step_before_learning:
|
||||
# Time policy inference and check if it meets FPS requirement
|
||||
with policy_timer:
|
||||
action = policy.select_action(batch=obs)
|
||||
policy_fps = policy_timer.fps_last
|
||||
observation = {
|
||||
k: v for k, v in transition[TransitionKey.OBSERVATION].items() if k in cfg.policy.input_features
|
||||
}
|
||||
|
||||
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)
|
||||
# Time policy inference and check if it meets FPS requirement
|
||||
with policy_timer:
|
||||
# Extract observation from transition for policy
|
||||
action = policy.select_action(batch=observation)
|
||||
policy_fps = policy_timer.fps_last
|
||||
|
||||
else:
|
||||
action = online_env.action_space.sample()
|
||||
log_policy_frequency_issue(policy_fps=policy_fps, cfg=cfg, interaction_step=interaction_step)
|
||||
|
||||
next_obs, reward, done, truncated, info = online_env.step(action)
|
||||
# Use the new step function
|
||||
new_transition = step_env_and_process_transition(
|
||||
env=online_env,
|
||||
transition=transition,
|
||||
action=action,
|
||||
env_processor=env_processor,
|
||||
action_processor=action_processor,
|
||||
)
|
||||
|
||||
# Extract values from processed transition
|
||||
next_observation = {
|
||||
k: v
|
||||
for k, v in new_transition[TransitionKey.OBSERVATION].items()
|
||||
if k in cfg.policy.input_features
|
||||
}
|
||||
|
||||
# Teleop action is the action that was executed in the environment
|
||||
# It is either the action from the teleop device or the action from the policy
|
||||
executed_action = new_transition[TransitionKey.COMPLEMENTARY_DATA]["teleop_action"]
|
||||
|
||||
reward = new_transition[TransitionKey.REWARD]
|
||||
done = new_transition.get(TransitionKey.DONE, False)
|
||||
truncated = new_transition.get(TransitionKey.TRUNCATED, False)
|
||||
|
||||
sum_reward_episode += float(reward)
|
||||
# Increment total steps counter for intervention rate
|
||||
episode_total_steps += 1
|
||||
|
||||
# NOTE: We override the action if the intervention is True, because the action applied is the intervention action
|
||||
if "is_intervention" in info and info["is_intervention"]:
|
||||
# NOTE: The action space for demonstration before hand is with the full action space
|
||||
# but sometimes for example we want to deactivate the gripper
|
||||
action = info["action_intervention"]
|
||||
# Check for intervention from transition info
|
||||
intervention_info = new_transition[TransitionKey.INFO]
|
||||
if intervention_info.get(TeleopEvents.IS_INTERVENTION, False):
|
||||
episode_intervention = True
|
||||
# Increment intervention steps counter
|
||||
episode_intervention_steps += 1
|
||||
|
||||
complementary_info = {
|
||||
"discrete_penalty": torch.tensor(
|
||||
[new_transition[TransitionKey.COMPLEMENTARY_DATA].get("discrete_penalty", 0.0)]
|
||||
),
|
||||
}
|
||||
# Create transition for learner (convert to old format)
|
||||
list_transition_to_send_to_learner.append(
|
||||
Transition(
|
||||
state=obs,
|
||||
action=action,
|
||||
state=observation,
|
||||
action=executed_action,
|
||||
reward=reward,
|
||||
next_state=next_obs,
|
||||
next_state=next_observation,
|
||||
done=done,
|
||||
truncated=truncated, # TODO: (azouitine) Handle truncation properly
|
||||
complementary_info=info,
|
||||
truncated=truncated,
|
||||
complementary_info=complementary_info,
|
||||
)
|
||||
)
|
||||
# assign obs to the next obs and continue the rollout
|
||||
obs = next_obs
|
||||
|
||||
# Update transition for next iteration
|
||||
transition = new_transition
|
||||
|
||||
if done or truncated:
|
||||
logging.info(f"[ACTOR] Global step {interaction_step}: Episode reward: {sum_reward_episode}")
|
||||
@@ -347,12 +386,20 @@ def act_with_policy(
|
||||
)
|
||||
)
|
||||
|
||||
# Reset intervention counters
|
||||
# Reset intervention counters and environment
|
||||
sum_reward_episode = 0.0
|
||||
episode_intervention = False
|
||||
episode_intervention_steps = 0
|
||||
episode_total_steps = 0
|
||||
|
||||
# Reset environment and processors
|
||||
obs, info = online_env.reset()
|
||||
env_processor.reset()
|
||||
action_processor.reset()
|
||||
|
||||
# Process initial observation
|
||||
transition = create_transition(observation=obs, info=info)
|
||||
transition = env_processor(transition)
|
||||
|
||||
if cfg.env.fps is not None:
|
||||
dt_time = time.perf_counter() - start_time
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -75,6 +75,7 @@ from lerobot.policies.sac.modeling_sac import SACPolicy
|
||||
from lerobot.robots import so100_follower # noqa: F401
|
||||
from lerobot.scripts.rl import learner_service
|
||||
from lerobot.teleoperators import gamepad, so101_leader # noqa: F401
|
||||
from lerobot.teleoperators.utils import TeleopEvents
|
||||
from lerobot.transport import services_pb2_grpc
|
||||
from lerobot.transport.utils import (
|
||||
MAX_MESSAGE_SIZE,
|
||||
@@ -1174,7 +1175,7 @@ def process_transitions(
|
||||
|
||||
# Add to offline buffer if it's an intervention
|
||||
if dataset_repo_id is not None and transition.get("complementary_info", {}).get(
|
||||
"is_intervention"
|
||||
TeleopEvents.IS_INTERVENTION
|
||||
):
|
||||
offline_replay_buffer.add(**transition)
|
||||
|
||||
|
||||
@@ -209,10 +209,6 @@ def train(cfg: TrainPipelineConfig):
|
||||
batch = preprocessor(batch)
|
||||
train_tracker.dataloading_s = time.perf_counter() - start_time
|
||||
|
||||
for key in batch:
|
||||
if isinstance(batch[key], torch.Tensor):
|
||||
batch[key] = batch[key].to(device, non_blocking=device.type == "cuda")
|
||||
|
||||
train_tracker, output_dict = update_policy(
|
||||
train_tracker,
|
||||
policy,
|
||||
|
||||
@@ -56,11 +56,18 @@ import time
|
||||
from dataclasses import asdict, dataclass
|
||||
from pprint import pformat
|
||||
|
||||
import draccus
|
||||
import rerun as rr
|
||||
|
||||
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig # noqa: F401
|
||||
from lerobot.cameras.realsense.configuration_realsense import RealSenseCameraConfig # noqa: F401
|
||||
from lerobot.configs import parser
|
||||
from lerobot.processor import RobotProcessor
|
||||
from lerobot.processor.converters import (
|
||||
to_output_robot_action,
|
||||
to_transition_robot_observation,
|
||||
to_transition_teleop_action,
|
||||
)
|
||||
from lerobot.processor.pipeline import IdentityProcessor
|
||||
from lerobot.robots import ( # noqa: F401
|
||||
Robot,
|
||||
RobotConfig,
|
||||
@@ -97,21 +104,67 @@ class TeleoperateConfig:
|
||||
teleop_time_s: float | None = None
|
||||
# Display all cameras on screen
|
||||
display_data: bool = False
|
||||
# Optional processors for data transformation
|
||||
teleop_action_processor: RobotProcessor | None = None # runs after teleop
|
||||
robot_action_processor: RobotProcessor | None = None # runs before robot
|
||||
robot_observation_processor: RobotProcessor | None = None # runs after robot
|
||||
|
||||
|
||||
def teleop_loop(
|
||||
teleop: Teleoperator, robot: Robot, fps: int, display_data: bool = False, duration: float | None = None
|
||||
teleop: Teleoperator,
|
||||
robot: Robot,
|
||||
fps: int,
|
||||
display_data: bool = False,
|
||||
duration: float | None = None,
|
||||
teleop_action_processor: RobotProcessor | None = None,
|
||||
robot_action_processor: RobotProcessor | None = None,
|
||||
robot_observation_processor: RobotProcessor | None = None,
|
||||
):
|
||||
# Initialize processors with defaults if not provided
|
||||
teleop_action_processor = teleop_action_processor or RobotProcessor(
|
||||
steps=[IdentityProcessor()], to_transition=to_transition_teleop_action, to_output=lambda tr: tr
|
||||
)
|
||||
robot_action_processor = robot_action_processor or RobotProcessor(
|
||||
steps=[IdentityProcessor()],
|
||||
to_transition=lambda tr: tr,
|
||||
to_output=to_output_robot_action, # type: ignore[arg-type]
|
||||
)
|
||||
robot_observation_processor = robot_observation_processor or RobotProcessor(
|
||||
steps=[IdentityProcessor()], to_transition=to_transition_robot_observation, to_output=lambda tr: tr
|
||||
)
|
||||
|
||||
# Reset processors
|
||||
teleop_action_processor.reset()
|
||||
robot_action_processor.reset()
|
||||
robot_observation_processor.reset()
|
||||
|
||||
display_len = max(len(key) for key in robot.action_features)
|
||||
start = time.perf_counter()
|
||||
|
||||
while True:
|
||||
loop_start = time.perf_counter()
|
||||
action = teleop.get_action()
|
||||
if display_data:
|
||||
observation = robot.get_observation()
|
||||
log_rerun_data(observation=observation, action=action)
|
||||
|
||||
robot.send_action(action)
|
||||
# Get robot observation
|
||||
obs = robot.get_observation()
|
||||
|
||||
# Process robot observation through pipeline
|
||||
obs_transition = robot_observation_processor(obs)
|
||||
|
||||
# Get teleop action
|
||||
raw_action = teleop.get_action()
|
||||
|
||||
# Process teleop action through pipeline
|
||||
teleop_transition = teleop_action_processor(raw_action)
|
||||
|
||||
# Process action for robot through pipeline
|
||||
robot_action_to_send = robot_action_processor(teleop_transition)
|
||||
|
||||
# Send processed action to robot (robot_action_processor.to_output should return dict[str, Any])
|
||||
robot.send_action(robot_action_to_send) # type: ignore[arg-type]
|
||||
|
||||
if display_data:
|
||||
log_rerun_data([obs_transition, teleop_transition])
|
||||
|
||||
dt_s = time.perf_counter() - loop_start
|
||||
busy_wait(1 / fps - dt_s)
|
||||
|
||||
@@ -119,17 +172,18 @@ def teleop_loop(
|
||||
|
||||
print("\n" + "-" * (display_len + 10))
|
||||
print(f"{'NAME':<{display_len}} | {'NORM':>7}")
|
||||
for motor, value in action.items():
|
||||
# Display the final robot action that was sent
|
||||
for motor, value in robot_action_to_send.items():
|
||||
print(f"{motor:<{display_len}} | {value:>7.2f}")
|
||||
print(f"\ntime: {loop_s * 1e3:.2f}ms ({1 / loop_s:.0f} Hz)")
|
||||
|
||||
if duration is not None and time.perf_counter() - start >= duration:
|
||||
return
|
||||
|
||||
move_cursor_up(len(action) + 5)
|
||||
move_cursor_up(len(robot_action_to_send) + 5)
|
||||
|
||||
|
||||
@draccus.wrap()
|
||||
@parser.wrap()
|
||||
def teleoperate(cfg: TeleoperateConfig):
|
||||
init_logging()
|
||||
logging.info(pformat(asdict(cfg)))
|
||||
@@ -143,7 +197,16 @@ def teleoperate(cfg: TeleoperateConfig):
|
||||
robot.connect()
|
||||
|
||||
try:
|
||||
teleop_loop(teleop, robot, cfg.fps, display_data=cfg.display_data, duration=cfg.teleop_time_s)
|
||||
teleop_loop(
|
||||
teleop=teleop,
|
||||
robot=robot,
|
||||
fps=cfg.fps,
|
||||
display_data=cfg.display_data,
|
||||
duration=cfg.teleop_time_s,
|
||||
teleop_action_processor=cfg.teleop_action_processor,
|
||||
robot_action_processor=cfg.robot_action_processor,
|
||||
robot_observation_processor=cfg.robot_observation_processor,
|
||||
)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
|
||||
@@ -16,4 +16,4 @@
|
||||
|
||||
from .config import TeleoperatorConfig
|
||||
from .teleoperator import Teleoperator
|
||||
from .utils import make_teleoperator_from_config
|
||||
from .utils import TeleopEvents, make_teleoperator_from_config
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
import logging
|
||||
|
||||
from ..utils import TeleopEvents
|
||||
|
||||
|
||||
class InputController:
|
||||
"""Base class for input controllers that generate motion deltas."""
|
||||
@@ -134,10 +136,10 @@ class KeyboardController(InputController):
|
||||
return False
|
||||
elif key == keyboard.Key.enter:
|
||||
self.key_states["success"] = True
|
||||
self.episode_end_status = "success"
|
||||
self.episode_end_status = TeleopEvents.SUCCESS
|
||||
elif key == keyboard.Key.backspace:
|
||||
self.key_states["failure"] = True
|
||||
self.episode_end_status = "failure"
|
||||
self.episode_end_status = TeleopEvents.FAILURE
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
@@ -255,13 +257,13 @@ class GamepadController(InputController):
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.JOYBUTTONDOWN:
|
||||
if event.button == 3:
|
||||
self.episode_end_status = "success"
|
||||
self.episode_end_status = TeleopEvents.SUCCESS
|
||||
# A button (1) for failure
|
||||
elif event.button == 1:
|
||||
self.episode_end_status = "failure"
|
||||
self.episode_end_status = TeleopEvents.FAILURE
|
||||
# X button (0) for rerecord
|
||||
elif event.button == 0:
|
||||
self.episode_end_status = "rerecord_episode"
|
||||
self.episode_end_status = TeleopEvents.RERECORD_EPISODE
|
||||
|
||||
# RB button (6) for closing gripper
|
||||
elif event.button == 6:
|
||||
@@ -451,11 +453,11 @@ class GamepadControllerHID(InputController):
|
||||
# Check if X/Square button (bit 5) is pressed for failure
|
||||
# Check if A/Cross button (bit 4) is pressed for rerecording
|
||||
if buttons & 1 << 7:
|
||||
self.episode_end_status = "success"
|
||||
self.episode_end_status = TeleopEvents.SUCCESS
|
||||
elif buttons & 1 << 5:
|
||||
self.episode_end_status = "failure"
|
||||
self.episode_end_status = TeleopEvents.FAILURE
|
||||
elif buttons & 1 << 4:
|
||||
self.episode_end_status = "rerecord_episode"
|
||||
self.episode_end_status = TeleopEvents.RERECORD_EPISODE
|
||||
else:
|
||||
self.episode_end_status = None
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ from typing import Any
|
||||
import numpy as np
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
from ..utils import TeleopEvents
|
||||
from .configuration_gamepad import GamepadTeleopConfig
|
||||
|
||||
|
||||
@@ -93,9 +94,9 @@ class GamepadTeleop(Teleoperator):
|
||||
gamepad_action = np.array([delta_x, delta_y, delta_z], dtype=np.float32)
|
||||
|
||||
action_dict = {
|
||||
"delta_x": gamepad_action[0],
|
||||
"delta_y": gamepad_action[1],
|
||||
"delta_z": gamepad_action[2],
|
||||
"action.delta_x": gamepad_action[0],
|
||||
"action.delta_y": gamepad_action[1],
|
||||
"action.delta_z": gamepad_action[2],
|
||||
}
|
||||
|
||||
# Default gripper action is to stay
|
||||
@@ -107,6 +108,48 @@ class GamepadTeleop(Teleoperator):
|
||||
|
||||
return action_dict
|
||||
|
||||
def get_teleop_events(self) -> dict[str, Any]:
|
||||
"""
|
||||
Get extra control events from the gamepad such as intervention status,
|
||||
episode termination, success indicators, etc.
|
||||
|
||||
Returns:
|
||||
Dictionary containing:
|
||||
- is_intervention: bool - Whether human is currently intervening
|
||||
- terminate_episode: bool - Whether to terminate the current episode
|
||||
- success: bool - Whether the episode was successful
|
||||
- rerecord_episode: bool - Whether to rerecord the episode
|
||||
"""
|
||||
if self.gamepad is None:
|
||||
return {
|
||||
TeleopEvents.IS_INTERVENTION: False,
|
||||
TeleopEvents.TERMINATE_EPISODE: False,
|
||||
TeleopEvents.SUCCESS: False,
|
||||
TeleopEvents.RERECORD_EPISODE: False,
|
||||
}
|
||||
|
||||
# Update gamepad state to get fresh inputs
|
||||
self.gamepad.update()
|
||||
|
||||
# Check if intervention is active
|
||||
is_intervention = self.gamepad.should_intervene()
|
||||
|
||||
# Get episode end status
|
||||
episode_end_status = self.gamepad.get_episode_end_status()
|
||||
terminate_episode = episode_end_status in [
|
||||
TeleopEvents.RERECORD_EPISODE,
|
||||
TeleopEvents.FAILURE,
|
||||
]
|
||||
success = episode_end_status == TeleopEvents.SUCCESS
|
||||
rerecord_episode = episode_end_status == TeleopEvents.RERECORD_EPISODE
|
||||
|
||||
return {
|
||||
TeleopEvents.IS_INTERVENTION: is_intervention,
|
||||
TeleopEvents.TERMINATE_EPISODE: terminate_episode,
|
||||
TeleopEvents.SUCCESS: success,
|
||||
TeleopEvents.RERECORD_EPISODE: rerecord_episode,
|
||||
}
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Disconnect from the gamepad."""
|
||||
if self.gamepad is not None:
|
||||
|
||||
@@ -24,6 +24,7 @@ from typing import Any
|
||||
from lerobot.errors import DeviceAlreadyConnectedError, DeviceNotConnectedError
|
||||
|
||||
from ..teleoperator import Teleoperator
|
||||
from ..utils import TeleopEvents
|
||||
from .configuration_keyboard import KeyboardEndEffectorTeleopConfig, KeyboardTeleopConfig
|
||||
|
||||
PYNPUT_AVAILABLE = True
|
||||
@@ -167,13 +168,13 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
|
||||
return {
|
||||
"dtype": "float32",
|
||||
"shape": (4,),
|
||||
"names": {"delta_x": 0, "delta_y": 1, "delta_z": 2, "gripper": 3},
|
||||
"names": {"action.delta_x": 0, "action.delta_y": 1, "action.delta_z": 2, "action.gripper": 3},
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"dtype": "float32",
|
||||
"shape": (3,),
|
||||
"names": {"delta_x": 0, "delta_y": 1, "delta_z": 2},
|
||||
"names": {"action.delta_x": 0, "action.delta_y": 1, "action.delta_z": 2},
|
||||
}
|
||||
|
||||
def _on_press(self, key):
|
||||
@@ -226,12 +227,75 @@ class KeyboardEndEffectorTeleop(KeyboardTeleop):
|
||||
self.current_pressed.clear()
|
||||
|
||||
action_dict = {
|
||||
"delta_x": delta_x,
|
||||
"delta_y": delta_y,
|
||||
"delta_z": delta_z,
|
||||
"action.delta_x": delta_x,
|
||||
"action.delta_y": delta_y,
|
||||
"action.delta_z": delta_z,
|
||||
}
|
||||
|
||||
if self.config.use_gripper:
|
||||
action_dict["gripper"] = gripper_action
|
||||
|
||||
return action_dict
|
||||
|
||||
def get_teleop_events(self) -> dict[str, Any]:
|
||||
"""
|
||||
Get extra control events from the keyboard such as intervention status,
|
||||
episode termination, success indicators, etc.
|
||||
|
||||
Keyboard mappings:
|
||||
- Any movement keys pressed = intervention active
|
||||
- 's' key = success (terminate episode successfully)
|
||||
- 'r' key = rerecord episode (terminate and rerecord)
|
||||
- 'q' key = quit episode (terminate without success)
|
||||
|
||||
Returns:
|
||||
Dictionary containing:
|
||||
- is_intervention: bool - Whether human is currently intervening
|
||||
- terminate_episode: bool - Whether to terminate the current episode
|
||||
- success: bool - Whether the episode was successful
|
||||
- rerecord_episode: bool - Whether to rerecord the episode
|
||||
"""
|
||||
if not self.is_connected:
|
||||
return {
|
||||
TeleopEvents.IS_INTERVENTION: False,
|
||||
TeleopEvents.TERMINATE_EPISODE: False,
|
||||
TeleopEvents.SUCCESS: False,
|
||||
TeleopEvents.RERECORD_EPISODE: False,
|
||||
}
|
||||
|
||||
# Check if any movement keys are currently pressed (indicates intervention)
|
||||
movement_keys = [
|
||||
keyboard.Key.up,
|
||||
keyboard.Key.down,
|
||||
keyboard.Key.left,
|
||||
keyboard.Key.right,
|
||||
keyboard.Key.shift,
|
||||
keyboard.Key.shift_r,
|
||||
keyboard.Key.ctrl_r,
|
||||
keyboard.Key.ctrl_l,
|
||||
]
|
||||
is_intervention = any(self.current_pressed.get(key, False) for key in movement_keys)
|
||||
|
||||
# Check for episode control commands from misc_keys_queue
|
||||
terminate_episode = False
|
||||
success = False
|
||||
rerecord_episode = False
|
||||
|
||||
# Process any pending misc keys
|
||||
while not self.misc_keys_queue.empty():
|
||||
key = self.misc_keys_queue.get_nowait()
|
||||
if key == "s":
|
||||
success = True
|
||||
elif key == "r":
|
||||
terminate_episode = True
|
||||
rerecord_episode = True
|
||||
elif key == "q":
|
||||
terminate_episode = True
|
||||
success = False
|
||||
|
||||
return {
|
||||
TeleopEvents.IS_INTERVENTION: is_intervention,
|
||||
TeleopEvents.TERMINATE_EPISODE: terminate_episode,
|
||||
TeleopEvents.SUCCESS: success,
|
||||
TeleopEvents.RERECORD_EPISODE: rerecord_episode,
|
||||
}
|
||||
|
||||
@@ -16,3 +16,4 @@
|
||||
|
||||
from .config_so101_leader import SO101LeaderConfig
|
||||
from .so101_leader import SO101Leader
|
||||
from .so101_leader_follower import SO101LeaderFollower
|
||||
|
||||
@@ -26,3 +26,6 @@ class SO101LeaderConfig(TeleoperatorConfig):
|
||||
port: str
|
||||
|
||||
use_degrees: bool = False
|
||||
|
||||
# Enable leader-follower mode where leader can both lead and follow
|
||||
leader_follower_mode: bool = False
|
||||
|
||||
208
src/lerobot/teleoperators/so101_leader/so101_leader_follower.py
Normal file
208
src/lerobot/teleoperators/so101_leader/so101_leader_follower.py
Normal file
@@ -0,0 +1,208 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import time
|
||||
from collections import deque
|
||||
from threading import Event, Thread
|
||||
|
||||
import numpy as np
|
||||
from pynput import keyboard
|
||||
|
||||
from lerobot.teleoperators.so101_leader.so101_leader import SO101Leader
|
||||
from lerobot.teleoperators.utils import TeleopEvents
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SO101LeaderFollower(SO101Leader):
|
||||
"""
|
||||
Extended SO101 Leader that can both lead (human control) and follow (mimic follower).
|
||||
|
||||
This class adds leader-follower functionality where:
|
||||
- In follow mode: The leader arm mimics the follower's position (torque enabled)
|
||||
- In lead mode: Human controls the leader (torque disabled) and provides actions
|
||||
"""
|
||||
|
||||
def __init__(self, config):
|
||||
super().__init__(config)
|
||||
|
||||
# Leader-follower state
|
||||
self.is_intervening = False
|
||||
self.leader_torque_enabled = True
|
||||
|
||||
# Tracking error for automatic intervention detection
|
||||
self.leader_tracking_error_queue = deque(maxlen=4)
|
||||
|
||||
# Keyboard event handling
|
||||
self.keyboard_events = {
|
||||
"intervention": False,
|
||||
"success": False,
|
||||
"failure": False,
|
||||
"rerecord": False,
|
||||
}
|
||||
self.keyboard_thread = None
|
||||
self.stop_event = Event()
|
||||
|
||||
# Store last follower position for action computation
|
||||
self.last_follower_pos = None
|
||||
|
||||
def connect(self, calibrate: bool = True) -> None:
|
||||
"""Connect and configure for leader-follower mode."""
|
||||
super().connect(calibrate)
|
||||
|
||||
# Configure for leader-follower mode with lower gains
|
||||
# Lower gains allow manual intervention without injury risk
|
||||
self.bus.sync_write("Torque_Enable", 1)
|
||||
for motor in self.bus.motors:
|
||||
self.bus.write("P_Coefficient", motor, 16)
|
||||
self.bus.write("I_Coefficient", motor, 0)
|
||||
self.bus.write("D_Coefficient", motor, 16)
|
||||
|
||||
# Start keyboard listener
|
||||
self._start_keyboard_listener()
|
||||
|
||||
print("- Leader-Follower Mode:")
|
||||
print(" - Press SPACE to toggle intervention (leader control)")
|
||||
print(" - When not intervening, leader follows follower position")
|
||||
print(" - When intervening, follower follows leader in end-effector space")
|
||||
print(" - Press 's' to mark episode as success")
|
||||
print(" - Press ESC to end episode as failure")
|
||||
print(" - Press 'r' to re-record episode")
|
||||
|
||||
def _start_keyboard_listener(self):
|
||||
"""Start keyboard listener thread for intervention control."""
|
||||
|
||||
def on_press(key):
|
||||
try:
|
||||
if key == keyboard.Key.space:
|
||||
self.keyboard_events["intervention"] = not self.keyboard_events["intervention"]
|
||||
self.is_intervening = self.keyboard_events["intervention"]
|
||||
state = "INTERVENTION MODE" if self.is_intervening else "FOLLOWING MODE"
|
||||
logger.info(f"Toggled to {state}")
|
||||
elif key == keyboard.Key.esc:
|
||||
self.keyboard_events["failure"] = True
|
||||
elif hasattr(key, "char"):
|
||||
if key.char == "s":
|
||||
self.keyboard_events["success"] = True
|
||||
elif key.char == "r":
|
||||
self.keyboard_events["rerecord"] = True
|
||||
except Exception as e:
|
||||
logger.error(f"Error handling key press: {e}")
|
||||
|
||||
def listen():
|
||||
with keyboard.Listener(on_press=on_press) as listener:
|
||||
while not self.stop_event.is_set():
|
||||
time.sleep(0.1)
|
||||
listener.stop()
|
||||
|
||||
self.keyboard_thread = Thread(target=listen, daemon=True)
|
||||
self.keyboard_thread.start()
|
||||
|
||||
def send_action(self, action: dict[str, float]) -> None:
|
||||
"""
|
||||
Send position commands to leader arm (follow mode).
|
||||
|
||||
Args:
|
||||
action: Dictionary of motor positions to command
|
||||
"""
|
||||
# Store follower position for later use
|
||||
self.last_follower_pos = np.array([action.get(f"{motor}.pos", 0) for motor in self.bus.motors])
|
||||
|
||||
if not self.is_intervening:
|
||||
# Follow mode: enable torque and track follower
|
||||
if not self.leader_torque_enabled:
|
||||
self.bus.sync_write("Torque_Enable", 1)
|
||||
self.leader_torque_enabled = True
|
||||
|
||||
# Send follower positions to leader
|
||||
goal_pos = {motor: action[f"{motor}.pos"] for motor in self.bus.motors}
|
||||
self.bus.sync_write("Goal_Position", goal_pos)
|
||||
|
||||
# Track error for automatic intervention detection
|
||||
current_pos = self.bus.sync_read("Present_Position")
|
||||
current_array = np.array([current_pos[motor] for motor in self.bus.motors])
|
||||
error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1])
|
||||
self.leader_tracking_error_queue.append(error)
|
||||
|
||||
def get_action(self) -> dict[str, float]:
|
||||
"""
|
||||
Get action from leader arm.
|
||||
|
||||
In follow mode: Returns neutral/current positions
|
||||
In lead mode: Returns actual leader positions for follower to track
|
||||
"""
|
||||
start = time.perf_counter()
|
||||
|
||||
if self.is_intervening:
|
||||
# Lead mode: disable torque if needed and return leader positions
|
||||
if self.leader_torque_enabled:
|
||||
self.bus.sync_write("Torque_Enable", 0)
|
||||
self.leader_torque_enabled = False
|
||||
|
||||
# Get current leader position
|
||||
action = self.bus.sync_read("Present_Position")
|
||||
action = {f"{motor}.pos": val for motor, val in action.items()}
|
||||
|
||||
# Track error
|
||||
if self.last_follower_pos is not None:
|
||||
current_array = np.array([action[f"{motor}.pos"] for motor in self.bus.motors])
|
||||
error = np.linalg.norm(self.last_follower_pos[:-1] - current_array[:-1])
|
||||
self.leader_tracking_error_queue.append(error)
|
||||
else:
|
||||
# Follow mode: return current/neutral positions
|
||||
action = self.bus.sync_read("Present_Position")
|
||||
action = {f"{motor}.pos": val for motor, val in action.items()}
|
||||
|
||||
dt_ms = (time.perf_counter() - start) * 1e3
|
||||
logger.debug(f"{self} read action: {dt_ms:.1f}ms")
|
||||
return action
|
||||
|
||||
def get_teleop_events(self) -> dict[TeleopEvents, bool]:
|
||||
"""Get current keyboard events."""
|
||||
events = {}
|
||||
|
||||
# Map keyboard events to TeleopEvents
|
||||
if self.keyboard_events["success"]:
|
||||
events[TeleopEvents.SUCCESS] = True
|
||||
self.keyboard_events["success"] = False
|
||||
if self.keyboard_events["failure"]:
|
||||
events[TeleopEvents.FAILURE] = True
|
||||
events[TeleopEvents.TERMINATE_EPISODE] = True
|
||||
self.keyboard_events["failure"] = False
|
||||
if self.keyboard_events["rerecord"]:
|
||||
events[TeleopEvents.RERECORD_EPISODE] = True
|
||||
events[TeleopEvents.TERMINATE_EPISODE] = True
|
||||
self.keyboard_events["rerecord"] = False
|
||||
|
||||
# Always report intervention state
|
||||
events[TeleopEvents.IS_INTERVENTION] = self.is_intervening
|
||||
|
||||
return events
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Disconnect and cleanup."""
|
||||
self.stop_event.set()
|
||||
if self.keyboard_thread:
|
||||
self.keyboard_thread.join(timeout=1.0)
|
||||
super().disconnect()
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset leader-follower state."""
|
||||
self.is_intervening = False
|
||||
self.leader_torque_enabled = True
|
||||
self.leader_tracking_error_queue.clear()
|
||||
self.keyboard_events = {"intervention": False, "success": False, "failure": False, "rerecord": False}
|
||||
@@ -12,10 +12,22 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from enum import Enum
|
||||
|
||||
from .config import TeleoperatorConfig
|
||||
from .teleoperator import Teleoperator
|
||||
|
||||
|
||||
class TeleopEvents(Enum):
|
||||
"""Shared constants for teleoperator events across teleoperators."""
|
||||
|
||||
SUCCESS = "success"
|
||||
FAILURE = "failure"
|
||||
RERECORD_EPISODE = "rerecord_episode"
|
||||
IS_INTERVENTION = "is_intervention"
|
||||
TERMINATE_EPISODE = "terminate_episode"
|
||||
|
||||
|
||||
def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
|
||||
if config.type == "keyboard":
|
||||
from .keyboard import KeyboardTeleop
|
||||
@@ -30,7 +42,10 @@ def make_teleoperator_from_config(config: TeleoperatorConfig) -> Teleoperator:
|
||||
|
||||
return SO100Leader(config)
|
||||
elif config.type == "so101_leader":
|
||||
from .so101_leader import SO101Leader
|
||||
from .so101_leader import SO101Leader, SO101LeaderFollower
|
||||
|
||||
if getattr(config, "leader_follower_mode", False):
|
||||
return SO101LeaderFollower(config)
|
||||
|
||||
return SO101Leader(config)
|
||||
elif config.type == "stretch3":
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -23,7 +23,8 @@ from lerobot.configs.default import DatasetConfig
|
||||
from lerobot.configs.train import TrainPipelineConfig
|
||||
from lerobot.datasets.factory import make_dataset
|
||||
from lerobot.optim.factory import make_optimizer_and_scheduler
|
||||
from lerobot.policies.factory import make_policy, make_policy_config
|
||||
from lerobot.policies.factory import make_policy, make_policy_config, make_processor
|
||||
from lerobot.processor import TransitionKey
|
||||
from lerobot.utils.random_utils import set_seed
|
||||
|
||||
|
||||
@@ -37,7 +38,9 @@ def get_policy_stats(ds_repo_id: str, policy_name: str, policy_kwargs: dict):
|
||||
train_cfg.validate() # Needed for auto-setting some parameters
|
||||
|
||||
dataset = make_dataset(train_cfg)
|
||||
dataset_stats = dataset.meta.stats
|
||||
policy = make_policy(train_cfg.policy, ds_meta=dataset.meta)
|
||||
preprocessor, postprocessor = make_processor(train_cfg.policy, dataset_stats=dataset_stats)
|
||||
policy.train()
|
||||
|
||||
optimizer, _ = make_optimizer_and_scheduler(train_cfg, policy)
|
||||
@@ -49,7 +52,9 @@ def get_policy_stats(ds_repo_id: str, policy_name: str, policy_kwargs: dict):
|
||||
)
|
||||
|
||||
batch = next(iter(dataloader))
|
||||
batch = preprocessor(batch)
|
||||
loss, output_dict = policy.forward(batch)
|
||||
|
||||
if output_dict is not None:
|
||||
output_dict = {k: v for k, v in output_dict.items() if isinstance(v, torch.Tensor)}
|
||||
output_dict["loss"] = loss
|
||||
@@ -96,7 +101,12 @@ def get_policy_stats(ds_repo_id: str, policy_name: str, policy_kwargs: dict):
|
||||
else:
|
||||
actions_queue = train_cfg.policy.n_action_repeats
|
||||
|
||||
actions = {str(i): policy.select_action(obs).contiguous() for i in range(actions_queue)}
|
||||
actions = {}
|
||||
for i in range(actions_queue):
|
||||
unnormalized_action = policy.select_action(obs).contiguous()
|
||||
action_robot = postprocessor({TransitionKey.ACTION: unnormalized_action}).get(TransitionKey.ACTION)
|
||||
actions[str(i)] = action_robot
|
||||
|
||||
return output_dict, grad_stats, param_stats, actions
|
||||
|
||||
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -26,7 +26,7 @@ from safetensors.torch import load_file
|
||||
from lerobot import available_policies
|
||||
from lerobot.configs.default import DatasetConfig
|
||||
from lerobot.configs.train import TrainPipelineConfig
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.configs.types import FeatureType, PolicyFeature
|
||||
from lerobot.constants import ACTION, OBS_STATE
|
||||
from lerobot.datasets.factory import make_dataset
|
||||
from lerobot.datasets.utils import cycle, dataset_to_policy_features
|
||||
@@ -41,7 +41,6 @@ from lerobot.policies.factory import (
|
||||
make_policy_config,
|
||||
make_processor,
|
||||
)
|
||||
from lerobot.policies.normalize import Normalize, Unnormalize
|
||||
from lerobot.policies.pretrained import PreTrainedPolicy
|
||||
from lerobot.utils.random_utils import seeded_context
|
||||
from tests.artifacts.policies.save_policy_to_safetensors import get_policy_stats
|
||||
@@ -266,108 +265,6 @@ def test_save_and_load_pretrained(dummy_dataset_metadata, tmp_path, policy_name:
|
||||
torch.testing.assert_close(list(policy.parameters()), list(loaded_policy.parameters()), rtol=0, atol=0)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("insert_temporal_dim", [False, True])
|
||||
def test_normalize(insert_temporal_dim):
|
||||
"""
|
||||
Test that normalize/unnormalize can run without exceptions when properly set up, and that they raise
|
||||
an exception when the forward pass is called without the stats having been provided.
|
||||
|
||||
TODO(rcadene, alexander-soare): This should also test that the normalization / unnormalization works as
|
||||
expected.
|
||||
"""
|
||||
|
||||
input_features = {
|
||||
"observation.image": PolicyFeature(
|
||||
type=FeatureType.VISUAL,
|
||||
shape=(3, 96, 96),
|
||||
),
|
||||
"observation.state": PolicyFeature(
|
||||
type=FeatureType.STATE,
|
||||
shape=(10,),
|
||||
),
|
||||
}
|
||||
output_features = {
|
||||
"action": PolicyFeature(
|
||||
type=FeatureType.ACTION,
|
||||
shape=(5,),
|
||||
),
|
||||
}
|
||||
|
||||
norm_map = {
|
||||
"VISUAL": NormalizationMode.MEAN_STD,
|
||||
"STATE": NormalizationMode.MIN_MAX,
|
||||
"ACTION": NormalizationMode.MIN_MAX,
|
||||
}
|
||||
|
||||
dataset_stats = {
|
||||
"observation.image": {
|
||||
"mean": torch.randn(3, 1, 1),
|
||||
"std": torch.randn(3, 1, 1),
|
||||
"min": torch.randn(3, 1, 1),
|
||||
"max": torch.randn(3, 1, 1),
|
||||
},
|
||||
"observation.state": {
|
||||
"mean": torch.randn(10),
|
||||
"std": torch.randn(10),
|
||||
"min": torch.randn(10),
|
||||
"max": torch.randn(10),
|
||||
},
|
||||
"action": {
|
||||
"mean": torch.randn(5),
|
||||
"std": torch.randn(5),
|
||||
"min": torch.randn(5),
|
||||
"max": torch.randn(5),
|
||||
},
|
||||
}
|
||||
|
||||
bsize = 2
|
||||
input_batch = {
|
||||
"observation.image": torch.randn(bsize, 3, 96, 96),
|
||||
"observation.state": torch.randn(bsize, 10),
|
||||
}
|
||||
output_batch = {
|
||||
"action": torch.randn(bsize, 5),
|
||||
}
|
||||
|
||||
if insert_temporal_dim:
|
||||
tdim = 4
|
||||
|
||||
for key in input_batch:
|
||||
# [2,3,96,96] -> [2,tdim,3,96,96]
|
||||
input_batch[key] = torch.stack([input_batch[key]] * tdim, dim=1)
|
||||
|
||||
for key in output_batch:
|
||||
output_batch[key] = torch.stack([output_batch[key]] * tdim, dim=1)
|
||||
|
||||
# test without stats
|
||||
normalize = Normalize(input_features, norm_map, stats=None)
|
||||
with pytest.raises(AssertionError):
|
||||
normalize(input_batch)
|
||||
|
||||
# test with stats
|
||||
normalize = Normalize(input_features, norm_map, stats=dataset_stats)
|
||||
normalize(input_batch)
|
||||
|
||||
# test loading pretrained models
|
||||
new_normalize = Normalize(input_features, norm_map, stats=None)
|
||||
new_normalize.load_state_dict(normalize.state_dict())
|
||||
new_normalize(input_batch)
|
||||
|
||||
# test without stats
|
||||
unnormalize = Unnormalize(output_features, norm_map, stats=None)
|
||||
with pytest.raises(AssertionError):
|
||||
unnormalize(output_batch)
|
||||
|
||||
# test with stats
|
||||
unnormalize = Unnormalize(output_features, norm_map, stats=dataset_stats)
|
||||
unnormalize(output_batch)
|
||||
|
||||
# test loading pretrained models
|
||||
new_unnormalize = Unnormalize(output_features, norm_map, stats=None)
|
||||
new_unnormalize.load_state_dict(unnormalize.state_dict())
|
||||
unnormalize(output_batch)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("multikey", [True, False])
|
||||
def test_multikey_construction(multikey: bool):
|
||||
"""
|
||||
@@ -467,6 +364,8 @@ def test_backward_compatibility(ds_repo_id: str, policy_name: str, policy_kwargs
|
||||
NOTE: If the test does not pass, and you don't change the policy, it is likely that the test artifact
|
||||
is out of date. For example, some PyTorch versions have different randomness, see this PR:
|
||||
https://github.com/huggingface/lerobot/pull/1127.
|
||||
NOTE: If the test don't pass and you don't change the policy, and note the dependencies version,
|
||||
and you changed your processor, you might have to update the test artifact.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
314
tests/processor/test_act_processor.py
Normal file
314
tests/processor/test_act_processor.py
Normal file
@@ -0,0 +1,314 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for ACT policy processor."""
|
||||
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.constants import ACTION, OBS_STATE
|
||||
from lerobot.policies.act.configuration_act import ACTConfig
|
||||
from lerobot.policies.act.processor_act import make_act_processor
|
||||
from lerobot.processor import (
|
||||
DeviceProcessor,
|
||||
NormalizerProcessor,
|
||||
RenameProcessor,
|
||||
RobotProcessor,
|
||||
ToBatchProcessor,
|
||||
UnnormalizerProcessor,
|
||||
)
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
|
||||
|
||||
def create_transition(observation=None, action=None, **kwargs):
|
||||
"""Helper function to create a transition dictionary."""
|
||||
transition = {}
|
||||
if observation is not None:
|
||||
transition[TransitionKey.OBSERVATION] = observation
|
||||
if action is not None:
|
||||
transition[TransitionKey.ACTION] = action
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(TransitionKey, key.upper()):
|
||||
transition[getattr(TransitionKey, key.upper())] = value
|
||||
return transition
|
||||
|
||||
|
||||
def create_default_config():
|
||||
"""Create a default ACT configuration for testing."""
|
||||
config = ACTConfig()
|
||||
config.input_features = {
|
||||
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(7,)),
|
||||
}
|
||||
config.output_features = {
|
||||
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(4,)),
|
||||
}
|
||||
config.normalization_mapping = {
|
||||
FeatureType.STATE: NormalizationMode.MEAN_STD,
|
||||
FeatureType.ACTION: NormalizationMode.MEAN_STD,
|
||||
}
|
||||
config.device = "cpu"
|
||||
return config
|
||||
|
||||
|
||||
def create_default_stats():
|
||||
"""Create default dataset statistics for testing."""
|
||||
return {
|
||||
OBS_STATE: {"mean": torch.zeros(7), "std": torch.ones(7)},
|
||||
ACTION: {"mean": torch.zeros(4), "std": torch.ones(4)},
|
||||
}
|
||||
|
||||
|
||||
def test_make_act_processor_basic():
|
||||
"""Test basic creation of ACT processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_act_processor(config, stats)
|
||||
|
||||
# Check processor names
|
||||
assert preprocessor.name == "robot_preprocessor"
|
||||
assert postprocessor.name == "robot_postprocessor"
|
||||
|
||||
# Check steps in preprocessor
|
||||
assert len(preprocessor.steps) == 4
|
||||
assert isinstance(preprocessor.steps[0], RenameProcessor)
|
||||
assert isinstance(preprocessor.steps[1], NormalizerProcessor)
|
||||
assert isinstance(preprocessor.steps[2], ToBatchProcessor)
|
||||
assert isinstance(preprocessor.steps[3], DeviceProcessor)
|
||||
|
||||
# Check steps in postprocessor
|
||||
assert len(postprocessor.steps) == 2
|
||||
assert isinstance(postprocessor.steps[0], DeviceProcessor)
|
||||
assert isinstance(postprocessor.steps[1], UnnormalizerProcessor)
|
||||
|
||||
|
||||
def test_act_processor_normalization():
|
||||
"""Test that ACT processor correctly normalizes and unnormalizes data."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_act_processor(config, stats)
|
||||
|
||||
# Create test data
|
||||
observation = {OBS_STATE: torch.randn(7)}
|
||||
action = torch.randn(4)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is normalized and batched
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 7)
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 4)
|
||||
|
||||
# Process action through postprocessor
|
||||
action_transition = create_transition(action=processed[TransitionKey.ACTION])
|
||||
postprocessed = postprocessor(action_transition)
|
||||
|
||||
# Check that action is unnormalized
|
||||
assert postprocessed[TransitionKey.ACTION].shape == (1, 4)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_act_processor_cuda():
|
||||
"""Test ACT processor with CUDA device."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_act_processor(config, stats)
|
||||
|
||||
# Create CPU data
|
||||
observation = {OBS_STATE: torch.randn(7)}
|
||||
action = torch.randn(4)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is on CUDA
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda"
|
||||
assert processed[TransitionKey.ACTION].device.type == "cuda"
|
||||
|
||||
# Process through postprocessor
|
||||
action_transition = create_transition(action=processed[TransitionKey.ACTION])
|
||||
postprocessed = postprocessor(action_transition)
|
||||
|
||||
# Check that action is back on CPU
|
||||
assert postprocessed[TransitionKey.ACTION].device.type == "cpu"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_act_processor_accelerate_scenario():
|
||||
"""Test ACT processor in simulated Accelerate scenario (data already on GPU)."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_act_processor(config, stats)
|
||||
|
||||
# Simulate Accelerate: data already on GPU
|
||||
device = torch.device("cuda:0")
|
||||
observation = {OBS_STATE: torch.randn(1, 7).to(device)} # Already batched and on GPU
|
||||
action = torch.randn(1, 4).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on same GPU (not moved unnecessarily)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_act_processor_multi_gpu():
|
||||
"""Test ACT processor with multi-GPU setup."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_act_processor(config, stats)
|
||||
|
||||
# Simulate data on different GPU (like in multi-GPU training)
|
||||
device = torch.device("cuda:1")
|
||||
observation = {OBS_STATE: torch.randn(1, 7).to(device)}
|
||||
action = torch.randn(1, 4).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on cuda:1 (not moved to cuda:0)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
def test_act_processor_without_stats():
|
||||
"""Test ACT processor creation without dataset statistics."""
|
||||
config = create_default_config()
|
||||
|
||||
preprocessor, postprocessor = make_act_processor(config, dataset_stats=None)
|
||||
|
||||
# Should still create processors, but normalization won't have stats
|
||||
assert preprocessor is not None
|
||||
assert postprocessor is not None
|
||||
|
||||
# Process should still work (but won't normalize without stats)
|
||||
observation = {OBS_STATE: torch.randn(7)}
|
||||
action = torch.randn(4)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
assert processed is not None
|
||||
|
||||
|
||||
def test_act_processor_save_and_load():
|
||||
"""Test saving and loading ACT processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_act_processor(config, stats)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Save preprocessor
|
||||
preprocessor.save_pretrained(tmpdir)
|
||||
|
||||
# Load preprocessor
|
||||
loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir)
|
||||
|
||||
# Test that loaded processor works
|
||||
observation = {OBS_STATE: torch.randn(7)}
|
||||
action = torch.randn(4)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = loaded_preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 7)
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 4)
|
||||
|
||||
|
||||
def test_act_processor_device_placement_preservation():
|
||||
"""Test that ACT processor preserves device placement correctly."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
# Test with CPU config
|
||||
config.device = "cpu"
|
||||
preprocessor, _ = make_act_processor(config, stats)
|
||||
|
||||
# Process CPU data
|
||||
observation = {OBS_STATE: torch.randn(7)}
|
||||
action = torch.randn(4)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cpu"
|
||||
assert processed[TransitionKey.ACTION].device.type == "cpu"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_act_processor_mixed_precision():
|
||||
"""Test ACT processor with mixed precision (float16)."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Modify the device processor to use float16
|
||||
preprocessor, postprocessor = make_act_processor(config, stats)
|
||||
|
||||
# Replace DeviceProcessor with one that uses float16
|
||||
for i, step in enumerate(preprocessor.steps):
|
||||
if isinstance(step, DeviceProcessor):
|
||||
preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16")
|
||||
|
||||
# Create test data
|
||||
observation = {OBS_STATE: torch.randn(7, dtype=torch.float32)}
|
||||
action = torch.randn(4, dtype=torch.float32)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is converted to float16
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16
|
||||
assert processed[TransitionKey.ACTION].dtype == torch.float16
|
||||
|
||||
|
||||
def test_act_processor_batch_consistency():
|
||||
"""Test that ACT processor handles different batch sizes correctly."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_act_processor(config, stats)
|
||||
|
||||
# Test single sample (unbatched)
|
||||
observation = {OBS_STATE: torch.randn(7)}
|
||||
action = torch.randn(4)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape[0] == 1 # Batched
|
||||
|
||||
# Test already batched data
|
||||
observation_batched = {OBS_STATE: torch.randn(8, 7)} # Batch of 8
|
||||
action_batched = torch.randn(8, 4)
|
||||
transition_batched = create_transition(observation_batched, action_batched)
|
||||
|
||||
processed_batched = preprocessor(transition_batched)
|
||||
assert processed_batched[TransitionKey.OBSERVATION][OBS_STATE].shape[0] == 8
|
||||
assert processed_batched[TransitionKey.ACTION].shape[0] == 8
|
||||
329
tests/processor/test_classifier_processor.py
Normal file
329
tests/processor/test_classifier_processor.py
Normal file
@@ -0,0 +1,329 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for Reward Classifier processor."""
|
||||
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.constants import OBS_IMAGE, OBS_STATE
|
||||
from lerobot.policies.sac.reward_model.configuration_classifier import RewardClassifierConfig
|
||||
from lerobot.policies.sac.reward_model.processor_classifier import make_classifier_processor
|
||||
from lerobot.processor import DeviceProcessor, IdentityProcessor, NormalizerProcessor, RobotProcessor
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
|
||||
|
||||
def create_transition(observation=None, action=None, **kwargs):
|
||||
"""Helper function to create a transition dictionary."""
|
||||
transition = {}
|
||||
if observation is not None:
|
||||
transition[TransitionKey.OBSERVATION] = observation
|
||||
if action is not None:
|
||||
transition[TransitionKey.ACTION] = action
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(TransitionKey, key.upper()):
|
||||
transition[getattr(TransitionKey, key.upper())] = value
|
||||
return transition
|
||||
|
||||
|
||||
def create_default_config():
|
||||
"""Create a default Reward Classifier configuration for testing."""
|
||||
config = RewardClassifierConfig()
|
||||
config.input_features = {
|
||||
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)),
|
||||
OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
|
||||
}
|
||||
config.output_features = {
|
||||
"reward": PolicyFeature(type=FeatureType.ACTION, shape=(1,)), # Classifier output
|
||||
}
|
||||
config.normalization_mapping = {
|
||||
FeatureType.STATE: NormalizationMode.MEAN_STD,
|
||||
FeatureType.VISUAL: NormalizationMode.IDENTITY,
|
||||
FeatureType.ACTION: NormalizationMode.IDENTITY, # No normalization for classifier output
|
||||
}
|
||||
config.device = "cpu"
|
||||
return config
|
||||
|
||||
|
||||
def create_default_stats():
|
||||
"""Create default dataset statistics for testing."""
|
||||
return {
|
||||
OBS_STATE: {"mean": torch.zeros(10), "std": torch.ones(10)},
|
||||
OBS_IMAGE: {}, # No normalization for images
|
||||
"reward": {}, # No normalization for classifier output
|
||||
}
|
||||
|
||||
|
||||
def test_make_classifier_processor_basic():
|
||||
"""Test basic creation of Classifier processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_classifier_processor(config, stats)
|
||||
|
||||
# Check processor names
|
||||
assert preprocessor.name == "classifier_preprocessor"
|
||||
assert postprocessor.name == "classifier_postprocessor"
|
||||
|
||||
# Check steps in preprocessor
|
||||
assert len(preprocessor.steps) == 3
|
||||
assert isinstance(preprocessor.steps[0], NormalizerProcessor) # For input features
|
||||
assert isinstance(preprocessor.steps[1], NormalizerProcessor) # For output features
|
||||
assert isinstance(preprocessor.steps[2], DeviceProcessor)
|
||||
|
||||
# Check steps in postprocessor
|
||||
assert len(postprocessor.steps) == 2
|
||||
assert isinstance(postprocessor.steps[0], DeviceProcessor)
|
||||
assert isinstance(postprocessor.steps[1], IdentityProcessor)
|
||||
|
||||
|
||||
def test_classifier_processor_normalization():
|
||||
"""Test that Classifier processor correctly normalizes data."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_classifier_processor(config, stats)
|
||||
|
||||
# Create test data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(10),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(1) # Dummy action/reward
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is processed
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (10,)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (1,)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_classifier_processor_cuda():
|
||||
"""Test Classifier processor with CUDA device."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_classifier_processor(config, stats)
|
||||
|
||||
# Create CPU data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(10),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(1)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is on CUDA
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda"
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda"
|
||||
assert processed[TransitionKey.ACTION].device.type == "cuda"
|
||||
|
||||
# Process through postprocessor
|
||||
reward_transition = create_transition(action=processed[TransitionKey.ACTION])
|
||||
postprocessed = postprocessor(reward_transition)
|
||||
|
||||
# Check that output is back on CPU
|
||||
assert postprocessed[TransitionKey.ACTION].device.type == "cpu"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_classifier_processor_accelerate_scenario():
|
||||
"""Test Classifier processor in simulated Accelerate scenario."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_classifier_processor(config, stats)
|
||||
|
||||
# Simulate Accelerate: data already on GPU
|
||||
device = torch.device("cuda:0")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(10).to(device),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(1).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on same GPU
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_classifier_processor_multi_gpu():
|
||||
"""Test Classifier processor with multi-GPU setup."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_classifier_processor(config, stats)
|
||||
|
||||
# Simulate data on different GPU
|
||||
device = torch.device("cuda:1")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(10).to(device),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(1).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on cuda:1
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
def test_classifier_processor_without_stats():
|
||||
"""Test Classifier processor creation without dataset statistics."""
|
||||
config = create_default_config()
|
||||
|
||||
preprocessor, postprocessor = make_classifier_processor(config, dataset_stats=None)
|
||||
|
||||
# Should still create processors
|
||||
assert preprocessor is not None
|
||||
assert postprocessor is not None
|
||||
|
||||
# Process should still work
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(10),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(1)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
assert processed is not None
|
||||
|
||||
|
||||
def test_classifier_processor_save_and_load():
|
||||
"""Test saving and loading Classifier processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_classifier_processor(config, stats)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Save preprocessor
|
||||
preprocessor.save_pretrained(tmpdir)
|
||||
|
||||
# Load preprocessor
|
||||
loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir)
|
||||
|
||||
# Test that loaded processor works
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(10),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(1)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = loaded_preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (10,)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (1,)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_classifier_processor_mixed_precision():
|
||||
"""Test Classifier processor with mixed precision."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Create processor
|
||||
preprocessor, postprocessor = make_classifier_processor(config, stats)
|
||||
|
||||
# Replace DeviceProcessor with one that uses float16
|
||||
for i, step in enumerate(preprocessor.steps):
|
||||
if isinstance(step, DeviceProcessor):
|
||||
preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16")
|
||||
|
||||
# Create test data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(10, dtype=torch.float32),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224, dtype=torch.float32),
|
||||
}
|
||||
action = torch.randn(1, dtype=torch.float32)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is converted to float16
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].dtype == torch.float16
|
||||
assert processed[TransitionKey.ACTION].dtype == torch.float16
|
||||
|
||||
|
||||
def test_classifier_processor_batch_data():
|
||||
"""Test Classifier processor with batched data."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_classifier_processor(config, stats)
|
||||
|
||||
# Test with batched data
|
||||
batch_size = 16
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(batch_size, 10),
|
||||
OBS_IMAGE: torch.randn(batch_size, 3, 224, 224),
|
||||
}
|
||||
action = torch.randn(batch_size, 1)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that batch dimension is preserved
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (batch_size, 10)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (batch_size, 3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (batch_size, 1)
|
||||
|
||||
|
||||
def test_classifier_processor_postprocessor_identity():
|
||||
"""Test that Classifier postprocessor uses IdentityProcessor correctly."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_classifier_processor(config, stats)
|
||||
|
||||
# Create test data for postprocessor
|
||||
reward = torch.tensor([[0.8], [0.3], [0.9]]) # Batch of rewards/predictions
|
||||
transition = create_transition(action=reward)
|
||||
|
||||
# Process through postprocessor
|
||||
processed = postprocessor(transition)
|
||||
|
||||
# IdentityProcessor should leave values unchanged (except device)
|
||||
assert torch.allclose(processed[TransitionKey.ACTION].cpu(), reward.cpu())
|
||||
assert processed[TransitionKey.ACTION].device.type == "cpu"
|
||||
@@ -820,6 +820,143 @@ def test_complementary_data_none():
|
||||
assert TransitionKey.COMPLEMENTARY_DATA not in result
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_preserves_gpu_placement():
|
||||
"""Test that DeviceProcessor preserves GPU placement when tensor is already on GPU."""
|
||||
processor = DeviceProcessor(device="cuda:0")
|
||||
|
||||
# Create tensors already on GPU
|
||||
observation = {
|
||||
"observation.state": torch.randn(10).cuda(), # Already on GPU
|
||||
"observation.image": torch.randn(3, 224, 224).cuda(), # Already on GPU
|
||||
}
|
||||
action = torch.randn(5).cuda() # Already on GPU
|
||||
|
||||
transition = create_transition(observation=observation, action=action)
|
||||
result = processor(transition)
|
||||
|
||||
# Check that tensors remain on their original GPU
|
||||
assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cuda"
|
||||
assert result[TransitionKey.OBSERVATION]["observation.image"].device.type == "cuda"
|
||||
assert result[TransitionKey.ACTION].device.type == "cuda"
|
||||
|
||||
# Verify no unnecessary copies were made (same data pointer)
|
||||
assert torch.equal(
|
||||
result[TransitionKey.OBSERVATION]["observation.state"], observation["observation.state"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_multi_gpu_preservation():
|
||||
"""Test that DeviceProcessor preserves placement on different GPUs in multi-GPU setup."""
|
||||
# Test 1: GPU-to-GPU preservation (cuda:0 config, cuda:1 input)
|
||||
processor_gpu = DeviceProcessor(device="cuda:0")
|
||||
|
||||
# Create tensors on cuda:1 (simulating Accelerate placement)
|
||||
cuda1_device = torch.device("cuda:1")
|
||||
observation = {
|
||||
"observation.state": torch.randn(10).to(cuda1_device),
|
||||
"observation.image": torch.randn(3, 224, 224).to(cuda1_device),
|
||||
}
|
||||
action = torch.randn(5).to(cuda1_device)
|
||||
|
||||
transition = create_transition(observation=observation, action=action)
|
||||
result = processor_gpu(transition)
|
||||
|
||||
# Check that tensors remain on cuda:1 (not moved to cuda:0)
|
||||
assert result[TransitionKey.OBSERVATION]["observation.state"].device == cuda1_device
|
||||
assert result[TransitionKey.OBSERVATION]["observation.image"].device == cuda1_device
|
||||
assert result[TransitionKey.ACTION].device == cuda1_device
|
||||
|
||||
# Test 2: GPU-to-CPU should move to CPU (not preserve GPU)
|
||||
processor_cpu = DeviceProcessor(device="cpu")
|
||||
|
||||
transition_gpu = create_transition(
|
||||
observation={"observation.state": torch.randn(10).cuda()}, action=torch.randn(5).cuda()
|
||||
)
|
||||
result_cpu = processor_cpu(transition_gpu)
|
||||
|
||||
# Check that tensors are moved to CPU
|
||||
assert result_cpu[TransitionKey.OBSERVATION]["observation.state"].device.type == "cpu"
|
||||
assert result_cpu[TransitionKey.ACTION].device.type == "cpu"
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_multi_gpu_with_cpu_tensors():
|
||||
"""Test that CPU tensors are moved to configured device even in multi-GPU context."""
|
||||
# Processor configured for cuda:1
|
||||
processor = DeviceProcessor(device="cuda:1")
|
||||
|
||||
# Mix of CPU and GPU tensors
|
||||
observation = {
|
||||
"observation.cpu": torch.randn(10), # CPU tensor
|
||||
"observation.gpu0": torch.randn(10).cuda(0), # Already on cuda:0
|
||||
"observation.gpu1": torch.randn(10).cuda(1), # Already on cuda:1
|
||||
}
|
||||
action = torch.randn(5) # CPU tensor
|
||||
|
||||
transition = create_transition(observation=observation, action=action)
|
||||
result = processor(transition)
|
||||
|
||||
# CPU tensor should move to configured device (cuda:1)
|
||||
assert result[TransitionKey.OBSERVATION]["observation.cpu"].device.type == "cuda"
|
||||
assert result[TransitionKey.OBSERVATION]["observation.cpu"].device.index == 1
|
||||
assert result[TransitionKey.ACTION].device.type == "cuda"
|
||||
assert result[TransitionKey.ACTION].device.index == 1
|
||||
|
||||
# GPU tensors should stay on their original devices
|
||||
assert result[TransitionKey.OBSERVATION]["observation.gpu0"].device.index == 0
|
||||
assert result[TransitionKey.OBSERVATION]["observation.gpu1"].device.index == 1
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_multi_gpu_with_float_dtype():
|
||||
"""Test float dtype conversion works correctly with multi-GPU preservation."""
|
||||
processor = DeviceProcessor(device="cuda:0", float_dtype="float16")
|
||||
|
||||
# Create float tensors on different GPUs
|
||||
observation = {
|
||||
"observation.gpu0": torch.randn(5, dtype=torch.float32).cuda(0),
|
||||
"observation.gpu1": torch.randn(5, dtype=torch.float32).cuda(1),
|
||||
"observation.cpu": torch.randn(5, dtype=torch.float32), # CPU
|
||||
}
|
||||
|
||||
transition = create_transition(observation=observation)
|
||||
result = processor(transition)
|
||||
|
||||
# Check device placement
|
||||
assert result[TransitionKey.OBSERVATION]["observation.gpu0"].device.index == 0
|
||||
assert result[TransitionKey.OBSERVATION]["observation.gpu1"].device.index == 1
|
||||
assert result[TransitionKey.OBSERVATION]["observation.cpu"].device.index == 0 # Moved to cuda:0
|
||||
|
||||
# Check dtype conversion happened for all
|
||||
assert result[TransitionKey.OBSERVATION]["observation.gpu0"].dtype == torch.float16
|
||||
assert result[TransitionKey.OBSERVATION]["observation.gpu1"].dtype == torch.float16
|
||||
assert result[TransitionKey.OBSERVATION]["observation.cpu"].dtype == torch.float16
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_simulated_accelerate_scenario():
|
||||
"""Test a scenario simulating how Accelerate would use the processor."""
|
||||
# Simulate different processes getting different GPU assignments
|
||||
for gpu_id in range(min(torch.cuda.device_count(), 2)):
|
||||
# Each "process" has a processor configured for cuda:0
|
||||
# but data comes in already placed on the process's GPU
|
||||
processor = DeviceProcessor(device="cuda:0")
|
||||
|
||||
# Simulate data already placed by Accelerate
|
||||
device = torch.device(f"cuda:{gpu_id}")
|
||||
observation = {"observation.state": torch.randn(1, 10).to(device)}
|
||||
action = torch.randn(1, 5).to(device)
|
||||
|
||||
transition = create_transition(observation=observation, action=action)
|
||||
result = processor(transition)
|
||||
|
||||
# Verify data stays on the GPU where Accelerate placed it
|
||||
assert result[TransitionKey.OBSERVATION]["observation.state"].device == device
|
||||
assert result[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_policy_processor_integration():
|
||||
"""Test integration with policy processors - input on GPU, output on CPU."""
|
||||
|
||||
342
tests/processor/test_diffusion_processor.py
Normal file
342
tests/processor/test_diffusion_processor.py
Normal file
@@ -0,0 +1,342 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for Diffusion policy processor."""
|
||||
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE
|
||||
from lerobot.policies.diffusion.configuration_diffusion import DiffusionConfig
|
||||
from lerobot.policies.diffusion.processor_diffusion import make_diffusion_processor
|
||||
from lerobot.processor import (
|
||||
DeviceProcessor,
|
||||
NormalizerProcessor,
|
||||
RenameProcessor,
|
||||
RobotProcessor,
|
||||
ToBatchProcessor,
|
||||
UnnormalizerProcessor,
|
||||
)
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
|
||||
|
||||
def create_transition(observation=None, action=None, **kwargs):
|
||||
"""Helper function to create a transition dictionary."""
|
||||
transition = {}
|
||||
if observation is not None:
|
||||
transition[TransitionKey.OBSERVATION] = observation
|
||||
if action is not None:
|
||||
transition[TransitionKey.ACTION] = action
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(TransitionKey, key.upper()):
|
||||
transition[getattr(TransitionKey, key.upper())] = value
|
||||
return transition
|
||||
|
||||
|
||||
def create_default_config():
|
||||
"""Create a default Diffusion configuration for testing."""
|
||||
config = DiffusionConfig()
|
||||
config.input_features = {
|
||||
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(7,)),
|
||||
OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
|
||||
}
|
||||
config.output_features = {
|
||||
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(6,)),
|
||||
}
|
||||
config.normalization_mapping = {
|
||||
FeatureType.STATE: NormalizationMode.MEAN_STD,
|
||||
FeatureType.VISUAL: NormalizationMode.IDENTITY,
|
||||
FeatureType.ACTION: NormalizationMode.MIN_MAX,
|
||||
}
|
||||
config.device = "cpu"
|
||||
return config
|
||||
|
||||
|
||||
def create_default_stats():
|
||||
"""Create default dataset statistics for testing."""
|
||||
return {
|
||||
OBS_STATE: {"mean": torch.zeros(7), "std": torch.ones(7)},
|
||||
OBS_IMAGE: {}, # No normalization for images
|
||||
ACTION: {"min": torch.full((6,), -1.0), "max": torch.ones(6)},
|
||||
}
|
||||
|
||||
|
||||
def test_make_diffusion_processor_basic():
|
||||
"""Test basic creation of Diffusion processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_diffusion_processor(config, stats)
|
||||
|
||||
# Check processor names
|
||||
assert preprocessor.name == "robot_preprocessor"
|
||||
assert postprocessor.name == "robot_postprocessor"
|
||||
|
||||
# Check steps in preprocessor
|
||||
assert len(preprocessor.steps) == 4
|
||||
assert isinstance(preprocessor.steps[0], RenameProcessor)
|
||||
assert isinstance(preprocessor.steps[1], NormalizerProcessor)
|
||||
assert isinstance(preprocessor.steps[2], ToBatchProcessor)
|
||||
assert isinstance(preprocessor.steps[3], DeviceProcessor)
|
||||
|
||||
# Check steps in postprocessor
|
||||
assert len(postprocessor.steps) == 2
|
||||
assert isinstance(postprocessor.steps[0], DeviceProcessor)
|
||||
assert isinstance(postprocessor.steps[1], UnnormalizerProcessor)
|
||||
|
||||
|
||||
def test_diffusion_processor_with_images():
|
||||
"""Test Diffusion processor with image observations."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_diffusion_processor(config, stats)
|
||||
|
||||
# Create test data with images
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(7),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is batched
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 7)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 6)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_diffusion_processor_cuda():
|
||||
"""Test Diffusion processor with CUDA device."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_diffusion_processor(config, stats)
|
||||
|
||||
# Create CPU data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(7),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is on CUDA
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda"
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda"
|
||||
assert processed[TransitionKey.ACTION].device.type == "cuda"
|
||||
|
||||
# Process through postprocessor
|
||||
action_transition = create_transition(action=processed[TransitionKey.ACTION])
|
||||
postprocessed = postprocessor(action_transition)
|
||||
|
||||
# Check that action is back on CPU
|
||||
assert postprocessed[TransitionKey.ACTION].device.type == "cpu"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_diffusion_processor_accelerate_scenario():
|
||||
"""Test Diffusion processor in simulated Accelerate scenario."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_diffusion_processor(config, stats)
|
||||
|
||||
# Simulate Accelerate: data already on GPU
|
||||
device = torch.device("cuda:0")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(1, 7).to(device),
|
||||
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(1, 6).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on same GPU
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_diffusion_processor_multi_gpu():
|
||||
"""Test Diffusion processor with multi-GPU setup."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_diffusion_processor(config, stats)
|
||||
|
||||
# Simulate data on different GPU
|
||||
device = torch.device("cuda:1")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(1, 7).to(device),
|
||||
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(1, 6).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on cuda:1
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
def test_diffusion_processor_without_stats():
|
||||
"""Test Diffusion processor creation without dataset statistics."""
|
||||
config = create_default_config()
|
||||
|
||||
preprocessor, postprocessor = make_diffusion_processor(config, dataset_stats=None)
|
||||
|
||||
# Should still create processors
|
||||
assert preprocessor is not None
|
||||
assert postprocessor is not None
|
||||
|
||||
# Process should still work
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(7),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
assert processed is not None
|
||||
|
||||
|
||||
def test_diffusion_processor_save_and_load():
|
||||
"""Test saving and loading Diffusion processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_diffusion_processor(config, stats)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Save preprocessor
|
||||
preprocessor.save_pretrained(tmpdir)
|
||||
|
||||
# Load preprocessor
|
||||
loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir)
|
||||
|
||||
# Test that loaded processor works
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(7),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = loaded_preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 7)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 6)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_diffusion_processor_mixed_precision():
|
||||
"""Test Diffusion processor with mixed precision."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Create processor
|
||||
preprocessor, postprocessor = make_diffusion_processor(config, stats)
|
||||
|
||||
# Replace DeviceProcessor with one that uses float16
|
||||
for i, step in enumerate(preprocessor.steps):
|
||||
if isinstance(step, DeviceProcessor):
|
||||
preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16")
|
||||
|
||||
# Create test data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(7, dtype=torch.float32),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224, dtype=torch.float32),
|
||||
}
|
||||
action = torch.randn(6, dtype=torch.float32)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is converted to float16
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].dtype == torch.float16
|
||||
assert processed[TransitionKey.ACTION].dtype == torch.float16
|
||||
|
||||
|
||||
def test_diffusion_processor_identity_normalization():
|
||||
"""Test that images with IDENTITY normalization are not normalized."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_diffusion_processor(config, stats)
|
||||
|
||||
# Create test data
|
||||
image_value = torch.rand(3, 224, 224) * 255 # Large values
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(7),
|
||||
OBS_IMAGE: image_value.clone(),
|
||||
}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Image should not be normalized (IDENTITY mode)
|
||||
# Just batched
|
||||
assert torch.allclose(processed[TransitionKey.OBSERVATION][OBS_IMAGE][0], image_value, rtol=1e-5)
|
||||
|
||||
|
||||
def test_diffusion_processor_batch_consistency():
|
||||
"""Test Diffusion processor with different batch sizes."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_diffusion_processor(config, stats)
|
||||
|
||||
# Test with different batch sizes
|
||||
for batch_size in [1, 8, 32]:
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(batch_size, 7) if batch_size > 1 else torch.randn(7),
|
||||
OBS_IMAGE: torch.randn(batch_size, 3, 224, 224) if batch_size > 1 else torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(batch_size, 6) if batch_size > 1 else torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check correct batch size
|
||||
expected_batch = batch_size if batch_size > 1 else 1
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape[0] == expected_batch
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape[0] == expected_batch
|
||||
assert processed[TransitionKey.ACTION].shape[0] == expected_batch
|
||||
@@ -25,6 +25,7 @@ from lerobot.processor.normalize_processor import (
|
||||
UnnormalizerProcessor,
|
||||
_convert_stats_to_tensors,
|
||||
hotswap_stats,
|
||||
rename_stats,
|
||||
)
|
||||
from lerobot.processor.pipeline import IdentityProcessor, RobotProcessor, TransitionKey
|
||||
|
||||
@@ -1604,3 +1605,156 @@ def test_hotswap_stats_functional_test():
|
||||
new_result["observation"]["observation.image"], observation["observation.image"]
|
||||
)
|
||||
assert not torch.allclose(new_result["action"], action)
|
||||
|
||||
|
||||
def test_zero_std_uses_eps():
|
||||
"""When std == 0, (x-mean)/(std+eps) is well-defined; x==mean should map to 0."""
|
||||
features = {"observation.state": PolicyFeature(FeatureType.STATE, (1,))}
|
||||
norm_map = {FeatureType.STATE: NormalizationMode.MEAN_STD}
|
||||
stats = {"observation.state": {"mean": np.array([0.5]), "std": np.array([0.0])}}
|
||||
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats, eps=1e-6)
|
||||
|
||||
observation = {"observation.state": torch.tensor([0.5])} # equals mean
|
||||
out = normalizer(create_transition(observation=observation))
|
||||
assert torch.allclose(out[TransitionKey.OBSERVATION]["observation.state"], torch.tensor([0.0]))
|
||||
|
||||
|
||||
def test_min_equals_max_maps_to_minus_one():
|
||||
"""When min == max, MIN_MAX path maps to -1 after [-1,1] scaling for x==min."""
|
||||
features = {"observation.state": PolicyFeature(FeatureType.STATE, (1,))}
|
||||
norm_map = {FeatureType.STATE: NormalizationMode.MIN_MAX}
|
||||
stats = {"observation.state": {"min": np.array([2.0]), "max": np.array([2.0])}}
|
||||
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats, eps=1e-6)
|
||||
|
||||
observation = {"observation.state": torch.tensor([2.0])}
|
||||
out = normalizer(create_transition(observation=observation))
|
||||
assert torch.allclose(out[TransitionKey.OBSERVATION]["observation.state"], torch.tensor([-1.0]))
|
||||
|
||||
|
||||
def test_action_normalized_despite_normalize_keys():
|
||||
"""Action normalization is independent of normalize_keys filter for observations."""
|
||||
features = {
|
||||
"observation.state": PolicyFeature(FeatureType.STATE, (1,)),
|
||||
"action": PolicyFeature(FeatureType.ACTION, (2,)),
|
||||
}
|
||||
norm_map = {FeatureType.STATE: NormalizationMode.IDENTITY, FeatureType.ACTION: NormalizationMode.MEAN_STD}
|
||||
stats = {"action": {"mean": np.array([1.0, -1.0]), "std": np.array([2.0, 4.0])}}
|
||||
normalizer = NormalizerProcessor(
|
||||
features=features, norm_map=norm_map, stats=stats, normalize_keys={"observation.state"}
|
||||
)
|
||||
|
||||
transition = create_transition(
|
||||
observation={"observation.state": torch.tensor([3.0])}, action=torch.tensor([3.0, 3.0])
|
||||
)
|
||||
out = normalizer(transition)
|
||||
# (3-1)/2 = 1.0 ; (3-(-1))/4 = 1.0
|
||||
assert torch.allclose(out[TransitionKey.ACTION], torch.tensor([1.0, 1.0]))
|
||||
|
||||
|
||||
def test_unnormalize_observations_mean_std_and_min_max():
|
||||
features = {
|
||||
"observation.ms": PolicyFeature(FeatureType.STATE, (2,)),
|
||||
"observation.mm": PolicyFeature(FeatureType.STATE, (2,)),
|
||||
}
|
||||
# Build two processors: one mean/std and one min/max
|
||||
unnorm_ms = UnnormalizerProcessor(
|
||||
features={"observation.ms": features["observation.ms"]},
|
||||
norm_map={FeatureType.STATE: NormalizationMode.MEAN_STD},
|
||||
stats={"observation.ms": {"mean": np.array([1.0, -1.0]), "std": np.array([2.0, 4.0])}},
|
||||
)
|
||||
unnorm_mm = UnnormalizerProcessor(
|
||||
features={"observation.mm": features["observation.mm"]},
|
||||
norm_map={FeatureType.STATE: NormalizationMode.MIN_MAX},
|
||||
stats={"observation.mm": {"min": np.array([0.0, -2.0]), "max": np.array([2.0, 2.0])}},
|
||||
)
|
||||
|
||||
tr = create_transition(
|
||||
observation={
|
||||
"observation.ms": torch.tensor([0.0, 0.0]), # → mean
|
||||
"observation.mm": torch.tensor([0.0, 0.0]), # → mid-point
|
||||
}
|
||||
)
|
||||
out_ms = unnorm_ms(tr)[TransitionKey.OBSERVATION]["observation.ms"]
|
||||
out_mm = unnorm_mm(tr)[TransitionKey.OBSERVATION]["observation.mm"]
|
||||
assert torch.allclose(out_ms, torch.tensor([1.0, -1.0]))
|
||||
assert torch.allclose(out_mm, torch.tensor([1.0, 0.0])) # mid of [0,2] and [-2,2]
|
||||
|
||||
|
||||
def test_rename_stats_basic():
|
||||
orig = {
|
||||
"observation.state": {"mean": np.array([0.0]), "std": np.array([1.0])},
|
||||
"action": {"mean": np.array([0.0])},
|
||||
}
|
||||
mapping = {"observation.state": "observation.robot_state"}
|
||||
renamed = rename_stats(orig, mapping)
|
||||
assert "observation.robot_state" in renamed and "observation.state" not in renamed
|
||||
# Ensure deep copy: mutate original and verify renamed unaffected
|
||||
orig["observation.state"]["mean"][0] = 42.0
|
||||
assert renamed["observation.robot_state"]["mean"][0] != 42.0
|
||||
|
||||
|
||||
def test_unknown_observation_keys_ignored():
|
||||
features = {"observation.state": PolicyFeature(FeatureType.STATE, (1,))}
|
||||
norm_map = {FeatureType.STATE: NormalizationMode.MEAN_STD}
|
||||
stats = {"observation.state": {"mean": np.array([0.0]), "std": np.array([1.0])}}
|
||||
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
|
||||
|
||||
obs = {"observation.state": torch.tensor([1.0]), "observation.unknown": torch.tensor([5.0])}
|
||||
tr = create_transition(observation=obs)
|
||||
out = normalizer(tr)
|
||||
|
||||
# Unknown key should pass through unchanged and not be tracked
|
||||
assert torch.allclose(out[TransitionKey.OBSERVATION]["observation.unknown"], obs["observation.unknown"])
|
||||
comp = out.get(TransitionKey.COMPLEMENTARY_DATA) or {}
|
||||
assert "normalized_keys" in comp and "observation.unknown" not in comp["normalized_keys"]
|
||||
|
||||
|
||||
def test_batched_action_normalization():
|
||||
features = {"action": PolicyFeature(FeatureType.ACTION, (2,))}
|
||||
norm_map = {FeatureType.ACTION: NormalizationMode.MEAN_STD}
|
||||
stats = {"action": {"mean": np.array([1.0, -1.0]), "std": np.array([2.0, 4.0])}}
|
||||
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
|
||||
|
||||
actions = torch.tensor([[1.0, -1.0], [3.0, 3.0]]) # first equals mean → zeros; second → [1, 1]
|
||||
out = normalizer(create_transition(action=actions))[TransitionKey.ACTION]
|
||||
expected = torch.tensor([[0.0, 0.0], [1.0, 1.0]])
|
||||
assert torch.allclose(out, expected)
|
||||
|
||||
|
||||
def test_complementary_data_preservation():
|
||||
features = {"observation.state": PolicyFeature(FeatureType.STATE, (1,))}
|
||||
norm_map = {FeatureType.STATE: NormalizationMode.MEAN_STD}
|
||||
stats = {"observation.state": {"mean": np.array([0.0]), "std": np.array([1.0])}}
|
||||
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
|
||||
|
||||
comp = {"existing": 123}
|
||||
tr = create_transition(observation={"observation.state": torch.tensor([1.0])}, complementary_data=comp)
|
||||
out = normalizer(tr)
|
||||
new_comp = out[TransitionKey.COMPLEMENTARY_DATA]
|
||||
assert new_comp["existing"] == 123 and "normalized_keys" in new_comp
|
||||
|
||||
|
||||
def test_roundtrip_normalize_unnormalize_non_identity():
|
||||
features = {
|
||||
"observation.state": PolicyFeature(FeatureType.STATE, (2,)),
|
||||
"action": PolicyFeature(FeatureType.ACTION, (2,)),
|
||||
}
|
||||
norm_map = {FeatureType.STATE: NormalizationMode.MEAN_STD, FeatureType.ACTION: NormalizationMode.MIN_MAX}
|
||||
stats = {
|
||||
"observation.state": {"mean": np.array([1.0, -1.0]), "std": np.array([2.0, 4.0])},
|
||||
"action": {"min": np.array([-2.0, 0.0]), "max": np.array([2.0, 4.0])},
|
||||
}
|
||||
normalizer = NormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
|
||||
unnormalizer = UnnormalizerProcessor(features=features, norm_map=norm_map, stats=stats)
|
||||
|
||||
# Add a time dimension in action for broadcasting check (B,T,D)
|
||||
obs = {"observation.state": torch.tensor([[3.0, 3.0], [1.0, -1.0]])}
|
||||
act = torch.tensor([[[0.0, -1.0], [1.0, 1.0]]]) # shape (1,2,2) already in [-1,1]
|
||||
|
||||
tr = create_transition(observation=obs, action=act)
|
||||
out = unnormalizer(normalizer(tr))
|
||||
|
||||
assert torch.allclose(
|
||||
out[TransitionKey.OBSERVATION]["observation.state"], obs["observation.state"], atol=1e-5
|
||||
)
|
||||
assert torch.allclose(out[TransitionKey.ACTION], act, atol=1e-5)
|
||||
|
||||
336
tests/processor/test_pi0_processor.py
Normal file
336
tests/processor/test_pi0_processor.py
Normal file
@@ -0,0 +1,336 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for PI0 policy processor."""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE
|
||||
from lerobot.policies.pi0.configuration_pi0 import PI0Config
|
||||
from lerobot.policies.pi0.processor_pi0 import Pi0NewLineProcessor, make_pi0_processor
|
||||
from lerobot.processor import (
|
||||
DeviceProcessor,
|
||||
NormalizerProcessor,
|
||||
RenameProcessor,
|
||||
ToBatchProcessor,
|
||||
UnnormalizerProcessor,
|
||||
)
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
|
||||
|
||||
def create_transition(observation=None, action=None, **kwargs):
|
||||
"""Helper function to create a transition dictionary."""
|
||||
transition = {}
|
||||
if observation is not None:
|
||||
transition[TransitionKey.OBSERVATION] = observation
|
||||
if action is not None:
|
||||
transition[TransitionKey.ACTION] = action
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(TransitionKey, key.upper()):
|
||||
transition[getattr(TransitionKey, key.upper())] = value
|
||||
elif key == "complementary_data":
|
||||
transition[TransitionKey.COMPLEMENTARY_DATA] = value
|
||||
return transition
|
||||
|
||||
|
||||
def create_default_config():
|
||||
"""Create a default PI0 configuration for testing."""
|
||||
config = PI0Config()
|
||||
config.input_features = {
|
||||
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)),
|
||||
OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
|
||||
}
|
||||
config.output_features = {
|
||||
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(6,)),
|
||||
}
|
||||
config.normalization_mapping = {
|
||||
FeatureType.STATE: NormalizationMode.MEAN_STD,
|
||||
FeatureType.VISUAL: NormalizationMode.IDENTITY,
|
||||
FeatureType.ACTION: NormalizationMode.MIN_MAX,
|
||||
}
|
||||
config.device = "cpu"
|
||||
config.tokenizer_max_length = 128
|
||||
return config
|
||||
|
||||
|
||||
def create_default_stats():
|
||||
"""Create default dataset statistics for testing."""
|
||||
return {
|
||||
OBS_STATE: {"mean": torch.zeros(10), "std": torch.ones(10)},
|
||||
OBS_IMAGE: {}, # No normalization for images
|
||||
ACTION: {"min": torch.full((6,), -1.0), "max": torch.ones(6)},
|
||||
}
|
||||
|
||||
|
||||
def test_make_pi0_processor_basic():
|
||||
"""Test basic creation of PI0 processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
with patch("lerobot.policies.pi0.processor_pi0.TokenizerProcessor"):
|
||||
preprocessor, postprocessor = make_pi0_processor(config, stats)
|
||||
|
||||
# Check processor names
|
||||
assert preprocessor.name == "robot_preprocessor"
|
||||
assert postprocessor.name == "robot_postprocessor"
|
||||
|
||||
# Check steps in preprocessor
|
||||
assert len(preprocessor.steps) == 6
|
||||
assert isinstance(preprocessor.steps[0], RenameProcessor)
|
||||
assert isinstance(preprocessor.steps[1], NormalizerProcessor)
|
||||
assert isinstance(preprocessor.steps[2], ToBatchProcessor)
|
||||
assert isinstance(preprocessor.steps[3], Pi0NewLineProcessor)
|
||||
# Step 4 would be TokenizerProcessor but it's mocked
|
||||
assert isinstance(preprocessor.steps[5], DeviceProcessor)
|
||||
|
||||
# Check steps in postprocessor
|
||||
assert len(postprocessor.steps) == 2
|
||||
assert isinstance(postprocessor.steps[0], DeviceProcessor)
|
||||
assert isinstance(postprocessor.steps[1], UnnormalizerProcessor)
|
||||
|
||||
|
||||
def test_pi0_newline_processor_single_task():
|
||||
"""Test Pi0NewLineProcessor with single task string."""
|
||||
processor = Pi0NewLineProcessor()
|
||||
|
||||
# Test with task that doesn't have newline
|
||||
transition = create_transition(complementary_data={"task": "test task"})
|
||||
result = processor(transition)
|
||||
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "test task\n"
|
||||
|
||||
# Test with task that already has newline
|
||||
transition = create_transition(complementary_data={"task": "test task\n"})
|
||||
result = processor(transition)
|
||||
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "test task\n"
|
||||
|
||||
|
||||
def test_pi0_newline_processor_list_of_tasks():
|
||||
"""Test Pi0NewLineProcessor with list of task strings."""
|
||||
processor = Pi0NewLineProcessor()
|
||||
|
||||
# Test with list of tasks
|
||||
tasks = ["task1", "task2\n", "task3"]
|
||||
transition = create_transition(complementary_data={"task": tasks})
|
||||
result = processor(transition)
|
||||
expected = ["task1\n", "task2\n", "task3\n"]
|
||||
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == expected
|
||||
|
||||
|
||||
def test_pi0_newline_processor_empty_transition():
|
||||
"""Test Pi0NewLineProcessor with empty transition."""
|
||||
processor = Pi0NewLineProcessor()
|
||||
|
||||
# Test with no complementary_data
|
||||
transition = create_transition()
|
||||
result = processor(transition)
|
||||
assert result == transition
|
||||
|
||||
# Test with complementary_data but no task
|
||||
transition = create_transition(complementary_data={"other": "data"})
|
||||
result = processor(transition)
|
||||
assert result == transition
|
||||
|
||||
# Test with None task
|
||||
transition = create_transition(complementary_data={"task": None})
|
||||
result = processor(transition)
|
||||
assert result == transition
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_pi0_processor_cuda():
|
||||
"""Test PI0 processor with CUDA device."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Mock the tokenizer processor to act as pass-through
|
||||
class MockTokenizerProcessor:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __call__(self, transition):
|
||||
return transition
|
||||
|
||||
def state_dict(self):
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
def get_config(self):
|
||||
return {"tokenizer_name": "google/paligemma-3b-pt-224"}
|
||||
|
||||
def transform_features(self, features):
|
||||
return features
|
||||
|
||||
with patch("lerobot.policies.pi0.processor_pi0.TokenizerProcessor", MockTokenizerProcessor):
|
||||
preprocessor, postprocessor = make_pi0_processor(config, stats)
|
||||
|
||||
# Create CPU data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(10),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action, complementary_data={"task": "test task"})
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is on CUDA
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda"
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda"
|
||||
assert processed[TransitionKey.ACTION].device.type == "cuda"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_pi0_processor_accelerate_scenario():
|
||||
"""Test PI0 processor in simulated Accelerate scenario."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Mock the tokenizer processor to act as pass-through
|
||||
class MockTokenizerProcessor:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __call__(self, transition):
|
||||
return transition
|
||||
|
||||
def state_dict(self):
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
def get_config(self):
|
||||
return {"tokenizer_name": "google/paligemma-3b-pt-224"}
|
||||
|
||||
def transform_features(self, features):
|
||||
return features
|
||||
|
||||
with patch("lerobot.policies.pi0.processor_pi0.TokenizerProcessor", MockTokenizerProcessor):
|
||||
preprocessor, postprocessor = make_pi0_processor(config, stats)
|
||||
|
||||
# Simulate Accelerate: data already on GPU and batched
|
||||
device = torch.device("cuda:0")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(1, 10).to(device),
|
||||
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(1, 6).to(device)
|
||||
transition = create_transition(observation, action, complementary_data={"task": ["test task"]})
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on same GPU
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_pi0_processor_multi_gpu():
|
||||
"""Test PI0 processor with multi-GPU setup."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Mock the tokenizer processor to act as pass-through
|
||||
class MockTokenizerProcessor:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __call__(self, transition):
|
||||
return transition
|
||||
|
||||
def state_dict(self):
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
def get_config(self):
|
||||
return {"tokenizer_name": "google/paligemma-3b-pt-224"}
|
||||
|
||||
def transform_features(self, features):
|
||||
return features
|
||||
|
||||
with patch("lerobot.policies.pi0.processor_pi0.TokenizerProcessor", MockTokenizerProcessor):
|
||||
preprocessor, postprocessor = make_pi0_processor(config, stats)
|
||||
|
||||
# Simulate data on different GPU
|
||||
device = torch.device("cuda:1")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(1, 10).to(device),
|
||||
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(1, 6).to(device)
|
||||
transition = create_transition(observation, action, complementary_data={"task": ["test task"]})
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on cuda:1
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
def test_pi0_processor_without_stats():
|
||||
"""Test PI0 processor creation without dataset statistics."""
|
||||
config = create_default_config()
|
||||
|
||||
# Mock the tokenizer processor
|
||||
with patch("lerobot.policies.pi0.processor_pi0.TokenizerProcessor"):
|
||||
preprocessor, postprocessor = make_pi0_processor(config, dataset_stats=None)
|
||||
|
||||
# Should still create processors
|
||||
assert preprocessor is not None
|
||||
assert postprocessor is not None
|
||||
|
||||
|
||||
def test_pi0_newline_processor_state_dict():
|
||||
"""Test Pi0NewLineProcessor state dict methods."""
|
||||
processor = Pi0NewLineProcessor()
|
||||
|
||||
# Test state_dict (should be empty)
|
||||
state = processor.state_dict()
|
||||
assert state == {}
|
||||
|
||||
# Test load_state_dict (should do nothing)
|
||||
processor.load_state_dict({})
|
||||
|
||||
# Test reset (should do nothing)
|
||||
processor.reset()
|
||||
|
||||
# Test get_config
|
||||
config = processor.get_config()
|
||||
assert config == {}
|
||||
314
tests/processor/test_sac_processor.py
Normal file
314
tests/processor/test_sac_processor.py
Normal file
@@ -0,0 +1,314 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for SAC policy processor."""
|
||||
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.constants import ACTION, OBS_STATE
|
||||
from lerobot.policies.sac.configuration_sac import SACConfig
|
||||
from lerobot.policies.sac.processor_sac import make_sac_processor
|
||||
from lerobot.processor import (
|
||||
DeviceProcessor,
|
||||
NormalizerProcessor,
|
||||
RenameProcessor,
|
||||
RobotProcessor,
|
||||
ToBatchProcessor,
|
||||
UnnormalizerProcessor,
|
||||
)
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
|
||||
|
||||
def create_transition(observation=None, action=None, **kwargs):
|
||||
"""Helper function to create a transition dictionary."""
|
||||
transition = {}
|
||||
if observation is not None:
|
||||
transition[TransitionKey.OBSERVATION] = observation
|
||||
if action is not None:
|
||||
transition[TransitionKey.ACTION] = action
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(TransitionKey, key.upper()):
|
||||
transition[getattr(TransitionKey, key.upper())] = value
|
||||
return transition
|
||||
|
||||
|
||||
def create_default_config():
|
||||
"""Create a default SAC configuration for testing."""
|
||||
config = SACConfig()
|
||||
config.input_features = {
|
||||
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)),
|
||||
}
|
||||
config.output_features = {
|
||||
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(5,)),
|
||||
}
|
||||
config.normalization_mapping = {
|
||||
FeatureType.STATE: NormalizationMode.MEAN_STD,
|
||||
FeatureType.ACTION: NormalizationMode.MIN_MAX,
|
||||
}
|
||||
config.device = "cpu"
|
||||
return config
|
||||
|
||||
|
||||
def create_default_stats():
|
||||
"""Create default dataset statistics for testing."""
|
||||
return {
|
||||
OBS_STATE: {"mean": torch.zeros(10), "std": torch.ones(10)},
|
||||
ACTION: {"min": torch.full((5,), -1.0), "max": torch.ones(5)},
|
||||
}
|
||||
|
||||
|
||||
def test_make_sac_processor_basic():
|
||||
"""Test basic creation of SAC processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_sac_processor(config, stats)
|
||||
|
||||
# Check processor names
|
||||
assert preprocessor.name == "robot_preprocessor"
|
||||
assert postprocessor.name == "robot_postprocessor"
|
||||
|
||||
# Check steps in preprocessor
|
||||
assert len(preprocessor.steps) == 4
|
||||
assert isinstance(preprocessor.steps[0], RenameProcessor)
|
||||
assert isinstance(preprocessor.steps[1], NormalizerProcessor)
|
||||
assert isinstance(preprocessor.steps[2], ToBatchProcessor)
|
||||
assert isinstance(preprocessor.steps[3], DeviceProcessor)
|
||||
|
||||
# Check steps in postprocessor
|
||||
assert len(postprocessor.steps) == 2
|
||||
assert isinstance(postprocessor.steps[0], DeviceProcessor)
|
||||
assert isinstance(postprocessor.steps[1], UnnormalizerProcessor)
|
||||
|
||||
|
||||
def test_sac_processor_normalization_modes():
|
||||
"""Test that SAC processor correctly handles different normalization modes."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_sac_processor(config, stats)
|
||||
|
||||
# Create test data
|
||||
observation = {OBS_STATE: torch.randn(10) * 2} # Larger values to test normalization
|
||||
action = torch.rand(5) * 2 - 1 # Range [-1, 1]
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is normalized and batched
|
||||
# State should be mean-std normalized
|
||||
# Action should be min-max normalized to [-1, 1]
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 10)
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 5)
|
||||
|
||||
# Process action through postprocessor
|
||||
action_transition = create_transition(action=processed[TransitionKey.ACTION])
|
||||
postprocessed = postprocessor(action_transition)
|
||||
|
||||
# Check that action is unnormalized (but still batched)
|
||||
assert postprocessed[TransitionKey.ACTION].shape == (1, 5)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_sac_processor_cuda():
|
||||
"""Test SAC processor with CUDA device."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_sac_processor(config, stats)
|
||||
|
||||
# Create CPU data
|
||||
observation = {OBS_STATE: torch.randn(10)}
|
||||
action = torch.randn(5)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is on CUDA
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda"
|
||||
assert processed[TransitionKey.ACTION].device.type == "cuda"
|
||||
|
||||
# Process through postprocessor
|
||||
action_transition = create_transition(action=processed[TransitionKey.ACTION])
|
||||
postprocessed = postprocessor(action_transition)
|
||||
|
||||
# Check that action is back on CPU
|
||||
assert postprocessed[TransitionKey.ACTION].device.type == "cpu"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_sac_processor_accelerate_scenario():
|
||||
"""Test SAC processor in simulated Accelerate scenario."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_sac_processor(config, stats)
|
||||
|
||||
# Simulate Accelerate: data already on GPU
|
||||
device = torch.device("cuda:0")
|
||||
observation = {OBS_STATE: torch.randn(10).to(device)}
|
||||
action = torch.randn(5).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on same GPU
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_sac_processor_multi_gpu():
|
||||
"""Test SAC processor with multi-GPU setup."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_sac_processor(config, stats)
|
||||
|
||||
# Simulate data on different GPU
|
||||
device = torch.device("cuda:1")
|
||||
observation = {OBS_STATE: torch.randn(10).to(device)}
|
||||
action = torch.randn(5).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on cuda:1
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
def test_sac_processor_without_stats():
|
||||
"""Test SAC processor creation without dataset statistics."""
|
||||
config = create_default_config()
|
||||
|
||||
preprocessor, postprocessor = make_sac_processor(config, dataset_stats=None)
|
||||
|
||||
# Should still create processors
|
||||
assert preprocessor is not None
|
||||
assert postprocessor is not None
|
||||
|
||||
# Process should still work
|
||||
observation = {OBS_STATE: torch.randn(10)}
|
||||
action = torch.randn(5)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
assert processed is not None
|
||||
|
||||
|
||||
def test_sac_processor_save_and_load():
|
||||
"""Test saving and loading SAC processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_sac_processor(config, stats)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Save preprocessor
|
||||
preprocessor.save_pretrained(tmpdir)
|
||||
|
||||
# Load preprocessor
|
||||
loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir)
|
||||
|
||||
# Test that loaded processor works
|
||||
observation = {OBS_STATE: torch.randn(10)}
|
||||
action = torch.randn(5)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = loaded_preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 10)
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 5)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_sac_processor_mixed_precision():
|
||||
"""Test SAC processor with mixed precision."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Create processor
|
||||
preprocessor, postprocessor = make_sac_processor(config, stats)
|
||||
|
||||
# Replace DeviceProcessor with one that uses float16
|
||||
for i, step in enumerate(preprocessor.steps):
|
||||
if isinstance(step, DeviceProcessor):
|
||||
preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16")
|
||||
|
||||
# Create test data
|
||||
observation = {OBS_STATE: torch.randn(10, dtype=torch.float32)}
|
||||
action = torch.randn(5, dtype=torch.float32)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is converted to float16
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16
|
||||
assert processed[TransitionKey.ACTION].dtype == torch.float16
|
||||
|
||||
|
||||
def test_sac_processor_batch_data():
|
||||
"""Test SAC processor with batched data."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_sac_processor(config, stats)
|
||||
|
||||
# Test with batched data
|
||||
batch_size = 32
|
||||
observation = {OBS_STATE: torch.randn(batch_size, 10)}
|
||||
action = torch.randn(batch_size, 5)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that batch dimension is preserved
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (batch_size, 10)
|
||||
assert processed[TransitionKey.ACTION].shape == (batch_size, 5)
|
||||
|
||||
|
||||
def test_sac_processor_edge_cases():
|
||||
"""Test SAC processor with edge cases."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_sac_processor(config, stats)
|
||||
|
||||
# Test with empty observation
|
||||
transition = create_transition(observation={}, action=torch.randn(5))
|
||||
processed = preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION] == {}
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 5)
|
||||
|
||||
# Test with None action
|
||||
transition = create_transition(observation={OBS_STATE: torch.randn(10)}, action=None)
|
||||
processed = preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 10)
|
||||
# When action is None, it may still be present with None value
|
||||
assert TransitionKey.ACTION not in processed or processed[TransitionKey.ACTION] is None
|
||||
350
tests/processor/test_smolvla_processor.py
Normal file
350
tests/processor/test_smolvla_processor.py
Normal file
@@ -0,0 +1,350 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for SmolVLA policy processor."""
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE
|
||||
from lerobot.policies.smolvla.configuration_smolvla import SmolVLAConfig
|
||||
from lerobot.policies.smolvla.processor_smolvla import SmolVLANewLineProcessor, make_smolvla_processor
|
||||
from lerobot.processor import (
|
||||
DeviceProcessor,
|
||||
NormalizerProcessor,
|
||||
RenameProcessor,
|
||||
ToBatchProcessor,
|
||||
UnnormalizerProcessor,
|
||||
)
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
|
||||
|
||||
def create_transition(observation=None, action=None, **kwargs):
|
||||
"""Helper function to create a transition dictionary."""
|
||||
transition = {}
|
||||
if observation is not None:
|
||||
transition[TransitionKey.OBSERVATION] = observation
|
||||
if action is not None:
|
||||
transition[TransitionKey.ACTION] = action
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(TransitionKey, key.upper()):
|
||||
transition[getattr(TransitionKey, key.upper())] = value
|
||||
elif key == "complementary_data":
|
||||
transition[TransitionKey.COMPLEMENTARY_DATA] = value
|
||||
return transition
|
||||
|
||||
|
||||
def create_default_config():
|
||||
"""Create a default SmolVLA configuration for testing."""
|
||||
config = SmolVLAConfig()
|
||||
config.input_features = {
|
||||
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(8,)),
|
||||
OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
|
||||
}
|
||||
config.output_features = {
|
||||
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(7,)),
|
||||
}
|
||||
config.normalization_mapping = {
|
||||
FeatureType.STATE: NormalizationMode.MEAN_STD,
|
||||
FeatureType.VISUAL: NormalizationMode.IDENTITY,
|
||||
FeatureType.ACTION: NormalizationMode.MIN_MAX,
|
||||
}
|
||||
config.device = "cpu"
|
||||
config.vlm_model_name = "HuggingFaceTB/SmolVLM-Instruct"
|
||||
config.pad_language_to = "max_length"
|
||||
config.tokenizer_max_length = 100
|
||||
return config
|
||||
|
||||
|
||||
def create_default_stats():
|
||||
"""Create default dataset statistics for testing."""
|
||||
return {
|
||||
OBS_STATE: {"mean": torch.zeros(8), "std": torch.ones(8)},
|
||||
OBS_IMAGE: {}, # No normalization for images
|
||||
ACTION: {"min": torch.full((7,), -1.0), "max": torch.ones(7)},
|
||||
}
|
||||
|
||||
|
||||
def test_make_smolvla_processor_basic():
|
||||
"""Test basic creation of SmolVLA processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor"):
|
||||
preprocessor, postprocessor = make_smolvla_processor(config, stats)
|
||||
|
||||
# Check processor names
|
||||
assert preprocessor.name == "robot_preprocessor"
|
||||
assert postprocessor.name == "robot_postprocessor"
|
||||
|
||||
# Check steps in preprocessor
|
||||
assert len(preprocessor.steps) == 6
|
||||
assert isinstance(preprocessor.steps[0], RenameProcessor)
|
||||
assert isinstance(preprocessor.steps[1], NormalizerProcessor)
|
||||
assert isinstance(preprocessor.steps[2], ToBatchProcessor)
|
||||
assert isinstance(preprocessor.steps[3], SmolVLANewLineProcessor)
|
||||
# Step 4 would be TokenizerProcessor but it's mocked
|
||||
assert isinstance(preprocessor.steps[5], DeviceProcessor)
|
||||
|
||||
# Check steps in postprocessor
|
||||
assert len(postprocessor.steps) == 2
|
||||
assert isinstance(postprocessor.steps[0], DeviceProcessor)
|
||||
assert isinstance(postprocessor.steps[1], UnnormalizerProcessor)
|
||||
|
||||
|
||||
def test_smolvla_newline_processor_single_task():
|
||||
"""Test SmolVLANewLineProcessor with single task string."""
|
||||
processor = SmolVLANewLineProcessor()
|
||||
|
||||
# Test with task that doesn't have newline
|
||||
transition = create_transition(complementary_data={"task": "test task"})
|
||||
result = processor(transition)
|
||||
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "test task\n"
|
||||
|
||||
# Test with task that already has newline
|
||||
transition = create_transition(complementary_data={"task": "test task\n"})
|
||||
result = processor(transition)
|
||||
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "test task\n"
|
||||
|
||||
|
||||
def test_smolvla_newline_processor_list_of_tasks():
|
||||
"""Test SmolVLANewLineProcessor with list of task strings."""
|
||||
processor = SmolVLANewLineProcessor()
|
||||
|
||||
# Test with list of tasks
|
||||
tasks = ["task1", "task2\n", "task3"]
|
||||
transition = create_transition(complementary_data={"task": tasks})
|
||||
result = processor(transition)
|
||||
expected = ["task1\n", "task2\n", "task3\n"]
|
||||
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == expected
|
||||
|
||||
|
||||
def test_smolvla_newline_processor_empty_transition():
|
||||
"""Test SmolVLANewLineProcessor with empty transition."""
|
||||
processor = SmolVLANewLineProcessor()
|
||||
|
||||
# Test with no complementary_data
|
||||
transition = create_transition()
|
||||
result = processor(transition)
|
||||
assert result == transition
|
||||
|
||||
# Test with complementary_data but no task
|
||||
transition = create_transition(complementary_data={"other": "data"})
|
||||
result = processor(transition)
|
||||
assert result == transition
|
||||
|
||||
# Test with None task
|
||||
transition = create_transition(complementary_data={"task": None})
|
||||
result = processor(transition)
|
||||
assert result == transition
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_smolvla_processor_cuda():
|
||||
"""Test SmolVLA processor with CUDA device."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Mock the tokenizer processor to act as pass-through
|
||||
class MockTokenizerProcessor:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __call__(self, transition):
|
||||
return transition
|
||||
|
||||
def state_dict(self):
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
def get_config(self):
|
||||
return {"tokenizer_name": "HuggingFaceTB/SmolVLM-Instruct"}
|
||||
|
||||
def transform_features(self, features):
|
||||
return features
|
||||
|
||||
with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor", MockTokenizerProcessor):
|
||||
preprocessor, postprocessor = make_smolvla_processor(config, stats)
|
||||
|
||||
# Create CPU data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(8),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(7)
|
||||
transition = create_transition(observation, action, complementary_data={"task": "test task"})
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is on CUDA
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda"
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda"
|
||||
assert processed[TransitionKey.ACTION].device.type == "cuda"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_smolvla_processor_accelerate_scenario():
|
||||
"""Test SmolVLA processor in simulated Accelerate scenario."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Mock the tokenizer processor to act as pass-through
|
||||
class MockTokenizerProcessor:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __call__(self, transition):
|
||||
return transition
|
||||
|
||||
def state_dict(self):
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
def get_config(self):
|
||||
return {"tokenizer_name": "HuggingFaceTB/SmolVLM-Instruct"}
|
||||
|
||||
def transform_features(self, features):
|
||||
return features
|
||||
|
||||
with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor", MockTokenizerProcessor):
|
||||
preprocessor, postprocessor = make_smolvla_processor(config, stats)
|
||||
|
||||
# Simulate Accelerate: data already on GPU and batched
|
||||
device = torch.device("cuda:0")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(1, 8).to(device),
|
||||
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(1, 7).to(device)
|
||||
transition = create_transition(observation, action, complementary_data={"task": ["test task"]})
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on same GPU
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_smolvla_processor_multi_gpu():
|
||||
"""Test SmolVLA processor with multi-GPU setup."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Mock the tokenizer processor to act as pass-through
|
||||
class MockTokenizerProcessor:
|
||||
def __init__(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def __call__(self, transition):
|
||||
return transition
|
||||
|
||||
def state_dict(self):
|
||||
return {}
|
||||
|
||||
def load_state_dict(self, state):
|
||||
pass
|
||||
|
||||
def reset(self):
|
||||
pass
|
||||
|
||||
def get_config(self):
|
||||
return {"tokenizer_name": "HuggingFaceTB/SmolVLM-Instruct"}
|
||||
|
||||
def transform_features(self, features):
|
||||
return features
|
||||
|
||||
with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor", MockTokenizerProcessor):
|
||||
preprocessor, postprocessor = make_smolvla_processor(config, stats)
|
||||
|
||||
# Simulate data on different GPU
|
||||
device = torch.device("cuda:1")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(1, 8).to(device),
|
||||
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(1, 7).to(device)
|
||||
transition = create_transition(observation, action, complementary_data={"task": ["test task"]})
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on cuda:1
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
def test_smolvla_processor_without_stats():
|
||||
"""Test SmolVLA processor creation without dataset statistics."""
|
||||
config = create_default_config()
|
||||
|
||||
# Mock the tokenizer processor
|
||||
with patch("lerobot.policies.smolvla.processor_smolvla.TokenizerProcessor"):
|
||||
preprocessor, postprocessor = make_smolvla_processor(config, dataset_stats=None)
|
||||
|
||||
# Should still create processors
|
||||
assert preprocessor is not None
|
||||
assert postprocessor is not None
|
||||
|
||||
|
||||
def test_smolvla_newline_processor_state_dict():
|
||||
"""Test SmolVLANewLineProcessor state dict methods."""
|
||||
processor = SmolVLANewLineProcessor()
|
||||
|
||||
# Test state_dict (should be empty)
|
||||
state = processor.state_dict()
|
||||
assert state == {}
|
||||
|
||||
# Test load_state_dict (should do nothing)
|
||||
processor.load_state_dict({})
|
||||
|
||||
# Test reset (should do nothing)
|
||||
processor.reset()
|
||||
|
||||
# Test get_config
|
||||
config = processor.get_config()
|
||||
assert config == {}
|
||||
|
||||
|
||||
def test_smolvla_newline_processor_transform_features():
|
||||
"""Test SmolVLANewLineProcessor transform_features method."""
|
||||
processor = SmolVLANewLineProcessor()
|
||||
|
||||
# Test transform_features
|
||||
features = {
|
||||
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,)),
|
||||
}
|
||||
result = processor.transform_features(features)
|
||||
assert result == features # Should return unchanged
|
||||
350
tests/processor/test_tdmpc_processor.py
Normal file
350
tests/processor/test_tdmpc_processor.py
Normal file
@@ -0,0 +1,350 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for TDMPC policy processor."""
|
||||
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE
|
||||
from lerobot.policies.tdmpc.configuration_tdmpc import TDMPCConfig
|
||||
from lerobot.policies.tdmpc.processor_tdmpc import make_tdmpc_processor
|
||||
from lerobot.processor import (
|
||||
DeviceProcessor,
|
||||
NormalizerProcessor,
|
||||
RenameProcessor,
|
||||
RobotProcessor,
|
||||
ToBatchProcessor,
|
||||
UnnormalizerProcessor,
|
||||
)
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
|
||||
|
||||
def create_transition(observation=None, action=None, **kwargs):
|
||||
"""Helper function to create a transition dictionary."""
|
||||
transition = {}
|
||||
if observation is not None:
|
||||
transition[TransitionKey.OBSERVATION] = observation
|
||||
if action is not None:
|
||||
transition[TransitionKey.ACTION] = action
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(TransitionKey, key.upper()):
|
||||
transition[getattr(TransitionKey, key.upper())] = value
|
||||
return transition
|
||||
|
||||
|
||||
def create_default_config():
|
||||
"""Create a default TDMPC configuration for testing."""
|
||||
config = TDMPCConfig()
|
||||
config.input_features = {
|
||||
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(12,)),
|
||||
OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
|
||||
}
|
||||
config.output_features = {
|
||||
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(6,)),
|
||||
}
|
||||
config.normalization_mapping = {
|
||||
FeatureType.STATE: NormalizationMode.MEAN_STD,
|
||||
FeatureType.VISUAL: NormalizationMode.IDENTITY,
|
||||
FeatureType.ACTION: NormalizationMode.MIN_MAX,
|
||||
}
|
||||
config.device = "cpu"
|
||||
return config
|
||||
|
||||
|
||||
def create_default_stats():
|
||||
"""Create default dataset statistics for testing."""
|
||||
return {
|
||||
OBS_STATE: {"mean": torch.zeros(12), "std": torch.ones(12)},
|
||||
OBS_IMAGE: {}, # No normalization for images
|
||||
ACTION: {"min": torch.full((6,), -1.0), "max": torch.ones(6)},
|
||||
}
|
||||
|
||||
|
||||
def test_make_tdmpc_processor_basic():
|
||||
"""Test basic creation of TDMPC processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_tdmpc_processor(config, stats)
|
||||
|
||||
# Check processor names
|
||||
assert preprocessor.name == "robot_preprocessor"
|
||||
assert postprocessor.name == "robot_postprocessor"
|
||||
|
||||
# Check steps in preprocessor
|
||||
assert len(preprocessor.steps) == 4
|
||||
assert isinstance(preprocessor.steps[0], RenameProcessor)
|
||||
assert isinstance(preprocessor.steps[1], NormalizerProcessor)
|
||||
assert isinstance(preprocessor.steps[2], ToBatchProcessor)
|
||||
assert isinstance(preprocessor.steps[3], DeviceProcessor)
|
||||
|
||||
# Check steps in postprocessor
|
||||
assert len(postprocessor.steps) == 2
|
||||
assert isinstance(postprocessor.steps[0], DeviceProcessor)
|
||||
assert isinstance(postprocessor.steps[1], UnnormalizerProcessor)
|
||||
|
||||
|
||||
def test_tdmpc_processor_normalization():
|
||||
"""Test that TDMPC processor correctly normalizes and unnormalizes data."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_tdmpc_processor(config, stats)
|
||||
|
||||
# Create test data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(12),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is processed and batched
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 12)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 6)
|
||||
|
||||
# Process action through postprocessor
|
||||
action_transition = create_transition(action=processed[TransitionKey.ACTION])
|
||||
postprocessed = postprocessor(action_transition)
|
||||
|
||||
# Check that action is unnormalized (but still batched)
|
||||
assert postprocessed[TransitionKey.ACTION].shape == (1, 6)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_tdmpc_processor_cuda():
|
||||
"""Test TDMPC processor with CUDA device."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_tdmpc_processor(config, stats)
|
||||
|
||||
# Create CPU data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(12),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is on CUDA
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda"
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda"
|
||||
assert processed[TransitionKey.ACTION].device.type == "cuda"
|
||||
|
||||
# Process through postprocessor
|
||||
action_transition = create_transition(action=processed[TransitionKey.ACTION])
|
||||
postprocessed = postprocessor(action_transition)
|
||||
|
||||
# Check that action is back on CPU
|
||||
assert postprocessed[TransitionKey.ACTION].device.type == "cpu"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_tdmpc_processor_accelerate_scenario():
|
||||
"""Test TDMPC processor in simulated Accelerate scenario."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_tdmpc_processor(config, stats)
|
||||
|
||||
# Simulate Accelerate: data already on GPU
|
||||
device = torch.device("cuda:0")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(12).to(device),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(6).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on same GPU
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_tdmpc_processor_multi_gpu():
|
||||
"""Test TDMPC processor with multi-GPU setup."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_tdmpc_processor(config, stats)
|
||||
|
||||
# Simulate data on different GPU
|
||||
device = torch.device("cuda:1")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(12).to(device),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(6).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on cuda:1
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
def test_tdmpc_processor_without_stats():
|
||||
"""Test TDMPC processor creation without dataset statistics."""
|
||||
config = create_default_config()
|
||||
|
||||
preprocessor, postprocessor = make_tdmpc_processor(config, dataset_stats=None)
|
||||
|
||||
# Should still create processors
|
||||
assert preprocessor is not None
|
||||
assert postprocessor is not None
|
||||
|
||||
# Process should still work
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(12),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
assert processed is not None
|
||||
|
||||
|
||||
def test_tdmpc_processor_save_and_load():
|
||||
"""Test saving and loading TDMPC processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_tdmpc_processor(config, stats)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Save preprocessor
|
||||
preprocessor.save_pretrained(tmpdir)
|
||||
|
||||
# Load preprocessor
|
||||
loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir)
|
||||
|
||||
# Test that loaded processor works
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(12),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = loaded_preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 12)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 6)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_tdmpc_processor_mixed_precision():
|
||||
"""Test TDMPC processor with mixed precision."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Create processor
|
||||
preprocessor, postprocessor = make_tdmpc_processor(config, stats)
|
||||
|
||||
# Replace DeviceProcessor with one that uses float16
|
||||
for i, step in enumerate(preprocessor.steps):
|
||||
if isinstance(step, DeviceProcessor):
|
||||
preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16")
|
||||
|
||||
# Create test data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(12, dtype=torch.float32),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224, dtype=torch.float32),
|
||||
}
|
||||
action = torch.randn(6, dtype=torch.float32)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is converted to float16
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].dtype == torch.float16
|
||||
assert processed[TransitionKey.ACTION].dtype == torch.float16
|
||||
|
||||
|
||||
def test_tdmpc_processor_batch_data():
|
||||
"""Test TDMPC processor with batched data."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_tdmpc_processor(config, stats)
|
||||
|
||||
# Test with batched data
|
||||
batch_size = 64
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(batch_size, 12),
|
||||
OBS_IMAGE: torch.randn(batch_size, 3, 224, 224),
|
||||
}
|
||||
action = torch.randn(batch_size, 6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that batch dimension is preserved
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (batch_size, 12)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (batch_size, 3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (batch_size, 6)
|
||||
|
||||
|
||||
def test_tdmpc_processor_edge_cases():
|
||||
"""Test TDMPC processor with edge cases."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_tdmpc_processor(config, stats)
|
||||
|
||||
# Test with only state observation (no image)
|
||||
observation = {OBS_STATE: torch.randn(12)}
|
||||
action = torch.randn(6)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 12)
|
||||
assert OBS_IMAGE not in processed[TransitionKey.OBSERVATION]
|
||||
|
||||
# Test with only image observation (no state)
|
||||
observation = {OBS_IMAGE: torch.randn(3, 224, 224)}
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224)
|
||||
assert OBS_STATE not in processed[TransitionKey.OBSERVATION]
|
||||
@@ -725,3 +725,264 @@ def test_custom_padding_side(mock_auto_tokenizer):
|
||||
processor_right(transition)
|
||||
|
||||
assert tracking_tokenizer.padding_side_calls[-1] == "right"
|
||||
|
||||
|
||||
@require_package("transformers")
|
||||
def test_device_detection_cpu():
|
||||
"""Test that tokenized tensors stay on CPU when other tensors are on CPU."""
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
|
||||
|
||||
# Create transition with CPU tensors
|
||||
observation = {"observation.state": torch.randn(10)} # CPU tensor
|
||||
action = torch.randn(5) # CPU tensor
|
||||
transition = create_transition(
|
||||
observation=observation, action=action, complementary_data={"task": "test task"}
|
||||
)
|
||||
|
||||
result = processor(transition)
|
||||
|
||||
# Check that tokenized tensors are on CPU
|
||||
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
||||
|
||||
assert tokens.device.type == "cpu"
|
||||
assert attention_mask.device.type == "cpu"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
@require_package("transformers")
|
||||
def test_device_detection_cuda():
|
||||
"""Test that tokenized tensors are moved to CUDA when other tensors are on CUDA."""
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
|
||||
|
||||
# Create transition with CUDA tensors
|
||||
observation = {"observation.state": torch.randn(10).cuda()} # CUDA tensor
|
||||
action = torch.randn(5).cuda() # CUDA tensor
|
||||
transition = create_transition(
|
||||
observation=observation, action=action, complementary_data={"task": "test task"}
|
||||
)
|
||||
|
||||
result = processor(transition)
|
||||
|
||||
# Check that tokenized tensors are on CUDA
|
||||
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
||||
|
||||
assert tokens.device.type == "cuda"
|
||||
assert attention_mask.device.type == "cuda"
|
||||
assert tokens.device.index == 0 # Should be on same device as input
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
@require_package("transformers")
|
||||
def test_device_detection_multi_gpu():
|
||||
"""Test that tokenized tensors match device in multi-GPU setup."""
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
|
||||
|
||||
# Test with tensors on cuda:1
|
||||
device = torch.device("cuda:1")
|
||||
observation = {"observation.state": torch.randn(10).to(device)}
|
||||
action = torch.randn(5).to(device)
|
||||
transition = create_transition(
|
||||
observation=observation, action=action, complementary_data={"task": "multi gpu test"}
|
||||
)
|
||||
|
||||
result = processor(transition)
|
||||
|
||||
# Check that tokenized tensors are on cuda:1
|
||||
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
||||
|
||||
assert tokens.device == device
|
||||
assert attention_mask.device == device
|
||||
|
||||
|
||||
@require_package("transformers")
|
||||
def test_device_detection_no_tensors():
|
||||
"""Test that tokenized tensors stay on CPU when no other tensors exist."""
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
|
||||
|
||||
# Create transition with no tensors
|
||||
transition = create_transition(
|
||||
observation={"metadata": {"key": "value"}}, # No tensors
|
||||
complementary_data={"task": "no tensor test"},
|
||||
)
|
||||
|
||||
result = processor(transition)
|
||||
|
||||
# Check that tokenized tensors are on CPU (default)
|
||||
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
||||
|
||||
assert tokens.device.type == "cpu"
|
||||
assert attention_mask.device.type == "cpu"
|
||||
|
||||
|
||||
@require_package("transformers")
|
||||
def test_device_detection_mixed_devices():
|
||||
"""Test device detection when tensors are on different devices (uses first found)."""
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
|
||||
|
||||
if torch.cuda.is_available():
|
||||
# Create transition with mixed devices
|
||||
observation = {
|
||||
"observation.cpu": torch.randn(10), # CPU
|
||||
"observation.cuda": torch.randn(10).cuda(), # CUDA
|
||||
}
|
||||
transition = create_transition(
|
||||
observation=observation, complementary_data={"task": "mixed device test"}
|
||||
)
|
||||
|
||||
result = processor(transition)
|
||||
|
||||
# The device detection should use the first tensor found
|
||||
# (iteration order depends on dict, but result should be consistent)
|
||||
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
||||
|
||||
# Both should be on the same device
|
||||
assert tokens.device == attention_mask.device
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
@require_package("transformers")
|
||||
def test_device_detection_from_action():
|
||||
"""Test that device is detected from action tensor when no observation tensors exist."""
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
|
||||
|
||||
# Create transition with action on CUDA but no observation tensors
|
||||
observation = {"metadata": {"key": "value"}} # No tensors in observation
|
||||
action = torch.randn(5).cuda()
|
||||
transition = create_transition(
|
||||
observation=observation, action=action, complementary_data={"task": "action device test"}
|
||||
)
|
||||
|
||||
result = processor(transition)
|
||||
|
||||
# Check that tokenized tensors match action's device
|
||||
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
||||
|
||||
assert tokens.device.type == "cuda"
|
||||
assert attention_mask.device.type == "cuda"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
@require_package("transformers")
|
||||
def test_device_detection_from_complementary_data():
|
||||
"""Test that device is detected from tensors in complementary_data."""
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
|
||||
|
||||
# Create transition with tensor in complementary_data
|
||||
transition = create_transition(
|
||||
observation={"metadata": {"key": "value"}}, # No tensors
|
||||
complementary_data={
|
||||
"task": "comp data test",
|
||||
"index": torch.tensor([42]).cuda(), # Tensor in complementary_data
|
||||
},
|
||||
)
|
||||
|
||||
result = processor(transition)
|
||||
|
||||
# Check that tokenized tensors match complementary_data tensor's device
|
||||
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
||||
|
||||
assert tokens.device.type == "cuda"
|
||||
assert attention_mask.device.type == "cuda"
|
||||
|
||||
|
||||
@require_package("transformers")
|
||||
def test_device_detection_preserves_dtype():
|
||||
"""Test that device detection doesn't affect dtype of tokenized tensors."""
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
|
||||
|
||||
# Create transition with float tensor (to test dtype isn't affected)
|
||||
observation = {"observation.state": torch.randn(10, dtype=torch.float16)}
|
||||
transition = create_transition(observation=observation, complementary_data={"task": "dtype test"})
|
||||
|
||||
result = processor(transition)
|
||||
|
||||
# Check that tokenized tensors have correct dtypes (not affected by input dtype)
|
||||
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
||||
|
||||
assert tokens.dtype == torch.long # Should remain long
|
||||
assert attention_mask.dtype == torch.bool # Should be bool (converted in processor)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
@require_package("transformers")
|
||||
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
||||
def test_integration_with_device_processor(mock_auto_tokenizer):
|
||||
"""Test that TokenizerProcessor works correctly with DeviceProcessor in pipeline."""
|
||||
from lerobot.processor import DeviceProcessor
|
||||
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
||||
|
||||
# Create pipeline with TokenizerProcessor then DeviceProcessor
|
||||
tokenizer_processor = TokenizerProcessor(tokenizer_name="test-tokenizer", max_length=6)
|
||||
device_processor = DeviceProcessor(device="cuda:0")
|
||||
robot_processor = RobotProcessor([tokenizer_processor, device_processor])
|
||||
|
||||
# Start with CPU tensors
|
||||
transition = create_transition(
|
||||
observation={"observation.state": torch.randn(10)}, # CPU
|
||||
action=torch.randn(5), # CPU
|
||||
complementary_data={"task": "pipeline test"},
|
||||
)
|
||||
|
||||
result = robot_processor(transition)
|
||||
|
||||
# All tensors should end up on CUDA (moved by DeviceProcessor)
|
||||
assert result[TransitionKey.OBSERVATION]["observation.state"].device.type == "cuda"
|
||||
assert result[TransitionKey.ACTION].device.type == "cuda"
|
||||
|
||||
# Tokenized tensors should also be on CUDA
|
||||
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
||||
assert tokens.device.type == "cuda"
|
||||
assert attention_mask.device.type == "cuda"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
@require_package("transformers")
|
||||
def test_simulated_accelerate_scenario():
|
||||
"""Test scenario simulating Accelerate with data already on GPU."""
|
||||
mock_tokenizer = MockTokenizer(vocab_size=100)
|
||||
processor = TokenizerProcessor(tokenizer=mock_tokenizer, max_length=10)
|
||||
|
||||
# Simulate Accelerate scenario: batch already on GPU
|
||||
device = torch.device("cuda:0")
|
||||
observation = {
|
||||
"observation.state": torch.randn(1, 10).to(device), # Batched, on GPU
|
||||
"observation.image": torch.randn(1, 3, 224, 224).to(device), # Batched, on GPU
|
||||
}
|
||||
action = torch.randn(1, 5).to(device) # Batched, on GPU
|
||||
|
||||
transition = create_transition(
|
||||
observation=observation,
|
||||
action=action,
|
||||
complementary_data={"task": ["accelerate test"]}, # List for batched task
|
||||
)
|
||||
|
||||
result = processor(transition)
|
||||
|
||||
# Tokenized tensors should match GPU placement
|
||||
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
||||
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
||||
|
||||
assert tokens.device == device
|
||||
assert attention_mask.device == device
|
||||
# MockTokenizer squeezes single-item batches, so shape is (max_length,) not (1, max_length)
|
||||
assert tokens.shape == (10,) # MockTokenizer behavior for single string in list
|
||||
assert attention_mask.shape == (10,)
|
||||
|
||||
345
tests/processor/test_vqbet_processor.py
Normal file
345
tests/processor/test_vqbet_processor.py
Normal file
@@ -0,0 +1,345 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""Tests for VQBeT policy processor."""
|
||||
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
import torch
|
||||
|
||||
from lerobot.configs.types import FeatureType, NormalizationMode, PolicyFeature
|
||||
from lerobot.constants import ACTION, OBS_IMAGE, OBS_STATE
|
||||
from lerobot.policies.vqbet.configuration_vqbet import VQBeTConfig
|
||||
from lerobot.policies.vqbet.processor_vqbet import make_vqbet_processor
|
||||
from lerobot.processor import (
|
||||
DeviceProcessor,
|
||||
NormalizerProcessor,
|
||||
RenameProcessor,
|
||||
RobotProcessor,
|
||||
ToBatchProcessor,
|
||||
UnnormalizerProcessor,
|
||||
)
|
||||
from lerobot.processor.pipeline import TransitionKey
|
||||
|
||||
|
||||
def create_transition(observation=None, action=None, **kwargs):
|
||||
"""Helper function to create a transition dictionary."""
|
||||
transition = {}
|
||||
if observation is not None:
|
||||
transition[TransitionKey.OBSERVATION] = observation
|
||||
if action is not None:
|
||||
transition[TransitionKey.ACTION] = action
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(TransitionKey, key.upper()):
|
||||
transition[getattr(TransitionKey, key.upper())] = value
|
||||
return transition
|
||||
|
||||
|
||||
def create_default_config():
|
||||
"""Create a default VQBeT configuration for testing."""
|
||||
config = VQBeTConfig()
|
||||
config.input_features = {
|
||||
OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(8,)),
|
||||
OBS_IMAGE: PolicyFeature(type=FeatureType.VISUAL, shape=(3, 224, 224)),
|
||||
}
|
||||
config.output_features = {
|
||||
ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(7,)),
|
||||
}
|
||||
config.normalization_mapping = {
|
||||
FeatureType.STATE: NormalizationMode.MEAN_STD,
|
||||
FeatureType.VISUAL: NormalizationMode.IDENTITY,
|
||||
FeatureType.ACTION: NormalizationMode.MIN_MAX,
|
||||
}
|
||||
config.device = "cpu"
|
||||
return config
|
||||
|
||||
|
||||
def create_default_stats():
|
||||
"""Create default dataset statistics for testing."""
|
||||
return {
|
||||
OBS_STATE: {"mean": torch.zeros(8), "std": torch.ones(8)},
|
||||
OBS_IMAGE: {}, # No normalization for images
|
||||
ACTION: {"min": torch.full((7,), -1.0), "max": torch.ones(7)},
|
||||
}
|
||||
|
||||
|
||||
def test_make_vqbet_processor_basic():
|
||||
"""Test basic creation of VQBeT processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_vqbet_processor(config, stats)
|
||||
|
||||
# Check processor names
|
||||
assert preprocessor.name == "robot_preprocessor"
|
||||
assert postprocessor.name == "robot_postprocessor"
|
||||
|
||||
# Check steps in preprocessor
|
||||
assert len(preprocessor.steps) == 4
|
||||
assert isinstance(preprocessor.steps[0], RenameProcessor)
|
||||
assert isinstance(preprocessor.steps[1], NormalizerProcessor)
|
||||
assert isinstance(preprocessor.steps[2], ToBatchProcessor)
|
||||
assert isinstance(preprocessor.steps[3], DeviceProcessor)
|
||||
|
||||
# Check steps in postprocessor
|
||||
assert len(postprocessor.steps) == 2
|
||||
assert isinstance(postprocessor.steps[0], DeviceProcessor)
|
||||
assert isinstance(postprocessor.steps[1], UnnormalizerProcessor)
|
||||
|
||||
|
||||
def test_vqbet_processor_with_images():
|
||||
"""Test VQBeT processor with image and state observations."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_vqbet_processor(config, stats)
|
||||
|
||||
# Create test data with images and states
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(8),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(7)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is batched
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 8)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 7)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_vqbet_processor_cuda():
|
||||
"""Test VQBeT processor with CUDA device."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_vqbet_processor(config, stats)
|
||||
|
||||
# Create CPU data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(8),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(7)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is on CUDA
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda"
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device.type == "cuda"
|
||||
assert processed[TransitionKey.ACTION].device.type == "cuda"
|
||||
|
||||
# Process through postprocessor
|
||||
action_transition = create_transition(action=processed[TransitionKey.ACTION])
|
||||
postprocessed = postprocessor(action_transition)
|
||||
|
||||
# Check that action is back on CPU
|
||||
assert postprocessed[TransitionKey.ACTION].device.type == "cpu"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_vqbet_processor_accelerate_scenario():
|
||||
"""Test VQBeT processor in simulated Accelerate scenario."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_vqbet_processor(config, stats)
|
||||
|
||||
# Simulate Accelerate: data already on GPU and batched
|
||||
device = torch.device("cuda:0")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(1, 8).to(device),
|
||||
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(1, 7).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on same GPU
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
||||
def test_vqbet_processor_multi_gpu():
|
||||
"""Test VQBeT processor with multi-GPU setup."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda:0"
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_vqbet_processor(config, stats)
|
||||
|
||||
# Simulate data on different GPU
|
||||
device = torch.device("cuda:1")
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(1, 8).to(device),
|
||||
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device),
|
||||
}
|
||||
action = torch.randn(1, 7).to(device)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data stays on cuda:1
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].device == device
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].device == device
|
||||
assert processed[TransitionKey.ACTION].device == device
|
||||
|
||||
|
||||
def test_vqbet_processor_without_stats():
|
||||
"""Test VQBeT processor creation without dataset statistics."""
|
||||
config = create_default_config()
|
||||
|
||||
preprocessor, postprocessor = make_vqbet_processor(config, dataset_stats=None)
|
||||
|
||||
# Should still create processors
|
||||
assert preprocessor is not None
|
||||
assert postprocessor is not None
|
||||
|
||||
# Process should still work
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(8),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(7)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
assert processed is not None
|
||||
|
||||
|
||||
def test_vqbet_processor_save_and_load():
|
||||
"""Test saving and loading VQBeT processor."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_vqbet_processor(config, stats)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
# Save preprocessor
|
||||
preprocessor.save_pretrained(tmpdir)
|
||||
|
||||
# Load preprocessor
|
||||
loaded_preprocessor = RobotProcessor.from_pretrained(tmpdir)
|
||||
|
||||
# Test that loaded processor works
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(8),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(7)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = loaded_preprocessor(transition)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 8)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (1, 7)
|
||||
|
||||
|
||||
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
||||
def test_vqbet_processor_mixed_precision():
|
||||
"""Test VQBeT processor with mixed precision."""
|
||||
config = create_default_config()
|
||||
config.device = "cuda"
|
||||
stats = create_default_stats()
|
||||
|
||||
# Create processor
|
||||
preprocessor, postprocessor = make_vqbet_processor(config, stats)
|
||||
|
||||
# Replace DeviceProcessor with one that uses float16
|
||||
for i, step in enumerate(preprocessor.steps):
|
||||
if isinstance(step, DeviceProcessor):
|
||||
preprocessor.steps[i] = DeviceProcessor(device=config.device, float_dtype="float16")
|
||||
|
||||
# Create test data
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(8, dtype=torch.float32),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224, dtype=torch.float32),
|
||||
}
|
||||
action = torch.randn(7, dtype=torch.float32)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that data is converted to float16
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].dtype == torch.float16
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].dtype == torch.float16
|
||||
assert processed[TransitionKey.ACTION].dtype == torch.float16
|
||||
|
||||
|
||||
def test_vqbet_processor_large_batch():
|
||||
"""Test VQBeT processor with large batch sizes."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_vqbet_processor(config, stats)
|
||||
|
||||
# Test with large batch
|
||||
batch_size = 128
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(batch_size, 8),
|
||||
OBS_IMAGE: torch.randn(batch_size, 3, 224, 224),
|
||||
}
|
||||
action = torch.randn(batch_size, 7)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
# Process through preprocessor
|
||||
processed = preprocessor(transition)
|
||||
|
||||
# Check that batch dimension is preserved
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_STATE].shape == (batch_size, 8)
|
||||
assert processed[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (batch_size, 3, 224, 224)
|
||||
assert processed[TransitionKey.ACTION].shape == (batch_size, 7)
|
||||
|
||||
|
||||
def test_vqbet_processor_sequential_processing():
|
||||
"""Test VQBeT processor with sequential data processing."""
|
||||
config = create_default_config()
|
||||
stats = create_default_stats()
|
||||
|
||||
preprocessor, postprocessor = make_vqbet_processor(config, stats)
|
||||
|
||||
# Process multiple samples sequentially
|
||||
results = []
|
||||
for _ in range(5):
|
||||
observation = {
|
||||
OBS_STATE: torch.randn(8),
|
||||
OBS_IMAGE: torch.randn(3, 224, 224),
|
||||
}
|
||||
action = torch.randn(7)
|
||||
transition = create_transition(observation, action)
|
||||
|
||||
processed = preprocessor(transition)
|
||||
results.append(processed)
|
||||
|
||||
# Check that all results are consistent
|
||||
for result in results:
|
||||
assert result[TransitionKey.OBSERVATION][OBS_STATE].shape == (1, 8)
|
||||
assert result[TransitionKey.OBSERVATION][OBS_IMAGE].shape == (1, 3, 224, 224)
|
||||
assert result[TransitionKey.ACTION].shape == (1, 7)
|
||||
Reference in New Issue
Block a user