diff --git a/src/lerobot/datasets/aggregate.py b/src/lerobot/datasets/aggregate.py index 870c9571e..455caf0fe 100644 --- a/src/lerobot/datasets/aggregate.py +++ b/src/lerobot/datasets/aggregate.py @@ -136,21 +136,40 @@ def update_meta_data( df["_orig_chunk"] = df[orig_chunk_col].copy() df["_orig_file"] = df[orig_file_col].copy() - # Update chunk and file indices to point to destination - df[orig_chunk_col] = video_idx["chunk"] - df[orig_file_col] = video_idx["file"] - - # Apply per-source-file timestamp offsets + # Get mappings for this video key src_to_offset = video_idx.get("src_to_offset", {}) - if src_to_offset: - # Apply offset based on original source file + src_to_dst = video_idx.get("src_to_dst", {}) + + # Apply per-source-file mappings + if src_to_dst: + # Map each episode to its correct destination file and apply offset for idx in df.index: - src_key = (df.at[idx, "_orig_chunk"], df.at[idx, "_orig_file"]) + # Convert to Python int to avoid numpy type mismatch in dict lookup + src_key = (int(df.at[idx, "_orig_chunk"]), int(df.at[idx, "_orig_file"])) + + # Get destination chunk/file for this source file + dst_chunk, dst_file = src_to_dst.get(src_key, (video_idx["chunk"], video_idx["file"])) + df.at[idx, orig_chunk_col] = dst_chunk + df.at[idx, orig_file_col] = dst_file + + # Apply timestamp offset + offset = src_to_offset.get(src_key, 0) + df.at[idx, f"videos/{key}/from_timestamp"] += offset + df.at[idx, f"videos/{key}/to_timestamp"] += offset + elif src_to_offset: + # Fallback: use same destination for all, but apply per-file offsets + df[orig_chunk_col] = video_idx["chunk"] + df[orig_file_col] = video_idx["file"] + for idx in df.index: + # Convert to Python int to avoid numpy type mismatch in dict lookup + src_key = (int(df.at[idx, "_orig_chunk"]), int(df.at[idx, "_orig_file"])) offset = src_to_offset.get(src_key, 0) df.at[idx, f"videos/{key}/from_timestamp"] += offset df.at[idx, f"videos/{key}/to_timestamp"] += offset else: # Fallback to simple offset (for backward compatibility) + df[orig_chunk_col] = video_idx["chunk"] + df[orig_file_col] = video_idx["file"] df[f"videos/{key}/from_timestamp"] = ( df[f"videos/{key}/from_timestamp"] + video_idx["latest_duration"] ) @@ -268,6 +287,12 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu videos_idx[key]["episode_duration"] = 0 # Track offset for each source (chunk, file) pair videos_idx[key]["src_to_offset"] = {} + # Track destination (chunk, file) for each source (chunk, file) pair + videos_idx[key]["src_to_dst"] = {} + # Initialize dst_file_durations if not present + # dst_file_durations tracks duration of each destination file + if "dst_file_durations" not in videos_idx[key]: + videos_idx[key]["dst_file_durations"] = {} for key, video_idx in videos_idx.items(): unique_chunk_file_pairs = { @@ -282,9 +307,13 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu chunk_idx = video_idx["chunk"] file_idx = video_idx["file"] - current_offset = video_idx["latest_duration"] + dst_file_durations = video_idx["dst_file_durations"] for src_chunk_idx, src_file_idx in unique_chunk_file_pairs: + # Convert to Python int to ensure consistent dict keys + src_chunk_idx = int(src_chunk_idx) + src_file_idx = int(src_file_idx) + src_path = src_meta.root / DEFAULT_VIDEO_PATH.format( video_key=key, chunk_index=src_chunk_idx, @@ -298,14 +327,17 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu ) src_duration = get_video_duration_in_s(src_path) + dst_key = (chunk_idx, file_idx) if not dst_path.exists(): - # Store offset before incrementing - videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = current_offset + # New destination file: offset is 0 + videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = 0 + videos_idx[key]["src_to_dst"][(src_chunk_idx, src_file_idx)] = dst_key dst_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy(str(src_path), str(dst_path)) + # Track duration of this destination file + dst_file_durations[dst_key] = src_duration videos_idx[key]["episode_duration"] += src_duration - current_offset += src_duration continue # Check file sizes before appending @@ -313,10 +345,11 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu dst_size = get_file_size_in_mb(dst_path) if dst_size + src_size >= video_files_size_in_mb: - # Rotate to a new file, this source becomes start of new destination - # So its offset should be 0 - videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = 0 + # Rotate to a new file - offset is 0 chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, chunk_size) + dst_key = (chunk_idx, file_idx) + videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = 0 + videos_idx[key]["src_to_dst"][(src_chunk_idx, src_file_idx)] = dst_key dst_path = dst_meta.root / DEFAULT_VIDEO_PATH.format( video_key=key, chunk_index=chunk_idx, @@ -324,16 +357,20 @@ def aggregate_videos(src_meta, dst_meta, videos_idx, video_files_size_in_mb, chu ) dst_path.parent.mkdir(parents=True, exist_ok=True) shutil.copy(str(src_path), str(dst_path)) - # Reset offset for next file - current_offset = src_duration + # Track duration of this new destination file + dst_file_durations[dst_key] = src_duration else: - # Append to existing video file - use current accumulated offset - videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = current_offset + # Append to existing destination file + # Offset is the current duration of this destination file + current_dst_duration = dst_file_durations.get(dst_key, 0) + videos_idx[key]["src_to_offset"][(src_chunk_idx, src_file_idx)] = current_dst_duration + videos_idx[key]["src_to_dst"][(src_chunk_idx, src_file_idx)] = dst_key concatenate_video_files( [dst_path, src_path], dst_path, ) - current_offset += src_duration + # Update duration of this destination file + dst_file_durations[dst_key] = current_dst_duration + src_duration videos_idx[key]["episode_duration"] += src_duration