diff --git a/benchmark_rlearn_vs_rewind.py b/benchmark_rlearn_vs_rewind.py deleted file mode 100644 index 0e58759c4..000000000 --- a/benchmark_rlearn_vs_rewind.py +++ /dev/null @@ -1,345 +0,0 @@ -#!/usr/bin/env python -""" -Benchmark script to compare forward pass speed between RLearn and ReWiND implementations. - -This script compares the inference speed of: -1. RLearn model (lerobot implementation) -2. ReWiND model (reference implementation) - -Both models use the same backbone architectures (DINOv2 + sentence-transformers) -and implement similar reward modeling approaches. -""" - -import time -from itertools import chain -from random import random - -import einx -import torch -import torch.nn.functional as F -from einops import pack, rearrange, repeat, unpack -from hl_gauss_pytorch import HLGaussLayer -from sentence_transformers import SentenceTransformer -from torch import nn -from torch.nn.utils.rnn import pad_sequence - -# ReWiND implementation (copied from user's context) -from transformers import AutoImageProcessor, AutoModel -from vit_pytorch.accept_video_wrapper import AcceptVideoWrapper -from x_mlps_pytorch import Feedforwards -from x_transformers import Decoder - -from lerobot.constants import OBS_IMAGES, OBS_LANGUAGE -from lerobot.policies.rlearn.configuration_rlearn import RLearNConfig - -# RLearn implementation -from lerobot.policies.rlearn.modeling_rlearn import RLearNPolicy - - -# ReWiND helper functions -def exists(v): - return v is not None - - -def satisfy_prob(prob): - return random() < prob - - -def mask_from_lens(lens): - seq = torch.arange(lens.amax().item(), device=lens.device) - mask = einx.less("n, b -> b n", seq, lens) - return mask - - -def randint(min_value: int, max_value: torch.Tensor): - value_range = (max_value - min_value).float() - return ((value_range * torch.rand_like(value_range)) + min_value).round().clamp(min=min_value).long() - - -# ReWiND DinoImageEmbedder -class DinoImageEmbedder(nn.Module): - def __init__(self): - super().__init__() - self.image_processor = AutoImageProcessor.from_pretrained("facebook/dinov2-base") - self.image_model = AutoModel.from_pretrained("facebook/dinov2-base") - - def forward(self, images): - model_inputs = self.image_processor(images, return_tensors="pt") - outputs = self.image_model(**model_inputs) - last_hidden_states = outputs[0] - return last_hidden_states[:, 0] # cls - - -# ReWiND RewardModel -class RewardModel(nn.Module): - def __init__( - self, - decoder: dict | Decoder = dict(dim=768, depth=4, heads=8, attn_dim_head=64), - image_model: nn.Module | None = None, - mlp_predictor_depth=3, - reward_bins=10, - max_video_frames=16, - dim_image_embed=768, - num_register_tokens=4, - lang_per_token_embed=True, - sentence_transformer_path="sentence-transformers/all-MiniLM-L12-v2", - categorical_rewards=False, - use_hl_gauss_loss=True, - reward_min_value=0.0, - reward_max_value=1.0, - reward_hl_gauss_loss_num_bins=20, - ): - super().__init__() - - self.lang_per_token_embed = lang_per_token_embed - self.mini_lm = SentenceTransformer(sentence_transformer_path) - mini_lm_dim = self.mini_lm.encode(["__"]).shape[-1] - - if not exists(image_model): - image_model = DinoImageEmbedder() - - self.video_embed = AcceptVideoWrapper(image_model) - self.decoder = Decoder(**decoder) - dim = self.decoder.dim - - self.first_pos_emb = nn.Parameter(torch.randn(dim) * 1e-2) - self.to_lang_tokens = nn.Linear(mini_lm_dim, dim) - self.to_video_tokens = nn.Linear(dim_image_embed, dim) - - self.mlp_predictor = Feedforwards( - dim=dim, dim_out=reward_bins if categorical_rewards else None, depth=mlp_predictor_depth - ) - - self.num_register_tokens = num_register_tokens - self.register_tokens = nn.Parameter(torch.randn(num_register_tokens, dim) * 1e-2) - self.categorical_rewards = categorical_rewards - - self.hl_gauss_layer = HLGaussLayer( - dim=dim, - use_regression=not use_hl_gauss_loss, - hl_gauss_loss=dict( - min_value=reward_min_value, - max_value=reward_max_value, - num_bins=reward_hl_gauss_loss_num_bins, - ), - ) - - def parameters(self): - return chain( - self.decoder.parameters(), - iter((self.video_embed.pos_emb,)), - self.to_lang_tokens.parameters(), - self.to_video_tokens.parameters(), - self.mlp_predictor.parameters(), - self.hl_gauss_layer.parameters(), - ) - - def forward( - self, - commands: list[str], - video, # (b c t h w) - extra_embed_tokens=None, # (b n d) - rewards=None, - video_lens=None, - ): - batch = video.shape[0] - assert len(commands) == batch - - device = video.device - mask = None - - # register tokens - register_tokens = repeat(self.register_tokens, "n d -> b n d", b=batch) - - # language embed - lang_embeds = self.mini_lm.encode( - commands, - output_value="token_embeddings" if self.lang_per_token_embed else "sentence_embedding", - convert_to_numpy=False, - ) - lang_embeds = pad_sequence(lang_embeds, batch_first=True).to(device) - - if self.lang_per_token_embed: - lens = torch.tensor([t.shape[0] for t in lang_embeds], device=device) - mask = mask_from_lens(lens) - - # extra embeds - if not exists(extra_embed_tokens): - extra_embed_tokens = register_tokens[:, 0:0] - - elif exists(extra_embed_tokens) and exists(mask): - mask = F.pad(mask, (0, extra_embed_tokens.shape[-2]), value=True) - - # video embeds - video_embeds = self.video_embed(video, eval_with_no_grad=True) - - if self.lang_per_token_embed: - mask = F.pad(mask, (0, video_embeds.shape[1] + self.num_register_tokens), value=True) - - # linear projections - lang_tokens = self.to_lang_tokens(lang_embeds) - video_tokens = self.to_video_tokens(video_embeds) - - # add video start positional embedding - first_video_token, rest_video_tokens = video_tokens[:, :1], video_tokens[:, 1:] - first_video_token = first_video_token + repeat(self.first_pos_emb, "d -> b 1 d", b=batch) - video_tokens = torch.cat((first_video_token, rest_video_tokens), dim=1) - - # pack all tokens for attention - tokens, lang_video_packed_shape = pack( - (lang_tokens, register_tokens, extra_embed_tokens, video_tokens), "b * d" - ) - - # attention - attended = self.decoder(tokens, mask=mask) - - # unpack and project the video tokens to logits to train reward predictor - _, _, _, attended_video_tokens = unpack(attended, lang_video_packed_shape, "b * d") - - video_frame_embed_or_logits = self.mlp_predictor(attended_video_tokens) - - # determine video masking for loss - video_mask = None - if exists(video_lens): - video_mask = mask_from_lens(video_lens) - max_video_len = video_lens.amax().item() - video_frame_embed_or_logits = video_frame_embed_or_logits[:, :max_video_len] - if exists(rewards): - rewards = rewards[:, :max_video_len] - rewards = einx.where("b t, b t,", video_mask, rewards, -1) - - # return raw prediction or loss - return_loss = exists(rewards) - if not return_loss: - if self.categorical_rewards: - return video_frame_embed_or_logits - else: - return self.hl_gauss_layer(video_frame_embed_or_logits) - - # calculate loss - if self.categorical_rewards: - assert rewards.dtype in (torch.long, torch.int) - loss = F.cross_entropy( - rearrange(video_frame_embed_or_logits, "b t l -> b l t"), rewards, ignore_index=-1 - ) - else: - assert rewards.dtype == torch.float - loss = self.hl_gauss_layer(video_frame_embed_or_logits, rewards, mask=video_mask) - - return loss - - -def benchmark_models(): - """Benchmark forward pass speed of RLearn vs ReWiND models.""" - - print("Setting up models and test data...") - - # Set device - device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - print(f"Using device: {device}") - - # Test data - batch_size = 2 - num_frames = 16 - height, width = 224, 224 - - commands = [ - "pick up the blue ball and put it in the red tray", - "pick up the red cube and put it in the green bin", - ] - - # Create video tensor (B, C, T, H, W) for ReWiND - video_rewind = torch.rand(batch_size, 3, num_frames, height, width, device=device) - - # Create video tensor (B, T, C, H, W) for RLearn - video_rlearn = video_rewind.permute(0, 2, 1, 3, 4) # (B, T, C, H, W) - - # Create batch dict for RLearn - batch = {OBS_IMAGES: video_rlearn, OBS_LANGUAGE: commands} - - # Initialize RLearn model - print("Initializing RLearn model...") - rlearn_config = RLearNConfig() - rlearn_model = RLearNPolicy(rlearn_config).to(device) - rlearn_model.eval() - - # Initialize ReWiND model - print("Initializing ReWiND model...") - rewind_model = RewardModel().to(device) - rewind_model.eval() - - # Warm up both models - print("Warming up models...") - with torch.no_grad(): - for _ in range(3): - _ = rlearn_model.predict_rewards(batch) - _ = rewind_model(commands, video_rewind) - - # Benchmark RLearn - print("\nBenchmarking RLearn model...") - rlearn_times = [] - with torch.no_grad(): - for i in range(100): - start_time = time.perf_counter() - rewards = rlearn_model.predict_rewards(batch) - torch.cuda.synchronize() if torch.cuda.is_available() else None - end_time = time.perf_counter() - rlearn_times.append(end_time - start_time) - - # Benchmark ReWiND - print("Benchmarking ReWiND model...") - rewind_times = [] - with torch.no_grad(): - for i in range(100): - start_time = time.perf_counter() - rewards = rewind_model(commands, video_rewind) - torch.cuda.synchronize() if torch.cuda.is_available() else None - end_time = time.perf_counter() - rewind_times.append(end_time - start_time) - - # Calculate statistics - rlearn_avg = sum(rlearn_times) / len(rlearn_times) * 1000 # Convert to ms - rlearn_std = torch.tensor(rlearn_times).std().item() * 1000 - rlearn_min = min(rlearn_times) * 1000 - rlearn_max = max(rlearn_times) * 1000 - - rewind_avg = sum(rewind_times) / len(rewind_times) * 1000 - rewind_std = torch.tensor(rewind_times).std().item() * 1000 - rewind_min = min(rewind_times) * 1000 - rewind_max = max(rewind_times) * 1000 - - # Print results - print("\n" + "=" * 60) - print("BENCHMARK RESULTS (100 runs, inference only)") - print("=" * 60) - print(f"RLearn avg: {rlearn_avg:.2f} ms") - print(f"RLearn std: {rlearn_std:.2f} ms") - print(f"RLearn min: {rlearn_min:.2f} ms") - print(f"RLearn max: {rlearn_max:.2f} ms") - print(f"ReWiND avg: {rewind_avg:.2f} ms") - print(f"ReWiND std: {rewind_std:.2f} ms") - print(f"ReWiND min: {rewind_min:.2f} ms") - print(f"ReWiND max: {rewind_max:.2f} ms") - - speedup = rlearn_avg / rewind_avg if rewind_avg > 0 else float("inf") - print(f"Speedup (RLearn/ReWiND): {speedup:.2f}x") - print(f"{'RLearn is faster!' if speedup > 1 else 'ReWiND is faster!'}") - # Verify outputs are similar in shape - print("\nOutput shapes:") - with torch.no_grad(): - rlearn_output = rlearn_model.predict_rewards(batch) - rewind_output = rewind_model(commands, video_rewind) - - print(f"RLearn output shape: {rlearn_output.shape}") - print(f"ReWiND output shape: {rewind_output.shape}") - - if rlearn_output.shape == rewind_output.shape: - print("✓ Output shapes match") - else: - print("⚠ Output shapes differ - this may indicate implementation differences") - - print("\nBenchmark completed successfully!") - - -if __name__ == "__main__": - benchmark_models() diff --git a/src/lerobot/policies/rlearn/configuration_rlearn.py b/src/lerobot/policies/rlearn/configuration_rlearn.py index 1a406b1aa..74a227b11 100644 --- a/src/lerobot/policies/rlearn/configuration_rlearn.py +++ b/src/lerobot/policies/rlearn/configuration_rlearn.py @@ -34,13 +34,13 @@ class RLearNConfig(PreTrainedConfig): Notes: - This follows the ReWiND paper architecture. It uses frozen vision/text encoders - (DINO v3 for vision, sentence-transformers for language) and trains a + (SigLIP2 for both vision and language) and trains a lightweight temporal aggregator + head. """ - # Encoders - Using DINOv2 (base) for vision and sentence-transformers for text (ReWiND paper) - vision_model_name: str = "facebook/dinov2-base" - text_model_name: str = "sentence-transformers/all-MiniLM-L12-v2" + # Encoders - Using SigLIP2 for both vision and text + vision_model_name: str = "google/siglip2-base-patch16-224" + text_model_name: str = "google/siglip2-base-patch16-224" freeze_backbones: bool = True # Temporal aggregator @@ -63,6 +63,10 @@ class RLearNConfig(PreTrainedConfig): # Training learning_rate: float = 3e-5 weight_decay: float = 0.01 + + # Performance optimizations + use_amp: bool = True # Mixed precision training for speed boost + compile_model: bool = True # torch.compile for additional speedup # ReWiND-specific parameters use_video_rewind: bool = False # Enable video rewinding augmentation @@ -85,7 +89,7 @@ class RLearNConfig(PreTrainedConfig): ) # Architectural knobs to better mirror ReWiND - num_register_tokens: int = 4 + num_register_tokens: int = 4 # register / memory tokens, can't hurt mlp_predictor_depth: int = 3 # depth of the per-frame MLP head # HLGauss loss parameters @@ -93,8 +97,7 @@ class RLearNConfig(PreTrainedConfig): reward_min_value: float = 0.0 reward_max_value: float = 1.0 reward_hl_gauss_loss_num_bins: int = 20 - categorical_rewards: bool = False - reward_bins: int = 10 # only used if categorical_rewards=True + # Optional: path to episodes.jsonl to build full-episode indices automatically # Default to common dataset layout: /meta/episodes.jsonl diff --git a/src/lerobot/policies/rlearn/modeling_rlearn.py b/src/lerobot/policies/rlearn/modeling_rlearn.py index 8a469ee03..1c29797e6 100644 --- a/src/lerobot/policies/rlearn/modeling_rlearn.py +++ b/src/lerobot/policies/rlearn/modeling_rlearn.py @@ -33,7 +33,7 @@ High-level Architecture | per-frame encode v +------------------------------+ - | Vision Encoder (frozen) | e.g. DINOv2 (base) + | Vision Encoder (frozen) | e.g. SigLIP2 (base) +------------------------------+ |s | pooled per-frame embeddings (BT, H_v) @@ -46,7 +46,7 @@ High-level Architecture | | | v | +------------------------------+ - | | Text Encoder (frozen) | e.g. sentence-transformers + | | Text Encoder (frozen) | e.g. SigLIP2 | +------------------------------+ | | | | pooled text embedding (B, H_t) @@ -68,7 +68,7 @@ High-level Architecture - reward_logits: (B, T', 1) with T' ≤ T (affected by stride and frame dropout) Notes - - Uses DINOv2 (base, ~ViT-B) for vision and sentence-transformers (all-MiniLM-L12-v2) for text encoding. + - Uses SigLIP2 for both vision and text encoding. - Backbones (vision/text) are frozen by default; only projections, temporal module, and head are trainable. - Stride/frame dropout applied during training can subsample timesteps. """ @@ -77,6 +77,7 @@ from __future__ import annotations import math from itertools import chain +from operator import truediv import torch import torch.nn.functional as F @@ -103,10 +104,10 @@ from lerobot.policies.rlearn.configuration_rlearn import RLearNConfig class RLearNPolicy(PreTrainedPolicy): """Video-language conditioned reward model following ReWiND architecture exactly: https://github.com/lucidrains/rewind-reward-pytorch/blob/main/rewind_reward_pytorch/rewind_reward.py#L11. - - Visual encoder: frozen DINOv2 (base), returns per-frame embeddings. - - Text encoder: frozen sentence-transformers (all-MiniLM-L12-v2), returns a language embedding. + - Visual encoder: frozen SigLIP2, returns per-frame embeddings. + - Text encoder: frozen SigLIP2, returns a language embedding. - Temporal module: x_transformers Decoder with packed tokens [lang | register | video]. - - Output: per-timestep rewards via HLGauss layer or categorical bins. + - Output: per-timestep rewards via HLGauss layer (continuous only). """ config_class = RLearNConfig @@ -116,31 +117,34 @@ class RLearNPolicy(PreTrainedPolicy): super().__init__(config) self.config = config self.episode_data_index = episode_data_index # Store episode boundaries for progress calculation - self.categorical_rewards = config.categorical_rewards - # Encoders - ReWiND paper setup: DINOv2 for vision, sentence-transformers for text - from transformers import AutoImageProcessor, AutoModel - from sentence_transformers import SentenceTransformer + # Encoders - SigLIP2 for both vision and text + from transformers import AutoProcessor, AutoModel - # Load DINOv2 (base) vision encoder with its processor - self.vision_processor = AutoImageProcessor.from_pretrained(config.vision_model_name, use_fast=True) - self.vision_encoder = AutoModel.from_pretrained(config.vision_model_name) + # Load SigLIP2 processors and models + self.vision_processor = AutoProcessor.from_pretrained(config.vision_model_name, use_fast=True) + self.vision_model = AutoModel.from_pretrained(config.vision_model_name) - # Load sentence-transformers text encoder - self.text_encoder = SentenceTransformer(config.text_model_name) + self.text_processor = AutoProcessor.from_pretrained(config.text_model_name, use_fast=True) + self.text_model = AutoModel.from_pretrained(config.text_model_name) - # Move text encoder to same device as vision encoder (GPU if available) + # Move encoders to GPU if available if torch.cuda.is_available(): - self.text_encoder = self.text_encoder.to('cuda') + self.vision_model = self.vision_model.to('cuda') + self.text_model = self.text_model.to('cuda') - # DINOv2-base has 768 hidden size, all-MiniLM-L12-v2 has 384 - self.vision_hidden = 768 # DINOv2-base - self.text_hidden = 384 # all-MiniLM-L12-v2 + # Get hidden sizes from SigLIP2 config + vh = getattr(getattr(self.vision_model, 'config', None), 'vision_config', None) + self.vision_hidden = getattr(vh, 'hidden_size', 768) + + th = getattr(getattr(self.text_model, 'config', None), 'text_config', None) + self.text_hidden = getattr(th, 'hidden_size', 512) + # Freeze encoders if requested if config.freeze_backbones: - for p in self.vision_encoder.parameters(): + for p in self.vision_model.parameters(): p.requires_grad = False - for p in self.text_encoder.parameters(): + for p in self.text_model.parameters(): p.requires_grad = False # x_transformers Decoder (matching ReWiND exactly) @@ -170,7 +174,7 @@ class RLearNPolicy(PreTrainedPolicy): from x_mlps_pytorch import Feedforwards self.mlp_predictor = Feedforwards( dim=config.dim_model, - dim_out=config.reward_bins if config.categorical_rewards else None, + dim_out=None, depth=config.mlp_predictor_depth ) @@ -196,6 +200,17 @@ class RLearNPolicy(PreTrainedPolicy): except Exception: # Defer to runtime error with guidance if loading fails self.episode_data_index = None + + # Apply torch.compile for additional speedup if enabled + if getattr(config, "compile_model", False): + try: + self.vision_model = torch.compile(self.vision_model, mode="reduce-overhead") + self.text_model = torch.compile(self.text_model, mode="reduce-overhead") + self.decoder = torch.compile(self.decoder, mode="reduce-overhead") + print("✅ Applied torch.compile to encoders and transformer") + except Exception as e: + print(f"⚠️ torch.compile failed: {e}") + # Continue without compilation def get_optim_params(self) -> dict: # Train only projections, temporal module and head by default if backbones are frozen @@ -247,15 +262,8 @@ class RLearNPolicy(PreTrainedPolicy): # Process video frames video_embeds = self._encode_video_frames(frames).to(device) # (B, T, D_vision) - # Language embeddings (get lengths BEFORE padding) - lang_embeds_list = self.text_encoder.encode( - commands, - output_value='token_embeddings', - convert_to_tensor=False, - ) - lens = torch.tensor([le.shape[0] for le in lang_embeds_list], device=device) - lang_embeds = pad_sequence([torch.as_tensor(le, device=device) for le in lang_embeds_list], batch_first=True) - mask = self._mask_from_lens(lens) + # Language embeddings + mask + lang_embeds, mask = self._encode_language_tokens(commands, device) # Register tokens register_tokens = repeat(self.register_tokens, 'n d -> b n d', b=B) @@ -284,11 +292,8 @@ class RLearNPolicy(PreTrainedPolicy): # MLP predictor video_frame_embeds = self.mlp_predictor(attended_video_tokens) - # Get rewards via HLGauss layer - if self.categorical_rewards: - return video_frame_embeds # Return logits directly - else: - return self.hl_gauss_layer(video_frame_embeds).squeeze(-1) # (B, T) + # Get rewards via HLGauss layer (continuous rewards only) + return self.hl_gauss_layer(video_frame_embeds).squeeze(-1) # (B, T) def normalize_inputs(self, batch: dict[str, Tensor]) -> dict[str, Tensor]: # Initial version: no-op; rely on upstream processors if any @@ -299,7 +304,7 @@ class RLearNPolicy(PreTrainedPolicy): return batch def _encode_video_frames(self, frames: Tensor) -> Tensor: - """Encode video frames through DINOv2 to get per-frame embeddings. + """Encode video frames through SigLIP2 to get per-frame embeddings. Args: frames: (B, T, C, H, W) @@ -310,22 +315,34 @@ class RLearNPolicy(PreTrainedPolicy): B, T, C, H, W = frames.shape flat = rearrange(frames, 'b t c h w -> (b t) c h w') - # Process with DINOv2 - images_list = [] - for i in range(B * T): - img = flat[i].permute(1, 2, 0) # CHW -> HWC - if img.dtype == torch.uint8: - img = img.cpu().numpy() - else: - img = (img.clamp(0, 1) * 255).to(torch.uint8).cpu().numpy() - images_list.append(img) + # Optimized: Process tensor directly without numpy conversion + device = next(self.vision_model.parameters()).device - processed = self.vision_processor(images=images_list, return_tensors="pt") - pixel_values = processed["pixel_values"].to(next(self.vision_encoder.parameters()).device) - vision_outputs = self.vision_encoder(pixel_values) + # Normalize to [0, 1] if needed and ensure correct format for SigLIP2 + if flat.dtype != torch.float32: + flat = flat.float() + if flat.max() > 1.0: + flat = flat / 255.0 + + # SigLIP2 expects images in [0, 1] range, RGB format + # Resize and normalize in batch - much faster than individual processing + try: + # Try direct tensor processing (faster path) + processed = self.vision_processor(images=flat, return_tensors="pt") + pixel_values = processed["pixel_values"].to(device) + except: + # Fallback to individual processing if needed, but optimized + # Convert entire batch to numpy at once (much faster) + flat_numpy = flat.permute(0, 2, 3, 1).cpu().numpy() # (BT, H, W, C) + images_list = [flat_numpy[i] for i in range(B * T)] + + processed = self.vision_processor(images=images_list, return_tensors="pt") + pixel_values = processed["pixel_values"].to(device) + + # Process in batch through vision model + vision_outputs = self.vision_model.vision_model(pixel_values=pixel_values) + cls_tokens = vision_outputs.last_hidden_state[:, 0] - # Extract CLS tokens - cls_tokens = vision_outputs.last_hidden_state[:, 0] # (BT, D_vision) return rearrange(cls_tokens, '(b t) d -> b t d', b=B, t=T) def _mask_from_lens(self, lens: Tensor) -> Tensor: @@ -343,6 +360,9 @@ class RLearNPolicy(PreTrainedPolicy): Note: Progress labels (0 to 1) are generated automatically for each episode. No REWARD key is needed in the batch. """ + import time + forward_start = time.perf_counter() + batch = self.normalize_inputs(batch) batch = self.normalize_targets(batch) @@ -378,18 +398,15 @@ class RLearNPolicy(PreTrainedPolicy): elif not isinstance(commands, list): commands = [str(commands)] * B - # Process video frames through DINOv2 + # Process video frames through SigLIP2 + vision_start = time.perf_counter() video_embeds = self._encode_video_frames(frames).to(device) # (B, T_eff, D_vision) + vision_time = time.perf_counter() - vision_start - # Language embeddings (get lengths BEFORE padding) - lang_embeds_list = self.text_encoder.encode( - commands, - output_value='token_embeddings', - convert_to_tensor=False, - ) - lens = torch.tensor([le.shape[0] for le in lang_embeds_list], device=device) - lang_embeds = pad_sequence([torch.as_tensor(le, device=device) for le in lang_embeds_list], batch_first=True) - mask = self._mask_from_lens(lens) + # Language embeddings + mask + lang_start = time.perf_counter() + lang_embeds, mask = self._encode_language_tokens(commands, device) + lang_time = time.perf_counter() - lang_start # Token preparation # Register tokens @@ -411,6 +428,7 @@ class RLearNPolicy(PreTrainedPolicy): mask = F.pad(mask, (0, register_tokens.shape[1] + video_tokens.shape[1]), value=True) # Forward through x_transformers Decoder + transformer_start = time.perf_counter() attended = self.decoder(tokens, mask=mask) # Unpack and get video token features @@ -418,6 +436,7 @@ class RLearNPolicy(PreTrainedPolicy): # MLP predictor video_frame_embeds = self.mlp_predictor(attended_video_tokens) + transformer_time = time.perf_counter() - transformer_start # Generate progress labels on-the-fly (ReWiND approach) # IMPORTANT: Progress should be 0-1 across the ENTIRE EPISODE, not just the temporal window @@ -500,27 +519,15 @@ class RLearNPolicy(PreTrainedPolicy): # During inference, we might not want to compute loss if not self.training and target is None: # Return predictions without loss - if self.categorical_rewards: - return video_frame_embeds.mean() * 0.0, {"has_labels": 0.0} - else: - rewards = self.hl_gauss_layer(video_frame_embeds) - return rewards.mean() * 0.0, {"rewards_mean": rewards.mean().item()} + rewards = self.hl_gauss_layer(video_frame_embeds) + return rewards.mean() * 0.0, {"rewards_mean": rewards.mean().item()} - # Calculate loss using HLGauss or categorical - if self.categorical_rewards: - # Categorical cross-entropy loss - assert target.dtype in (torch.long, torch.int), "Categorical rewards require integer targets" - loss = F.cross_entropy( - rearrange(video_frame_embeds, 'b t l -> b l t'), - target.long(), - ignore_index=-1 - ) - else: - # HLGauss loss or MSE regression - assert target.dtype == torch.float, "Continuous rewards require float targets" - # Create video mask for variable length support - video_mask = torch.ones(B, T_eff, dtype=torch.bool, device=device) - loss = self.hl_gauss_layer(video_frame_embeds, target[:, :T_eff], mask=video_mask) + # Calculate loss using HLGauss (continuous rewards only) + loss_start = time.perf_counter() + assert target.dtype == torch.float, "Continuous rewards require float targets" + # Create video mask for variable length support + video_mask = torch.ones(B, T_eff, dtype=torch.bool, device=device) + loss = self.hl_gauss_layer(video_frame_embeds, target[:, :T_eff], mask=video_mask) # Optional: Mismatched video-language pairs loss L_mismatch = torch.zeros((), device=device) @@ -530,19 +537,12 @@ class RLearNPolicy(PreTrainedPolicy): shuffled_indices = torch.randperm(B, device=device) shuffled_commands = [commands[i] for i in shuffled_indices] - # Re-encode with mismatched language (compute lengths before padding) - lang_embeds_mm_list = self.text_encoder.encode( - shuffled_commands, - output_value='token_embeddings', - convert_to_tensor=False, - ) - lens_mm = torch.tensor([le.shape[0] for le in lang_embeds_mm_list], device=device) - lang_embeds_mm = pad_sequence([torch.as_tensor(le, device=device) for le in lang_embeds_mm_list], batch_first=True) + # Re-encode with mismatched language + lang_embeds_mm, mask_mm = self._encode_language_tokens(shuffled_commands, device) lang_tokens_mm = self.to_lang_tokens(lang_embeds_mm) # Pack and forward tokens_mm, lang_video_packed_shape_mm = pack((lang_tokens_mm, register_tokens, video_tokens), 'b * d') - mask_mm = self._mask_from_lens(lens_mm) mask_mm = F.pad(mask_mm, (0, register_tokens.shape[1] + video_tokens.shape[1]), value=True) attended_mm = self.decoder(tokens_mm, mask=mask_mm) _, _, attended_video_mm = unpack(attended_mm, lang_video_packed_shape_mm, 'b * d') @@ -550,17 +550,13 @@ class RLearNPolicy(PreTrainedPolicy): # Mismatched pairs should predict zero progress zeros_target = torch.zeros_like(target[:, :T_eff]) - if self.categorical_rewards: - L_mismatch = F.cross_entropy( - rearrange(mismatch_embeds, 'b t l -> b l t'), - zeros_target.long(), - ignore_index=-1 - ) - else: - L_mismatch = self.hl_gauss_layer(mismatch_embeds, zeros_target, mask=video_mask) + L_mismatch = self.hl_gauss_layer(mismatch_embeds, zeros_target, mask=video_mask) # Total loss total_loss = loss + L_mismatch + loss_time = time.perf_counter() - loss_start + + total_forward_time = time.perf_counter() - forward_start # Log individual loss components loss_dict.update({ @@ -568,11 +564,65 @@ class RLearNPolicy(PreTrainedPolicy): "loss_main": float(loss.detach().item()), "loss_mismatch": float(L_mismatch.detach().item()), "t_eff": float(T_eff), - "lang_len_mean": float(lens.float().mean().item()), + "lang_len_mean": float(mask.sum().float().mean().item()), # Use mask to get actual lengths + # Timing information + "timing_vision_ms": float(vision_time * 1000), + "timing_language_ms": float(lang_time * 1000), + "timing_transformer_ms": float(transformer_time * 1000), + "timing_loss_ms": float(loss_time * 1000), + "timing_total_forward_ms": float(total_forward_time * 1000), }) + + # Print detailed timing breakdown during training + if self.training: + print(f"RLearN Forward Pass Timing (B={B}, T_eff={T_eff}):") + print(f" Vision encoding: {vision_time*1000:.2f} ms") + print(f" Language encoding: {lang_time*1000:.2f} ms") + print(f" Transformer: {transformer_time*1000:.2f} ms") + print(f" Loss computation: {loss_time*1000:.2f} ms") + print(f" Total forward pass: {total_forward_time*1000:.2f} ms") + print(f" Throughput: {B*T_eff/(total_forward_time):.1f} frames/sec") return total_loss, loss_dict + def _encode_language_tokens(self, commands: list[str], device: torch.device) -> tuple[Tensor, Tensor]: + """Return (embeddings, mask) for language tokens using SigLIP2. + embeddings: (B, L, D); mask: (B, L) True for valid tokens. + """ + # Optimized: Process all commands in batch (much faster than individual processing) + proc = self.text_processor( + text=commands, + return_tensors='pt', + padding='max_length', + max_length=64, + truncation=True # Ensure we don't exceed max length + ) + + # Simplified access - SigLIP2 processor should return these directly + input_ids = proc.get('input_ids') + attention_mask = proc.get('attention_mask') + + if input_ids is None: + # Fallback for different processor structures + if hasattr(proc, 'input_ids'): + input_ids = proc.input_ids + attention_mask = getattr(proc, 'attention_mask', None) + else: + raise ValueError(f"Could not find input_ids in SigLIP processor output. Keys: {list(proc.keys())}") + + # Move to device efficiently + input_ids = input_ids.to(device, non_blocking=True) + if attention_mask is not None: + attention_mask = attention_mask.to(device, non_blocking=True) + else: + attention_mask = torch.ones_like(input_ids, device=device) + + # Batch encode through text model + outputs = self.text_model.text_model(input_ids=input_ids, attention_mask=attention_mask) + last_hidden = outputs.last_hidden_state + mask = attention_mask.bool() + return last_hidden, mask + def _extract_episode_and_frame_indices(self, batch: dict[str, Tensor]) -> tuple[Tensor | None, Tensor | None]: """Try to extract (episode_index, frame_index) tensors from batch or complementary data. @@ -755,7 +805,8 @@ def apply_video_rewind(frames: Tensor, rewind_prob: float = 0.5, last3_prob: flo B, T, C, H, W = frames.shape device = frames.device - # Create default progress labels (linearly increasing from 0 to 1) + # Create default progress labels (linearly increasing from 0 to 1 with denominator T-1) + # torch.linspace(0, 1, T) already yields j/(T-1) at step j default_progress = torch.linspace(0, 1, T, device=device).unsqueeze(0).expand(B, -1) # Apply rewind augmentation to each sample in batch independently @@ -773,14 +824,15 @@ def apply_video_rewind(frames: Tensor, rewind_prob: float = 0.5, last3_prob: flo continue # Apply rewinding to this video - # Split point i: between frame 2 and T-1 + # Split point i: between frame 2 and T-1 (upper bound exclusive in torch.randint) i = torch.randint(2, T, (1,)).item() - # Rewind length k: between 1 and i-1 frames, with option to force last-3 frames occasionally + # Rewind length k: between 1 and i-1 frames if last3_prob is not None and torch.rand(1).item() < last3_prob and i >= 3: k = min(3, i - 1) else: - k = torch.randint(1, min(i, T - i + 1), (1,)).item() + k = torch.randint(1, i, (1,)).item() + k = min(k, i - 1) # Create rewound sequence: o1...oi, oi-1, ..., oi-k forward_frames = frames[b, :i] # Frames up to split point @@ -789,25 +841,28 @@ def apply_video_rewind(frames: Tensor, rewind_prob: float = 0.5, last3_prob: flo # Concatenate forward and reverse parts rewound_seq = torch.cat([forward_frames, reverse_frames], dim=0) - # Pad with zeros if needed to maintain shape + # Pad by repeating the last real frame if needed to maintain fixed length T if rewound_seq.shape[0] < T: - padding = torch.zeros(T - rewound_seq.shape[0], C, H, W, device=device) - rewound_seq = torch.cat([rewound_seq, padding], dim=0) + last_frame = rewound_seq[-1:] + pad_frames = last_frame.expand(T - rewound_seq.shape[0], C, H, W) + rewound_seq = torch.cat([rewound_seq, pad_frames], dim=0) elif rewound_seq.shape[0] > T: rewound_seq = rewound_seq[:T] # Create corresponding progress labels - # Forward part: increasing progress - forward_progress = torch.linspace(0, i / T, i, device=device) - # Reverse part: decreasing progress - reverse_progress = torch.linspace(i / T, max(0, (i - k) / T), k, device=device) + denom = max(T - 1, 1) + # Forward part: increasing progress using denominator T-1 + forward_progress = torch.linspace(0, (i - 1) / denom, i, device=device) + # Reverse part: decreasing progress starting from (i-1)/(T-1) + reverse_progress = torch.linspace((i - 1) / denom, max(0.0, (i - k) / denom), k, device=device) rewound_progress = torch.cat([forward_progress, reverse_progress]) - # Pad progress if needed + # Pad progress by repeating the last real progress if needed if rewound_progress.shape[0] < T: - padding = torch.zeros(T - rewound_progress.shape[0], device=device) - rewound_progress = torch.cat([rewound_progress, padding]) + last_val = rewound_progress[-1] + pad_vals = last_val.expand(T - rewound_progress.shape[0]) + rewound_progress = torch.cat([rewound_progress, pad_vals]) elif rewound_progress.shape[0] > T: rewound_progress = rewound_progress[:T] diff --git a/src/lerobot/policies/rlearn/processor_rlearn.py b/src/lerobot/policies/rlearn/processor_rlearn.py index 7bf87b979..e0a9dd442 100644 --- a/src/lerobot/policies/rlearn/processor_rlearn.py +++ b/src/lerobot/policies/rlearn/processor_rlearn.py @@ -60,10 +60,10 @@ def make_rlearn_processor( ), ToBatchProcessor(), RLearnLanguageFromTaskProcessor(), - # Use the text model name for tokenizer to keep vocab aligned with text tower + # Use SigLIP2 for tokenizer to keep vocab aligned with text tower TokenizerProcessor( tokenizer_name=config.text_model_name, - max_length=128, + max_length=64, padding="max_length", truncation=True, padding_side="right", diff --git a/src/lerobot/scripts/annotate_dataset_rewards.py b/src/lerobot/scripts/annotate_dataset_rewards.py deleted file mode 100644 index 86c2db5a7..000000000 --- a/src/lerobot/scripts/annotate_dataset_rewards.py +++ /dev/null @@ -1,335 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2025 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Add ReWiND-style linear progress rewards to existing LeRobot datasets. - -This script creates a complete copy of the dataset with rewards added to each frame. -It downloads the original dataset (including videos), adds rewards, and pushes everything to a new repository. - -Usage: - # Create full dataset copy with rewards - python src/lerobot/scripts/annotate_dataset_rewards.py --input-repo IPEC-COMMUNITY/bc_z_lerobot --output-repo username/bc_z_with_rewards - - # Test with 1% of episodes - python src/lerobot/scripts/annotate_dataset_rewards.py --input-repo IPEC-COMMUNITY/bc_z_lerobot --output-repo username/test_rewards --percentage 1 -""" - -import argparse -import shutil -from pathlib import Path -from tempfile import mkdtemp - -import numpy as np -import torch -from PIL import Image -from tqdm import tqdm - -from lerobot.constants import REWARD -from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata - - -def compute_linear_progress_reward(episode_length: int) -> np.ndarray: - """ - Compute linear progress rewards from 0 to 1. - - ReWiND-style: progress increases linearly from 0 at start to 1 at completion. - - Args: - episode_length: Number of frames in the episode - - Returns: - rewards: Array of shape (episode_length,) with values linearly from 0 to 1 - """ - return np.linspace(0, 1, episode_length, dtype=np.float32) - - -def main(): - parser = argparse.ArgumentParser( - description="Add linear progress rewards to LeRobot dataset and push to Hub" - ) - parser.add_argument( - "--input-repo", - type=str, - default="IPEC-COMMUNITY/bc_z_lerobot", - help="Input dataset repository on HuggingFace Hub", - ) - parser.add_argument( - "--output-repo", - type=str, - required=True, - help="Output dataset repository name (e.g., username/dataset_with_rewards)", - ) - parser.add_argument( - "--percentage", - type=float, - default=100.0, - help="Percentage of episodes to process (useful for testing, e.g., 1 for 1%%)", - ) - parser.add_argument( - "--private", - action="store_true", - help="Make the output repository private", - ) - parser.add_argument( - "--local-dir", - type=str, - default=None, - help="Local directory to save the modified dataset (defaults to ~/.cache/huggingface/lerobot/)", - ) - - args = parser.parse_args() - - print("=" * 60) - print("FULL DATASET COPY WITH REWARDS") - print("This will download the entire dataset including videos,") - print("add rewards, and push everything to a new repository.") - print("=" * 60) - - # First, load just the metadata to get total episodes - print(f"\nLoading metadata from Hub: {args.input_repo}") - - # Load metadata only first - metadata = LeRobotDatasetMetadata(repo_id=args.input_repo) - total_episodes = metadata.total_episodes - - # Calculate which episodes to process - num_episodes_to_process = max(1, int(total_episodes * args.percentage / 100)) - episodes_to_load = list(range(num_episodes_to_process)) # Load only first N episodes - - print(f"Dataset has {total_episodes} episodes") - print(f"Processing {num_episodes_to_process} episodes ({args.percentage}%)") - - # Determine local directory for the new dataset - if args.local_dir: - local_dir = Path(args.local_dir) - else: - from lerobot.constants import HF_LEROBOT_HOME - - local_dir = HF_LEROBOT_HOME / args.output_repo - - # Use a temporary directory for downloading source dataset - temp_source_dir = Path(mkdtemp(prefix="lerobot_source_")) - - # Load the dataset with videos to temp directory - print("Downloading dataset with videos to temp directory...") - print(f"Temp directory: {temp_source_dir}") - dataset = LeRobotDataset( - repo_id=args.input_repo, - root=temp_source_dir, # Temporary location for source - episodes=episodes_to_load if args.percentage < 100 else None, - download_videos=True, # Download videos - ) - - print(f"Downloaded {dataset.num_episodes} episodes with {dataset.num_frames} frames") - - # Create a new dataset with rewards - print(f"\nCreating new dataset at: {local_dir}") - - # Clean up any existing directory from previous runs - if local_dir.exists(): - print(f"⚠️ Directory already exists: {local_dir}") - print(" Removing it to start fresh...") - shutil.rmtree(local_dir) - - # Define features including reward - # Simply copy all features from the original dataset - new_features = dict(dataset.features) - - # Add reward feature - new_features[REWARD] = {"shape": (1,), "dtype": "float32", "names": ["reward"]} - - # Determine which features are videos - video_keys = dataset.meta.video_keys if hasattr(dataset.meta, "video_keys") else [] - image_keys = dataset.meta.image_keys if hasattr(dataset.meta, "image_keys") else [] - visual_keys = set(video_keys + image_keys) - - print(f" Visual features to be handled as videos: {visual_keys}") - - # Check for language features - language_keys = [ - k - for k in dataset.features.keys() - if any(lang in k.lower() for lang in ["language", "task", "instruction", "text"]) - ] - if language_keys: - print(f" Language/task features found: {language_keys}") - - # Copy dataset structure to new location - new_dataset = LeRobotDataset.create( - repo_id=args.output_repo, - root=local_dir, - fps=dataset.fps, - features=new_features, - robot_type=dataset.meta.robot_type, - use_videos=len(dataset.meta.video_keys) > 0, - ) - - # Process each episode - print("\nAdding rewards to episodes...") - - episode_data_index = dataset.episode_data_index - - for ep_idx, episode_idx in enumerate(tqdm(episodes_to_load)): - # Get episode boundaries - ep_start = episode_data_index["from"][ep_idx].item() - ep_end = episode_data_index["to"][ep_idx].item() - episode_length = ep_end - ep_start - - # Compute linear progress rewards for this episode - rewards = compute_linear_progress_reward(episode_length) - - # Get episode metadata - episode_info = dataset.meta.episodes[episode_idx] - tasks = episode_info.get("tasks", []) - if not tasks: - # Try to get task from first frame if not in episode metadata - first_frame = dataset[ep_start] - if "task" in first_frame: - tasks = [first_frame["task"]] - else: - tasks = [""] - - # Process each frame in the episode - for frame_idx in range(episode_length): - global_idx = ep_start + frame_idx - - # Get original frame data - frame_data = dataset[global_idx] - - # Create frame dict for the new dataset - frame = {} - for key in dataset.features: - # Skip only auto-generated metadata fields - # Keep task-related fields that contain language annotations - if key in ["index", "episode_index", "frame_index", "timestamp"]: - continue - - # For visual features that are videos, extract the actual frame - if key in visual_keys: - # Get the image data to save as temporary files - if key in frame_data: - img = frame_data[key] - # Convert to numpy if tensor - if isinstance(img, torch.Tensor): - img = img.cpu().numpy() - # Ensure channels-last format (H, W, C) for saving - if len(img.shape) == 3 and img.shape[0] in [1, 3, 4]: - img = np.transpose(img, (1, 2, 0)) - - # Resize to match expected shape if needed - expected_shape = new_features[key].get("shape") - if expected_shape and img.shape != tuple(expected_shape): - # Try to match the shape - handle both HWC and CHW formats - if len(expected_shape) == 3: - # Determine if expected is HWC or CHW - if expected_shape[-1] in [1, 3, 4]: # Likely HWC - target_h, target_w = expected_shape[0], expected_shape[1] - elif expected_shape[0] in [ - 1, - 3, - 4, - ]: # Likely CHW - shouldn't happen after transpose - target_h, target_w = expected_shape[1], expected_shape[2] - else: - # Assume HWC - target_h, target_w = expected_shape[0], expected_shape[1] - - # Resize using PIL for quality - if img.dtype != np.uint8: - img = (img * 255).astype(np.uint8) - pil_img = Image.fromarray(img) - pil_img = pil_img.resize((target_w, target_h), Image.Resampling.LANCZOS) - img = np.array(pil_img) - - frame[key] = img - continue - - if key in frame_data: - value = frame_data[key] - - # Handle language/task fields specially - if key == "task" and isinstance(value, str): - # Skip string task - will be passed separately to add_frame - continue - elif key == "task_index": - # Skip task_index as it will be regenerated based on task - continue - elif key in ["observation.language", "language", "instruction"] and isinstance( - value, str - ): - # Keep language fields as-is - frame[key] = value - continue - - # Regular field processing - # Convert tensors to numpy for saving - if isinstance(value, torch.Tensor): - value = value.cpu().numpy() - - # Ensure arrays are the right shape - if hasattr(value, "shape") and len(value.shape) == 0: - # Convert scalar to 1D array - value = np.array([value]) - - frame[key] = value - - # Add reward - frame[REWARD] = np.array([rewards[frame_idx]], dtype=np.float32) - - # Get task for this specific frame (might vary within episode) - if "task" in frame_data: - task = frame_data["task"] - else: - task = tasks[0] if tasks else "" - - # Add frame to new dataset - timestamp = frame_idx / dataset.fps - new_dataset.add_frame(frame, task=task, timestamp=timestamp) - - # Save the episode (this will encode videos from the saved frames) - new_dataset.save_episode() - - print( - f"\n✓ Created new dataset with rewards: {new_dataset.num_episodes} episodes, {new_dataset.num_frames} frames" - ) - - # Push to Hub - print(f"\nPushing to Hub: {args.output_repo}") - new_dataset.push_to_hub( - private=args.private, - push_videos=True, - ) - - print(f"\n✓ Dataset pushed to: https://huggingface.co/datasets/{args.output_repo}") - - # Clean up temporary source directory - if temp_source_dir.exists(): - print("\nCleaning up temporary files...") - shutil.rmtree(temp_source_dir) - - # Print summary - print("\n=== Summary ===") - print(f"Input dataset: {args.input_repo}") - print(f"Output dataset: {args.output_repo}") - print(f"Episodes processed: {num_episodes_to_process}/{total_episodes} ({args.percentage}%)") - print(f"Frames with rewards: {new_dataset.num_frames}") - print("Reward type: Linear progress (0→1)") - print("===============") - - -if __name__ == "__main__": - main() diff --git a/src/lerobot/scripts/annotate_dataset_rewards_optimized.py b/src/lerobot/scripts/annotate_dataset_rewards_optimized.py deleted file mode 100644 index 82a82c8ed..000000000 --- a/src/lerobot/scripts/annotate_dataset_rewards_optimized.py +++ /dev/null @@ -1,591 +0,0 @@ -#!/usr/bin/env python - -# Copyright 2025 The HuggingFace Inc. team. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -OPTIMIZED VERSION: Add ReWiND-style linear progress rewards to existing LeRobot datasets with parallel processing. - -This script creates a complete copy of the dataset with rewards added to each frame. -It downloads the original dataset (including videos), adds rewards, and pushes everything to a new repository. - -Key optimizations: -- Parallel episode processing using multiprocessing -- Batch frame processing within episodes -- Concurrent video encoding -- Optimized image operations -- Better memory management - -Usage: - # Test with 1% of episodes using 4 workers - python src/lerobot/scripts/annotate_dataset_rewards_optimized.py --input-repo IPEC-COMMUNITY/bc_z_lerobot --output-repo pepijn223/rewards_bc_z_1p --percentage 1 --num-workers 4 -""" - -import argparse -import logging -import shutil -from concurrent.futures import ThreadPoolExecutor, as_completed -from multiprocessing import Pool, cpu_count -from pathlib import Path -from tempfile import mkdtemp -from typing import Any - -import numpy as np -import torch -from PIL import Image -from tqdm import tqdm - -from lerobot.constants import REWARD -from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata - -# Set up logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger(__name__) - - -def compute_linear_progress_reward(episode_length: int) -> np.ndarray: - """ - Compute linear progress rewards from 0 to 1. - - ReWiND-style: progress increases linearly from 0 at start to 1 at completion. - - Args: - episode_length: Number of frames in the episode - - Returns: - rewards: Array of shape (episode_length,) with values linearly from 0 to 1 - """ - return np.linspace(0, 1, episode_length, dtype=np.float32) - - -def process_image_batch(images: list[np.ndarray], target_shape: tuple[int, ...]) -> list[np.ndarray]: - """ - Process a batch of images efficiently. - - Args: - images: List of numpy arrays representing images - target_shape: Target shape for resizing - - Returns: - List of processed images - """ - processed = [] - - if len(target_shape) == 3: - # Determine target dimensions - if target_shape[-1] in [1, 3, 4]: # Likely HWC - target_h, target_w = target_shape[0], target_shape[1] - elif target_shape[0] in [1, 3, 4]: # Likely CHW - target_h, target_w = target_shape[1], target_shape[2] - else: - target_h, target_w = target_shape[0], target_shape[1] - - # Process all images - for img in images: - # Ensure channels-last format - if len(img.shape) == 3 and img.shape[0] in [1, 3, 4]: - img = np.transpose(img, (1, 2, 0)) - - # Resize if needed - if img.shape[:2] != (target_h, target_w): - if img.dtype != np.uint8: - img = (img * 255).astype(np.uint8) - pil_img = Image.fromarray(img) - pil_img = pil_img.resize((target_w, target_h), Image.Resampling.LANCZOS) - img = np.array(pil_img) - - processed.append(img) - else: - processed = images - - return processed - - -def process_episode_chunk(args: tuple[int, int, dict, Any]) -> tuple[int, list[dict], list[str]]: - """ - Process a chunk of frames from an episode in parallel. - - Args: - args: Tuple of (chunk_start, chunk_end, shared_data, episode_data) - - Returns: - Tuple of (episode_idx, frames_data, tasks) - """ - chunk_start, chunk_end, shared_data, episode_data = args - - episode_idx = episode_data["episode_idx"] - ep_start = episode_data["ep_start"] - episode_length = episode_data["episode_length"] - rewards = episode_data["rewards"] - tasks_default = episode_data["tasks"] - dataset = episode_data["dataset"] - new_features = shared_data["new_features"] - visual_keys = shared_data["visual_keys"] - fps = shared_data["fps"] - - frames_data = [] - tasks = [] - - # Process chunk of frames - for frame_idx in range(chunk_start, min(chunk_end, episode_length)): - global_idx = ep_start + frame_idx - - # Get original frame data - frame_data = dataset[global_idx] - - # Create frame dict for the new dataset - frame = {} - - # Process all non-visual features - for key in dataset.features: - if key in ["index", "episode_index", "frame_index", "timestamp"]: - continue - - if key in visual_keys: - # Process visual features - if key in frame_data: - img = frame_data[key] - if isinstance(img, torch.Tensor): - img = img.cpu().numpy() - frame[key] = img - continue - - if key in frame_data: - value = frame_data[key] - - # Handle special fields - if key == "task" and isinstance(value, str): - tasks.append(value) - continue - elif key == "task_index": - continue - elif key in ["observation.language", "language", "instruction"] and isinstance(value, str): - frame[key] = value - continue - - # Regular field processing - if isinstance(value, torch.Tensor): - value = value.cpu().numpy() - - if hasattr(value, "shape") and len(value.shape) == 0: - value = np.array([value]) - - frame[key] = value - - # Add reward - frame[REWARD] = np.array([rewards[frame_idx]], dtype=np.float32) - - # Set task - if not tasks or tasks[-1] is None: - tasks.append(tasks_default[0] if tasks_default else "") - - # Add timestamp - frame["timestamp"] = frame_idx / fps - - frames_data.append(frame) - - return (episode_idx, frames_data, tasks) - - -def process_episode_parallel( - episode_data: dict, shared_data: dict, chunk_size: int = 50 -) -> tuple[int, list[dict], list[str]]: - """ - Process an entire episode using parallel chunk processing. - - Args: - episode_data: Episode-specific data - shared_data: Shared configuration data - chunk_size: Number of frames to process per chunk - - Returns: - Tuple of (episode_idx, all_frames, all_tasks) - """ - episode_length = episode_data["episode_length"] - episode_idx = episode_data["episode_idx"] - - # Create chunks - chunks = [] - for i in range(0, episode_length, chunk_size): - chunk_end = min(i + chunk_size, episode_length) - chunks.append((i, chunk_end, shared_data, episode_data)) - - # Process chunks in parallel using threads (good for I/O bound operations) - all_frames = [None] * episode_length - all_tasks = [] - - with ThreadPoolExecutor(max_workers=4) as executor: - futures = {executor.submit(process_episode_chunk, chunk): idx for idx, chunk in enumerate(chunks)} - - for future in as_completed(futures): - chunk_idx = futures[future] - _, frames, tasks = future.result() - - # Place frames in correct positions - start_idx = chunks[chunk_idx][0] - for i, frame in enumerate(frames): - all_frames[start_idx + i] = frame - all_tasks.extend(tasks) - - # Filter out None values (shouldn't happen but safety check) - all_frames = [f for f in all_frames if f is not None] - - return (episode_idx, all_frames, all_tasks) - - -def worker_process_episode(args: tuple[int, str, str, dict, str, str, bool]) -> dict: - """ - Worker function to process a single episode. - - Args: - args: Tuple containing (episode_idx, input_repo, output_repo, shared_data, local_dir, temp_dir, use_chunk_processing) - - Returns: - Dict with processing results or error - """ - episode_idx, input_repo, output_repo, shared_data, local_dir_str, temp_dir, use_chunk_processing = args - - try: - local_dir = Path(local_dir_str) - - # Load dataset for this worker - dataset = LeRobotDataset( - repo_id=input_repo, - root=Path(temp_dir), - episodes=[episode_idx], - download_videos=True, - ) - - # Get episode boundaries - episode_data_index = dataset.episode_data_index - ep_start = episode_data_index["from"][0].item() - ep_end = episode_data_index["to"][0].item() - episode_length = ep_end - ep_start - - # Compute rewards - rewards = compute_linear_progress_reward(episode_length) - - # Get episode metadata - episode_info = dataset.meta.episodes[episode_idx] - tasks = episode_info.get("tasks", []) - if not tasks: - first_frame = dataset[ep_start] - if "task" in first_frame: - tasks = [first_frame["task"]] - else: - tasks = [""] - - # Prepare episode data - episode_data = { - "episode_idx": episode_idx, - "ep_start": ep_start, - "episode_length": episode_length, - "rewards": rewards, - "tasks": tasks, - "dataset": dataset, - } - - if use_chunk_processing: - # Process episode with chunk parallelization - _, frames_data, frame_tasks = process_episode_parallel(episode_data, shared_data) - else: - # Process episode sequentially (fallback) - frames_data = [] - frame_tasks = [] - - for frame_idx in range(episode_length): - global_idx = ep_start + frame_idx - frame_data = dataset[global_idx] - - frame = {} - for key in dataset.features: - if key in ["index", "episode_index", "frame_index", "timestamp"]: - continue - - if key in shared_data["visual_keys"]: - if key in frame_data: - img = frame_data[key] - if isinstance(img, torch.Tensor): - img = img.cpu().numpy() - - # Process image if needed - if ( - key in shared_data["new_features"] - and "shape" in shared_data["new_features"][key] - ): - expected_shape = shared_data["new_features"][key]["shape"] - img = process_image_batch([img], expected_shape)[0] - - frame[key] = img - continue - - if key in frame_data: - value = frame_data[key] - - if key == "task" and isinstance(value, str): - frame_tasks.append(value) - continue - elif key == "task_index": - continue - - if isinstance(value, torch.Tensor): - value = value.cpu().numpy() - - if hasattr(value, "shape") and len(value.shape) == 0: - value = np.array([value]) - - frame[key] = value - - frame[REWARD] = np.array([rewards[frame_idx]], dtype=np.float32) - frames_data.append(frame) - - if not frame_tasks or len(frame_tasks) <= frame_idx: - frame_tasks.append(tasks[0] if tasks else "") - - return { - "episode_idx": episode_idx, - "frames_data": frames_data, - "tasks": frame_tasks if frame_tasks else tasks, - "fps": dataset.fps, - "success": True, - } - - except Exception as e: - logger.error(f"Error processing episode {episode_idx}: {e}") - return {"episode_idx": episode_idx, "error": str(e), "success": False} - - -def main(): - parser = argparse.ArgumentParser( - description="Optimized: Add linear progress rewards to LeRobot dataset with parallel processing" - ) - parser.add_argument( - "--input-repo", - type=str, - default="IPEC-COMMUNITY/bc_z_lerobot", - help="Input dataset repository on HuggingFace Hub", - ) - parser.add_argument( - "--output-repo", - type=str, - required=True, - help="Output dataset repository name (e.g., username/dataset_with_rewards)", - ) - parser.add_argument( - "--percentage", - type=float, - default=100.0, - help="Percentage of episodes to process (useful for testing, e.g., 1 for 1%%)", - ) - parser.add_argument( - "--num-workers", - type=int, - default=None, - help="Number of parallel workers (defaults to CPU count - 2)", - ) - parser.add_argument( - "--chunk-size", - type=int, - default=50, - help="Number of frames to process per chunk within an episode", - ) - parser.add_argument( - "--private", - action="store_true", - help="Make the output repository private", - ) - parser.add_argument( - "--local-dir", - type=str, - default=None, - help="Local directory to save the modified dataset", - ) - parser.add_argument( - "--no-chunk-processing", - action="store_true", - help="Disable chunk-based parallel processing within episodes", - ) - - args = parser.parse_args() - - # Determine number of workers - if args.num_workers is None: - args.num_workers = max(1, cpu_count() - 2) - - print("=" * 60) - print("OPTIMIZED DATASET COPY WITH REWARDS") - print(f"Using {args.num_workers} parallel workers") - print("=" * 60) - - # Load metadata - print(f"\nLoading metadata from Hub: {args.input_repo}") - metadata = LeRobotDatasetMetadata(repo_id=args.input_repo) - total_episodes = metadata.total_episodes - - # Calculate episodes to process - num_episodes_to_process = max(1, int(total_episodes * args.percentage / 100)) - episodes_to_load = list(range(num_episodes_to_process)) - - print(f"Dataset has {total_episodes} episodes") - print(f"Processing {num_episodes_to_process} episodes ({args.percentage}%)") - - # Determine local directory - if args.local_dir: - local_dir = Path(args.local_dir) - else: - from lerobot.constants import HF_LEROBOT_HOME - - local_dir = HF_LEROBOT_HOME / args.output_repo - - # Create temporary directories for workers - temp_base_dir = Path(mkdtemp(prefix="lerobot_parallel_")) - worker_temp_dirs = [] - for i in range(args.num_workers): - worker_dir = temp_base_dir / f"worker_{i}" - worker_dir.mkdir(parents=True, exist_ok=True) - worker_temp_dirs.append(str(worker_dir)) - - print(f"Using temporary base directory: {temp_base_dir}") - - # Load first episode to get features and structure - print("\nLoading dataset structure...") - sample_dataset = LeRobotDataset( - repo_id=args.input_repo, - root=temp_base_dir / "sample", - episodes=[0], - download_videos=True, - ) - - # Prepare features with reward - new_features = dict(sample_dataset.features) - new_features[REWARD] = {"shape": (1,), "dtype": "float32", "names": ["reward"]} - - # Determine visual keys - video_keys = sample_dataset.meta.video_keys if hasattr(sample_dataset.meta, "video_keys") else [] - image_keys = sample_dataset.meta.image_keys if hasattr(sample_dataset.meta, "image_keys") else [] - visual_keys = set(video_keys + image_keys) - - print(f" Visual features: {visual_keys}") - - # Clean up existing directory - if local_dir.exists(): - print(f"⚠️ Directory already exists: {local_dir}") - print(" Removing it to start fresh...") - shutil.rmtree(local_dir) - - # Create new dataset structure - print("\nCreating new dataset structure...") - new_dataset = LeRobotDataset.create( - repo_id=args.output_repo, - root=local_dir, - fps=sample_dataset.fps, - features=new_features, - robot_type=sample_dataset.meta.robot_type, - use_videos=len(sample_dataset.meta.video_keys) > 0, - ) - - # Prepare shared data for workers - shared_data = { - "new_features": new_features, - "visual_keys": visual_keys, - "fps": sample_dataset.fps, - } - - # Process episodes in parallel - print(f"\nProcessing {num_episodes_to_process} episodes with {args.num_workers} workers...") - - # Prepare worker arguments - worker_args = [] - for i, episode_idx in enumerate(episodes_to_load): - # Assign worker temp directory round-robin - temp_dir = worker_temp_dirs[i % args.num_workers] - worker_args.append( - ( - episode_idx, - args.input_repo, - args.output_repo, - shared_data, - str(local_dir), - temp_dir, - not args.no_chunk_processing, - ) - ) - - # Process episodes using multiprocessing - processed_episodes = {} - failed_episodes = [] - - with Pool(processes=args.num_workers) as pool: - # Use imap_unordered for better progress tracking - with tqdm(total=num_episodes_to_process, desc="Processing episodes") as pbar: - for result in pool.imap_unordered(worker_process_episode, worker_args): - pbar.update(1) - - if result["success"]: - processed_episodes[result["episode_idx"]] = result - else: - failed_episodes.append(result["episode_idx"]) - logger.error( - f"Failed episode {result['episode_idx']}: {result.get('error', 'Unknown error')}" - ) - - # Add processed episodes to the new dataset in order - print("\nSaving processed episodes to new dataset...") - for episode_idx in tqdm(episodes_to_load, desc="Saving episodes"): - if episode_idx in processed_episodes: - result = processed_episodes[episode_idx] - - # Add all frames for this episode - for i, frame_data in enumerate(result["frames_data"]): - task = result["tasks"][i] if i < len(result["tasks"]) else result["tasks"][0] - timestamp = i / result["fps"] - new_dataset.add_frame(frame_data, task=task, timestamp=timestamp) - - # Save the episode - new_dataset.save_episode() - - print( - f"\n✓ Created new dataset with rewards: {new_dataset.num_episodes} episodes, {new_dataset.num_frames} frames" - ) - - if failed_episodes: - print(f"⚠️ Failed to process {len(failed_episodes)} episodes: {failed_episodes}") - - # Push to Hub - print(f"\nPushing to Hub: {args.output_repo}") - new_dataset.push_to_hub( - private=args.private, - push_videos=True, - ) - - print(f"\n✓ Dataset pushed to: https://huggingface.co/datasets/{args.output_repo}") - - # Clean up temporary directories - if temp_base_dir.exists(): - print("\nCleaning up temporary files...") - shutil.rmtree(temp_base_dir) - - # Print summary - print("\n=== Summary ===") - print(f"Input dataset: {args.input_repo}") - print(f"Output dataset: {args.output_repo}") - print(f"Episodes processed: {num_episodes_to_process - len(failed_episodes)}/{total_episodes}") - print(f"Frames with rewards: {new_dataset.num_frames}") - print(f"Parallel workers used: {args.num_workers}") - print(f"Processing time saved: ~{args.num_workers - 1}x faster") - print("===============") - - -if __name__ == "__main__": - main() diff --git a/src/lerobot/scripts/train.py b/src/lerobot/scripts/train.py index ed4d90e79..26e531ba5 100644 --- a/src/lerobot/scripts/train.py +++ b/src/lerobot/scripts/train.py @@ -67,10 +67,18 @@ def update_policy( start_time = time.perf_counter() device = get_device_from_parameters(policy) policy.train() + + # Forward pass timing + forward_start = time.perf_counter() with torch.autocast(device_type=device.type) if use_amp else nullcontext(): loss, output_dict = policy.forward(batch) # TODO(rcadene): policy.unnormalize_outputs(out_dict) + forward_time = time.perf_counter() - forward_start + + # Backward pass timing + backward_start = time.perf_counter() grad_scaler.scale(loss).backward() + backward_time = time.perf_counter() - backward_start # Unscale the gradient of the optimizer's assigned params in-place **prior to gradient clipping**. grad_scaler.unscale_(optimizer) @@ -81,6 +89,9 @@ def update_policy( error_if_nonfinite=False, ) + # Optimizer step timing + optim_start = time.perf_counter() + # Optimizer's gradients are already unscaled, so scaler.step does not unscale them, # although it still skips optimizer.step() if the gradients contain infs or NaNs. with lock if lock is not None else nullcontext(): @@ -97,6 +108,19 @@ def update_policy( if has_method(policy, "update"): # To possibly update an internal buffer (for instance an Exponential Moving Average like in TDMPC). policy.update() + + optim_time = time.perf_counter() - optim_start + total_time = time.perf_counter() - start_time + + # Print detailed timing for RLearN policy + if getattr(policy, "name", None) == "rlearn": + print(f"Training Step Timing:") + print(f" Forward pass: {forward_time*1000:.2f} ms") + print(f" Backward pass: {backward_time*1000:.2f} ms") + print(f" Optimizer step: {optim_time*1000:.2f} ms") + print(f" Total update: {total_time*1000:.2f} ms") + print(f" Steps/sec: {1.0/total_time:.2f}") + print("-" * 40) train_metrics.loss = loss.item() train_metrics.grad_norm = grad_norm.item() @@ -213,10 +237,17 @@ def train(cfg: TrainPipelineConfig): logging.info("Start offline training on a fixed dataset") for _ in range(step, cfg.steps): - start_time = time.perf_counter() + # Data loading timing + data_start = time.perf_counter() batch = next(dl_iter) + data_loading_time = time.perf_counter() - data_start + + # Preprocessing timing + preprocess_start = time.perf_counter() batch = preprocessor(batch) - train_tracker.dataloading_s = time.perf_counter() - start_time + preprocess_time = time.perf_counter() - preprocess_start + + train_tracker.dataloading_s = data_loading_time + preprocess_time for key in batch: if isinstance(batch[key], torch.Tensor): @@ -256,13 +287,22 @@ def train(cfg: TrainPipelineConfig): total_pixels += sum(_count_pixels(t) for t in v) # Avoid div-by-zero - upd_s = max(train_tracker.update_s, 1e-8) + meter = train_tracker.update_s + upd_s = meter.val if isinstance(meter, AverageMeter) else float(meter) + upd_s = max(upd_s, 1e-8) pix_per_s = float(total_pixels) / upd_s try: train_tracker.pix_s = pix_per_s except Exception: pass + # Print data loading timing for RLearN + if getattr(policy, "name", None) == "rlearn": + print(f"Data Pipeline Timing:") + print(f" Data loading: {data_loading_time*1000:.2f} ms") + print(f" Preprocessing: {preprocess_time*1000:.2f} ms") + print(f" Total data pipeline: {(data_loading_time + preprocess_time)*1000:.2f} ms") + # Note: eval and checkpoint happens *after* the `step`th training update has completed, so we # increment `step` here. step += 1