diff --git a/src/lerobot/datasets/dataset_reader.py b/src/lerobot/datasets/dataset_reader.py index 85642034a..93e83f3cd 100644 --- a/src/lerobot/datasets/dataset_reader.py +++ b/src/lerobot/datasets/dataset_reader.py @@ -24,11 +24,11 @@ import torch from lerobot.datasets.audio_utils import decode_audio from lerobot.datasets.dataset_metadata import LeRobotDatasetMetadata from lerobot.datasets.feature_utils import ( - DEFAULT_AUDIO_CHUNK_DURATION, check_delta_timestamps, get_delta_indices, get_hf_features_from_features, ) +from lerobot.datasets.utils import DEFAULT_AUDIO_CHUNK_DURATION from lerobot.datasets.io_utils import ( hf_transform_to_torch, load_nested_dataset, diff --git a/src/lerobot/datasets/dataset_writer.py b/src/lerobot/datasets/dataset_writer.py index 2eb32665b..179052b85 100644 --- a/src/lerobot/datasets/dataset_writer.py +++ b/src/lerobot/datasets/dataset_writer.py @@ -150,7 +150,7 @@ class DatasetWriter: def _get_raw_audio_file_path(self, episode_index: int, audio_key: str) -> Path: fpath = DEFAULT_RAW_AUDIO_PATH.format(audio_key=audio_key, episode_index=episode_index) - return self.root / fpath + return self._root / fpath def _save_image( self, image: torch.Tensor | np.ndarray | PIL.Image.Image, fpath: Path, compress_level: int = 1 @@ -216,7 +216,7 @@ class DatasetWriter: compress_level = 1 if self._meta.features[key]["dtype"] == "video" else 6 self._save_image(frame[key], img_path, compress_level) self.episode_buffer[key].append(str(img_path)) - elif self.features[key]["dtype"] == "audio": + elif self._meta.features[key]["dtype"] == "audio": if ( self._meta.robot_type == "lekiwi" ): # Raw data storage should only be triggered for LeKiwi robot, for which audio is stored chunk by chunk in a visual frame-like manner @@ -237,7 +237,7 @@ class DatasetWriter: Starts recording audio data provided by the microphone and directly writes it in a .wav file. """ - audio_file = self._get_raw_audio_file_path(self.num_episodes, "observation.audio." + microphone_key) + audio_file = self._get_raw_audio_file_path(self._meta.total_episodes, "observation.audio." + microphone_key) microphone.start_recording(output_file=audio_file) def add_microphones_recordings(self, microphones: dict[str, Microphone]) -> None: @@ -248,7 +248,7 @@ class DatasetWriter: output_files = [] for microphone_key in microphones: output_files.append( - self._get_raw_audio_file_path(self.num_episodes, "observation.audio." + microphone_key) + self._get_raw_audio_file_path(self._meta.total_episodes, "observation.audio." + microphone_key) ) async_microphones_start_recording(microphones, output_files) @@ -357,7 +357,7 @@ class DatasetWriter: ep_metadata.update(self._save_episode_video(video_key, episode_index)) # TODO(Caroline): add parallel encoding for audio as well - for audio_key in self.meta.audio_keys: + for audio_key in self._meta.audio_keys: ep_metadata.update(self._save_episode_audio(audio_key, episode_index)) # `meta.save_episode` need to be executed after encoding the videos @@ -433,40 +433,40 @@ class DatasetWriter: end_episode: Ending episode index (exclusive). If None, encodes all episodes from start_episode to the current episode. """ if end_episode is None: - end_episode = self.num_episodes + end_episode = self._meta.total_episodes logging.info( f"Batch encoding {self.batch_encoding_size} audio for episodes {start_episode} to {end_episode - 1}" ) - chunk_idx = self.meta.episodes[start_episode]["data/chunk_index"] - file_idx = self.meta.episodes[start_episode]["data/file_index"] - episode_df_path = self.root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx) + chunk_idx = self._meta.episodes[start_episode]["data/chunk_index"] + file_idx = self._meta.episodes[start_episode]["data/file_index"] + episode_df_path = self._root / DEFAULT_EPISODES_PATH.format(chunk_index=chunk_idx, file_index=file_idx) episode_df = pd.read_parquet(episode_df_path) for ep_idx in range(start_episode, end_episode): logging.info(f"Encoding audio for episode {ep_idx}") if ( - self.meta.episodes[ep_idx]["data/chunk_index"] != chunk_idx - or self.meta.episodes[ep_idx]["data/file_index"] != file_idx + self._meta.episodes[ep_idx]["data/chunk_index"] != chunk_idx + or self._meta.episodes[ep_idx]["data/file_index"] != file_idx ): # The current episode is in a new chunk or file. # Save previous episode dataframe and update the Hugging Face dataset by reloading it. episode_df.to_parquet(episode_df_path) - self.meta.episodes = load_episodes(self.root) + self._meta.episodes = load_episodes(self._root) # Load new episode dataframe - chunk_idx = self.meta.episodes[ep_idx]["data/chunk_index"] - file_idx = self.meta.episodes[ep_idx]["data/file_index"] - episode_df_path = self.root / DEFAULT_EPISODES_PATH.format( + chunk_idx = self._meta.episodes[ep_idx]["data/chunk_index"] + file_idx = self._meta.episodes[ep_idx]["data/file_index"] + episode_df_path = self._root / DEFAULT_EPISODES_PATH.format( chunk_index=chunk_idx, file_index=file_idx ) episode_df = pd.read_parquet(episode_df_path) # Save the current episode's video metadata to the dataframe audio_ep_metadata = {} - for audio_key in self.meta.audio_keys: + for audio_key in self._meta.audio_keys: audio_ep_metadata.update(self._save_episode_audio(audio_key, ep_idx)) audio_ep_metadata.pop("episode_index") audio_ep_df = pd.DataFrame(audio_ep_metadata, index=[ep_idx]).convert_dtypes( @@ -475,7 +475,7 @@ class DatasetWriter: episode_df = episode_df.combine_first(audio_ep_df) episode_df.to_parquet(episode_df_path) - self.meta.episodes = load_episodes(self.root) + self._meta.episodes = load_episodes(self._root) def _save_episode_data(self, episode_buffer: dict) -> dict: """Save episode data to a parquet file.""" @@ -622,7 +622,7 @@ class DatasetWriter: Note: `encode_episode_audio` is a blocking call. Making it asynchronous shouldn't speedup encoding, since audio encoding with ffmpeg is already using multithreading. """ - temp_path = Path(tempfile.mkdtemp(dir=self.root)) / f"{audio_key}_{episode_index:03d}.m4a" + temp_path = Path(tempfile.mkdtemp(dir=self._root)) / f"{audio_key}_{episode_index:03d}.m4a" raw_audio_file = self._get_raw_audio_file_path(episode_index, audio_key) encode_audio(raw_audio_file, temp_path, overwrite=True) raw_audio_file.unlink() @@ -636,41 +636,41 @@ class DatasetWriter: if ( episode_index == 0 - or self.meta.latest_episode is None - or f"audio/{audio_key}/chunk_index" not in self.meta.latest_episode + or self._meta.latest_episode is None + or f"audio/{audio_key}/chunk_index" not in self._meta.latest_episode ): # Initialize indices for a new dataset made of the first episode data chunk_idx, file_idx = 0, 0 - if self.meta.episodes is not None and len(self.meta.episodes) > 0: + if self._meta.episodes is not None and len(self._meta.episodes) > 0: # It means we are resuming recording, so we need to load the latest episode # Update the indices to avoid overwriting the latest episode - old_chunk_idx = self.meta.episodes[-1][f"audio/{audio_key}/chunk_index"] - old_file_idx = self.meta.episodes[-1][f"audio/{audio_key}/file_index"] + old_chunk_idx = self._meta.episodes[-1][f"audio/{audio_key}/chunk_index"] + old_file_idx = self._meta.episodes[-1][f"audio/{audio_key}/file_index"] chunk_idx, file_idx = update_chunk_file_indices( - old_chunk_idx, old_file_idx, self.meta.chunks_size + old_chunk_idx, old_file_idx, self._meta.chunks_size ) latest_duration_in_s = 0.0 - new_path = self.root / self.meta.audio_path.format( + new_path = self._root / self._meta.audio_path.format( audio_key=audio_key, chunk_index=chunk_idx, file_index=file_idx ) new_path.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(ep_path), str(new_path)) else: # Retrieve information from the latest updated audio file using latest_episode - latest_ep = self.meta.latest_episode + latest_ep = self._meta.latest_episode chunk_idx = latest_ep[f"audio/{audio_key}/chunk_index"][0] file_idx = latest_ep[f"audio/{audio_key}/file_index"][0] - latest_path = self.root / self.meta.audio_path.format( + latest_path = self._root / self._meta.audio_path.format( audio_key=audio_key, chunk_index=chunk_idx, file_index=file_idx ) latest_size_in_mb = get_file_size_in_mb(latest_path) latest_duration_in_s = latest_ep[f"audio/{audio_key}/to_timestamp"][0] - if latest_size_in_mb + ep_size_in_mb >= self.meta.audio_files_size_in_mb: + if latest_size_in_mb + ep_size_in_mb >= self._meta.audio_files_size_in_mb: # Move temporary episode audio to a new audio file in the dataset - chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self.meta.chunks_size) - new_path = self.root / self.meta.audio_path.format( + chunk_idx, file_idx = update_chunk_file_indices(chunk_idx, file_idx, self._meta.chunks_size) + new_path = self._root / self._meta.audio_path.format( audio_key=audio_key, chunk_index=chunk_idx, file_index=file_idx ) new_path.parent.mkdir(parents=True, exist_ok=True) @@ -688,8 +688,8 @@ class DatasetWriter: # Update audio info (only needed when first episode is encoded since it reads from episode 0) if episode_index == 0: - self.meta.update_audio_info(audio_key) - write_info(self.meta.info, self.meta.root) # ensure audio info always written properly + self._meta.update_audio_info(audio_key) + write_info(self._meta.info, self._meta.root) # ensure audio info always written properly metadata = { "episode_index": episode_index, @@ -728,7 +728,7 @@ class DatasetWriter: episode_index = self.episode_buffer["episode_index"] if isinstance(episode_index, np.ndarray): episode_index = episode_index.item() if episode_index.size == 1 else episode_index[0] - for audio_key in self.meta.audio_keys: + for audio_key in self._meta.audio_keys: audio_file = self._get_raw_audio_file_path(episode_index, audio_key) if audio_file.is_file(): audio_file.unlink() diff --git a/src/lerobot/processor/batch_processor.py b/src/lerobot/processor/batch_processor.py index b8ee1a3c8..cd73cfb5e 100644 --- a/src/lerobot/processor/batch_processor.py +++ b/src/lerobot/processor/batch_processor.py @@ -35,6 +35,7 @@ from .pipeline import ( ProcessorStepRegistry, TransitionKey, ) +from lerobot.types import PolicyAction, EnvTransition @dataclass