From 1ea65730ac2cbca6e5869df734fbd4392561b3c6 Mon Sep 17 00:00:00 2001 From: Pepijn Date: Wed, 24 Sep 2025 11:55:24 +0200 Subject: [PATCH] fix --- src/lerobot/optim/schedulers.py | 15 ++-- src/lerobot/scripts/train.py | 120 +++++++++++++++++++++++++++----- 2 files changed, 111 insertions(+), 24 deletions(-) diff --git a/src/lerobot/optim/schedulers.py b/src/lerobot/optim/schedulers.py index d08018175..d2525dbfe 100644 --- a/src/lerobot/optim/schedulers.py +++ b/src/lerobot/optim/schedulers.py @@ -92,13 +92,18 @@ class CosineDecayWithWarmupSchedulerConfig(LRSchedulerConfig): def lr_lambda(current_step): def linear_warmup_schedule(current_step): if current_step <= 0: - return 1 / (self.num_warmup_steps + 1) - frac = 1 - current_step / self.num_warmup_steps - return (1 / (self.num_warmup_steps + 1) - 1) * frac + 1 + return 0.1 # Start at 10% instead of 0.1% of peak LR + if current_step >= self.num_warmup_steps: + return 1.0 # Reach peak at end of warmup + # Linear interpolation from 10% to 100% of peak LR + return 0.1 + 0.9 * (current_step / self.num_warmup_steps) def cosine_decay_schedule(current_step): - step = min(current_step, self.num_decay_steps) - cosine_decay = 0.5 * (1 + math.cos(math.pi * step / self.num_decay_steps)) + # CRITICAL FIX: Decay should count from END of warmup, not from step 0! + decay_step = current_step - self.num_warmup_steps + decay_step = max(0, min(decay_step, self.num_decay_steps)) + + cosine_decay = 0.5 * (1 + math.cos(math.pi * decay_step / self.num_decay_steps)) alpha = self.decay_lr / self.peak_lr decayed = (1 - alpha) * cosine_decay + alpha return decayed diff --git a/src/lerobot/scripts/train.py b/src/lerobot/scripts/train.py index 21da62bbb..ae1a7c501 100644 --- a/src/lerobot/scripts/train.py +++ b/src/lerobot/scripts/train.py @@ -20,6 +20,8 @@ from pprint import pformat from typing import Any import torch +from accelerate import Accelerator +from accelerate.utils import DistributedDataParallelKwargs from termcolor import colored from torch.amp import GradScaler from torch.optim import Optimizer @@ -147,17 +149,40 @@ def train(cfg: TrainPipelineConfig): cfg.validate() logging.info(pformat(cfg.to_dict())) + # Initialize Accelerate if requested + accelerator = None + if cfg.use_accelerate: + # Configure DDP to handle unused parameters + ddp_kwargs = DistributedDataParallelKwargs(find_unused_parameters=True) + accelerator = Accelerator( + gradient_accumulation_steps=cfg.gradient_accumulation_steps, + mixed_precision=cfg.mixed_precision, + kwargs_handlers=[ddp_kwargs], + ) + device = accelerator.device + if accelerator.is_main_process: + logging.info( + f"Accelerate initialized with device: {device}, mixed_precision: {cfg.mixed_precision}" + ) + logging.info(f"Training on {accelerator.num_processes} processes") + else: + # Check device is available (original behavior) + device = get_safe_torch_device(cfg.policy.device, log=True) + + # Only create wandb logger on main process when using accelerate if cfg.wandb.enable and cfg.wandb.project: - wandb_logger = WandBLogger(cfg) + if accelerator is None or accelerator.is_main_process: + wandb_logger = WandBLogger(cfg) + else: + wandb_logger = None else: wandb_logger = None - logging.info(colored("Logs will be saved locally.", "yellow", attrs=["bold"])) + if accelerator is None or accelerator.is_main_process: + logging.info(colored("Logs will be saved locally.", "yellow", attrs=["bold"])) if cfg.seed is not None: set_seed(cfg.seed) - # Check device is available - device = get_safe_torch_device(cfg.policy.device, log=True) torch.backends.cudnn.benchmark = True torch.backends.cuda.matmul.allow_tf32 = True @@ -200,6 +225,12 @@ def train(cfg: TrainPipelineConfig): if cfg.resume: step, optimizer, lr_scheduler = load_training_state(cfg.checkpoint_path, optimizer, lr_scheduler) + # Prepare objects with Accelerate if enabled + if accelerator is not None: + policy, optimizer, lr_scheduler = accelerator.prepare(policy, optimizer, lr_scheduler) + if accelerator.is_main_process: + logging.info("Policy, optimizer, and scheduler prepared with Accelerate") + num_learnable_params = sum(p.numel() for p in policy.parameters() if p.requires_grad) num_total_params = sum(p.numel() for p in policy.parameters()) @@ -235,6 +266,11 @@ def train(cfg: TrainPipelineConfig): drop_last=False, prefetch_factor=2, ) + + # Prepare dataloader with Accelerate if enabled + if accelerator is not None: + dataloader = accelerator.prepare(dataloader) + dl_iter = cycle(dataloader) policy.train() @@ -252,22 +288,68 @@ def train(cfg: TrainPipelineConfig): ) logging.info("Start offline training on a fixed dataset") - for _ in range(step, cfg.steps): - start_time = time.perf_counter() - batch = next(dl_iter) - batch = preprocessor(batch) - train_tracker.dataloading_s = time.perf_counter() - start_time - train_tracker, output_dict = update_policy( - train_tracker, - policy, - batch, - optimizer, - cfg.optimizer.grad_clip_norm, - grad_scaler=grad_scaler, - lr_scheduler=lr_scheduler, - use_amp=cfg.policy.use_amp, - ) + # Calculate gradient accumulation steps for multi-GPU training + # This ensures effective batch size matches single-GPU training + gradient_accumulation_steps = accelerator.num_processes if accelerator is not None else 1 + if accelerator and accelerator.is_main_process: + logging.info(f"Using gradient accumulation: {gradient_accumulation_steps} steps") + logging.info(f"Effective batch size: {cfg.batch_size} (same as single-GPU)") + + for _ in range(step, cfg.steps): + policy.train() + optimizer.zero_grad() + + # Accumulate gradients over multiple mini-batches to match single-GPU effective batch size + accumulated_loss = 0 + accumulated_output_dict = {} + + for accum_step in range(gradient_accumulation_steps): + start_time = time.perf_counter() + batch = next(dl_iter) + batch = preprocessor(batch) + if accum_step == 0: # Only track data loading time once per step + train_tracker.dataloading_s = time.perf_counter() - start_time + + # Forward pass + start_time = time.perf_counter() + with accelerator.autocast() if accelerator else nullcontext(): + loss, output_dict = policy.forward(batch) + # Scale loss by accumulation steps to get proper average + loss = loss / gradient_accumulation_steps + + # Backward pass + if accelerator: + accelerator.backward(loss) + else: + grad_scaler.scale(loss).backward() + + # Accumulate metrics + accumulated_loss += loss.item() + if accum_step == 0: + accumulated_output_dict = output_dict + + # Gradient clipping and optimizer step + if accelerator: + accelerator.clip_grad_norm_(policy.parameters(), cfg.optimizer.grad_clip_norm) + optimizer.step() + else: + grad_scaler.unscale_(optimizer) + _ = torch.nn.utils.clip_grad_norm_( + policy.parameters(), cfg.optimizer.grad_clip_norm, error_if_nonfinite=False + ) + grad_scaler.step(optimizer) + grad_scaler.update() + + # Update learning rate scheduler + if lr_scheduler is not None: + lr_scheduler.step() + + # Update metrics with accumulated values + train_tracker.loss = accumulated_loss + train_tracker.lr = optimizer.param_groups[0]["lr"] + train_tracker.update_s = time.perf_counter() - start_time + output_dict = accumulated_output_dict # Note: eval and checkpoint happens *after* the `step`th training update has completed, so we # increment `step` here.