diff --git a/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py b/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py index 74be6bfa4..856df2866 100644 --- a/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py +++ b/src/lerobot/datasets/v30/convert_dataset_v21_to_v30.py @@ -50,9 +50,9 @@ from typing import Any import jsonlines import pandas as pd -import pyarrow as pa +import pyarrow.parquet as pq import tqdm -from datasets import Dataset, Features, Image +from datasets import Dataset, concatenate_datasets from huggingface_hub import HfApi, snapshot_download from requests import HTTPError @@ -68,6 +68,7 @@ from lerobot.datasets.utils import ( LEGACY_EPISODES_STATS_PATH, LEGACY_TASKS_PATH, cast_stats_to_numpy, + embed_images, flatten_dict, get_file_size_in_mb, get_parquet_file_size_in_mb, @@ -175,24 +176,34 @@ def convert_tasks(root, new_root): def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys): - # TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets - dataframes = [pd.read_parquet(file) for file in paths_to_cat] - # Concatenate all DataFrames along rows - concatenated_df = pd.concat(dataframes, ignore_index=True) + """Concatenate multiple parquet data files into a single file. + + This function uses HuggingFace datasets to properly handle image embedding, + ensuring the output has the same internal structure as datasets created through + live recording. This is critical for training performance. + + Args: + paths_to_cat: List of parquet file paths to concatenate + new_root: Root directory for the new dataset + chunk_idx: Chunk index for the output file + file_idx: File index within the chunk + image_keys: List of feature keys that contain images + """ + + datasets_list: list[Dataset] = [Dataset.from_parquet(str(file)) for file in paths_to_cat] # type: ignore[misc] + concatenated_ds: Dataset = concatenate_datasets(datasets_list) + + if len(image_keys) > 0: + logging.debug(f"Embedding {len(image_keys)} image features for optimal training performance") + concatenated_ds = embed_images(concatenated_ds) path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx) path.parent.mkdir(parents=True, exist_ok=True) - if len(image_keys) > 0: - schema = pa.Schema.from_pandas(concatenated_df) - features = Features.from_arrow_schema(schema) - for key in image_keys: - features[key] = Image() - schema = features.arrow_schema - else: - schema = None - - concatenated_df.to_parquet(path, index=False, schema=schema) + table = concatenated_ds.with_format("arrow")[:] + writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True) + writer.write_table(table) + writer.close() def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):