def _encode_video_worker(video_key: str, episode_index: int, root: Path, fps: int) -> Path:
    """Encode the frames of one camera for one episode into a temporary MP4 file.

    This is a module-level function (not a method) so that it can be submitted to a
    ``concurrent.futures.ProcessPoolExecutor``, which must pickle the callable and
    its arguments to send them to a worker process.

    Args:
        video_key: Video feature key whose frames are encoded.
        episode_index: Index of the episode being encoded.
        root: Dataset root directory. The image directory is resolved relative to it
            and the temporary output directory is created inside it.
        fps: Frame rate forwarded to ``encode_video_frames``.

    Returns:
        Path of the encoded video, located in a freshly created temporary directory
        under ``root``.

    Raises:
        Exception: Any error raised by ``encode_video_frames`` is propagated unchanged.
            In that case the temporary directory is removed and the source image
            directory is left in place.
    """
    # Resolve the image directory first so that a formatting error cannot leave an
    # orphan temporary directory behind.
    fpath = DEFAULT_IMAGE_PATH.format(image_key=video_key, episode_index=episode_index, frame_index=0)
    img_dir = (root / fpath).parent

    temp_dir = Path(tempfile.mkdtemp(dir=root))
    temp_path = temp_dir / f"{video_key}_{episode_index:03d}.mp4"
    try:
        encode_video_frames(img_dir, temp_path, fps, overwrite=True)
    except BaseException:
        # Do not litter the dataset root with half-written videos when encoding fails
        # or is interrupted; the caller only ever learns about `temp_path` on success.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise

    # The frames are only deleted once the video has been written successfully.
    shutil.rmtree(img_dir)
    return temp_path
@@ -1143,6 +1159,8 @@ class LeRobotDataset(torch.utils.data.Dataset): episode_data (dict | None, optional): Dict containing the episode data to save. If None, this will save the current episode in self.episode_buffer, which is filled with 'add_frame'. Defaults to None. + parallel_encoding (bool, optional): If True, encode videos in parallel using ProcessPoolExecutor. + Defaults to True on Linux, False on macOS as it tends to use all the CPU available already. """ episode_buffer = episode_data if episode_data is not None else self.episode_buffer @@ -1179,8 +1197,40 @@ class LeRobotDataset(torch.utils.data.Dataset): use_batched_encoding = self.batch_encoding_size > 1 if has_video_keys and not use_batched_encoding: - for video_key in self.meta.video_keys: - ep_metadata.update(self._save_episode_video(video_key, episode_index)) + num_cameras = len(self.meta.video_keys) + if parallel_encoding and num_cameras > 1: + # TODO(Steven): Ideally we would like to control the number of threads per encoding such that: + # num_cameras * num_threads = (total_cpu -1) + with concurrent.futures.ProcessPoolExecutor(max_workers=num_cameras) as executor: + future_to_key = { + executor.submit( + _encode_video_worker, + video_key, + episode_index, + self.root, + self.fps, + ): video_key + for video_key in self.meta.video_keys + } + + results = {} + for future in concurrent.futures.as_completed(future_to_key): + video_key = future_to_key[future] + try: + temp_path = future.result() + results[video_key] = temp_path + except Exception as exc: + logging.error(f"Video encoding failed for {video_key}: {exc}") + raise exc + + for video_key in self.meta.video_keys: + temp_path = results[video_key] + ep_metadata.update( + self._save_episode_video(video_key, episode_index, temp_path=temp_path) + ) + else: + for video_key in self.meta.video_keys: + ep_metadata.update(self._save_episode_video(video_key, episode_index)) # `meta.save_episode` need to be executed after encoding the videos 
self.meta.save_episode(episode_index, episode_length, episode_tasks, ep_stats, ep_metadata) @@ -1345,9 +1395,18 @@ class LeRobotDataset(torch.utils.data.Dataset): return metadata - def _save_episode_video(self, video_key: str, episode_index: int) -> dict: + def _save_episode_video( + self, + video_key: str, + episode_index: int, + temp_path: Path | None = None, + ) -> dict: # Encode episode frames into a temporary video - ep_path = self._encode_temporary_episode_video(video_key, episode_index) + if temp_path is None: + ep_path = self._encode_temporary_episode_video(video_key, episode_index) + else: + ep_path = temp_path + ep_size_in_mb = get_file_size_in_mb(ep_path) ep_duration_in_s = get_video_duration_in_s(ep_path) @@ -1465,11 +1524,7 @@ class LeRobotDataset(torch.utils.data.Dataset): Note: `encode_video_frames` is a blocking call. Making it asynchronous shouldn't speedup encoding, since video encoding with ffmpeg is already using multithreading. """ - temp_path = Path(tempfile.mkdtemp(dir=self.root)) / f"{video_key}_{episode_index:03d}.mp4" - img_dir = self._get_image_file_dir(episode_index, video_key) - encode_video_frames(img_dir, temp_path, self.fps, overwrite=True) - shutil.rmtree(img_dir) - return temp_path + return _encode_video_worker(video_key, episode_index, self.root, self.fps) @classmethod def create( diff --git a/src/lerobot/datasets/utils.py b/src/lerobot/datasets/utils.py index ce4cf1da1..234736a75 100644 --- a/src/lerobot/datasets/utils.py +++ b/src/lerobot/datasets/utils.py @@ -49,7 +49,7 @@ from lerobot.utils.utils import SuppressProgressBars, is_valid_numpy_dtype_strin DEFAULT_CHUNK_SIZE = 1000 # Max number of files per chunk DEFAULT_DATA_FILE_SIZE_IN_MB = 100 # Max size per file -DEFAULT_VIDEO_FILE_SIZE_IN_MB = 500 # Max size per file +DEFAULT_VIDEO_FILE_SIZE_IN_MB = 200 # Max size per file INFO_PATH = "meta/info.json" STATS_PATH = "meta/stats.json" diff --git a/src/lerobot/datasets/video_utils.py b/src/lerobot/datasets/video_utils.py index 
0de791919..84ce13772 100644 --- a/src/lerobot/datasets/video_utils.py +++ b/src/lerobot/datasets/video_utils.py @@ -311,6 +311,7 @@ def encode_video_frames( fast_decode: int = 0, log_level: int | None = av.logging.ERROR, overwrite: bool = False, + preset: int | None = None, ) -> None: """More info on ffmpeg arguments tuning on `benchmark/video/README.md`""" # Check encoder availability @@ -359,6 +360,9 @@ def encode_video_frames( value = f"fast-decode={fast_decode}" if vcodec == "libsvtav1" else "fastdecode" video_options[key] = value + if vcodec == "libsvtav1": + video_options["preset"] = str(preset) if preset is not None else "12" + # Set logging level if log_level is not None: # "While less efficient, it is generally preferable to modify logging with Python's logging"