diff --git a/src/lerobot/datasets/lerobot_dataset.py b/src/lerobot/datasets/lerobot_dataset.py index 9253202b3..3611403ed 100644 --- a/src/lerobot/datasets/lerobot_dataset.py +++ b/src/lerobot/datasets/lerobot_dataset.py @@ -699,6 +699,7 @@ class LeRobotDataset(torch.utils.data.Dataset): self.writer = None self.latest_episode = None self._current_file_start_frame = None # Track the starting frame index of the current parquet file + self._pending_video_encoding = None # Track pending async video encoding self.root.mkdir(exist_ok=True, parents=True) @@ -1073,6 +1074,15 @@ class LeRobotDataset(torch.utils.data.Dataset): collection/conversion, else footer metadata won't be written to the parquet files. The dataset won't be valid and can't be loaded as ds = LeRobotDataset(repo_id=repo, root=HF_LEROBOT_HOME.joinpath(repo)) """ + # Finalize any pending video encoding from the last episode + if hasattr(self, '_pending_video_encoding') and self._pending_video_encoding: + prev_ep_idx, prev_temp_paths, prev_video_keys = self._pending_video_encoding + logging.info(f"Finalizing last episode {prev_ep_idx} videos...") + self._wait_video_encoder() + for vkey, vpath in zip(prev_video_keys, prev_temp_paths): + self._save_episode_video(vkey, prev_ep_idx, vpath) + self._pending_video_encoding = None + self._wait_video_encoder() self.stop_video_encoder() self._close_writer() @@ -1216,7 +1226,17 @@ class LeRobotDataset(torch.utils.data.Dataset): if has_video_keys and not use_batched_encoding: if use_async_encoding: - # Submit encoding tasks to background workers (parallel encoding) + # Pipeline: finalize PREVIOUS episode's encoding while current is submitted + # This allows encoding to happen during the NEXT episode's recording + if hasattr(self, '_pending_video_encoding') and self._pending_video_encoding: + prev_ep_idx, prev_temp_paths, prev_video_keys = self._pending_video_encoding + logging.info(f"Finalizing previous episode {prev_ep_idx} videos...") + self.video_encoder.wait_until_done() + for vkey, vpath in zip(prev_video_keys, prev_temp_paths): + self._save_episode_video(vkey, prev_ep_idx, vpath) + self._pending_video_encoding = None + + # Submit current episode encoding (will run during next episode recording) temp_paths = [] for video_key in self.meta.video_keys: img_dir = self._get_image_file_dir(episode_index, video_key) @@ -1229,10 +1249,9 @@ class LeRobotDataset(torch.utils.data.Dataset): episode_index=episode_index, video_key=video_key, ) - # Wait for encoding to complete, then move to proper location - self.video_encoder.wait_until_done() - for video_key, video_path in zip(self.meta.video_keys, temp_paths): - ep_metadata.update(self._save_episode_video(video_key, episode_index, video_path)) + # Store for finalization during next episode + self._pending_video_encoding = (episode_index, temp_paths, list(self.meta.video_keys)) + logging.info(f"Episode {episode_index} videos queued for encoding (will finalize next episode)") else: video_paths = self._encode_multiple_temporary_episode_videos(self.meta.video_keys, episode_index) for video_key, video_path in zip(self.meta.video_keys, video_paths): @@ -1633,6 +1652,7 @@ class LeRobotDataset(torch.utils.data.Dataset): obj.writer = None obj.latest_episode = None obj._current_file_start_frame = None + obj._pending_video_encoding = None # Initialize tracking for incremental recording obj._lazy_loading = False obj._recorded_frames = 0