Compare commits

...

4 Commits

Author SHA1 Message Date
Michel Aractingi
8008cb357d remove bad typing 2025-11-06 09:13:26 +01:00
Michel Aractingi
ca5a4a7ae5 add typing hints 2025-11-06 09:12:09 +01:00
Michel Aractingi
b5dcd70d2c add embed images in conversion to v3 script; add parquet writer in conversion script 2025-11-05 23:41:38 +01:00
Michel Aractingi
f6b16f6d97 fix(dataset_tools) Critical bug in modify features (#2342)
* fix bug in `_copy_data_with_feature_changes`

* Update src/lerobot/datasets/dataset_tools.py

Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
Signed-off-by: Michel Aractingi <michel.aractingi@huggingface.co>

* add missing import

---------

Signed-off-by: Michel Aractingi <michel.aractingi@huggingface.co>
Co-authored-by: Caroline Pascal <caroline8.pascal@gmail.com>
2025-11-04 15:56:41 +01:00
2 changed files with 40 additions and 35 deletions

View File

@@ -39,6 +39,7 @@ from lerobot.datasets.aggregate import aggregate_datasets
from lerobot.datasets.compute_stats import aggregate_stats
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
from lerobot.datasets.utils import (
DATA_DIR,
DEFAULT_CHUNK_SIZE,
DEFAULT_DATA_FILE_SIZE_IN_MB,
DEFAULT_DATA_PATH,
@@ -962,28 +963,23 @@ def _copy_data_with_feature_changes(
remove_features: list[str] | None = None,
) -> None:
"""Copy data while adding or removing features."""
if dataset.meta.episodes is None:
dataset.meta.episodes = load_episodes(dataset.meta.root)
data_dir = dataset.root / DATA_DIR
parquet_files = sorted(data_dir.glob("*/*.parquet"))
# Map file paths to episode indices to extract chunk/file indices
file_to_episodes: dict[Path, set[int]] = {}
for ep_idx in range(dataset.meta.total_episodes):
file_path = dataset.meta.get_data_file_path(ep_idx)
if file_path not in file_to_episodes:
file_to_episodes[file_path] = set()
file_to_episodes[file_path].add(ep_idx)
if not parquet_files:
raise ValueError(f"No parquet files found in {data_dir}")
frame_idx = 0
for src_path in tqdm(sorted(file_to_episodes.keys()), desc="Processing data files"):
df = pd.read_parquet(dataset.root / src_path).reset_index(drop=True)
for src_path in tqdm(parquet_files, desc="Processing data files"):
df = pd.read_parquet(src_path).reset_index(drop=True)
# Get chunk_idx and file_idx from the source file's first episode
episodes_in_file = file_to_episodes[src_path]
first_ep_idx = min(episodes_in_file)
src_ep = dataset.meta.episodes[first_ep_idx]
chunk_idx = src_ep["data/chunk_index"]
file_idx = src_ep["data/file_index"]
relative_path = src_path.relative_to(dataset.root)
chunk_dir = relative_path.parts[1]
file_name = relative_path.parts[2]
chunk_idx = int(chunk_dir.split("-")[1])
file_idx = int(file_name.split("-")[1].split(".")[0])
if remove_features:
df = df.drop(columns=remove_features, errors="ignore")
@@ -1009,7 +1005,7 @@ def _copy_data_with_feature_changes(
df[feature_name] = feature_slice
frame_idx = end_idx
# Write using the preserved chunk_idx and file_idx from source
# Write using the same chunk/file structure as source
dst_path = new_meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
dst_path.parent.mkdir(parents=True, exist_ok=True)

View File

@@ -50,9 +50,9 @@ from typing import Any
import jsonlines
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import tqdm
from datasets import Dataset, Features, Image
from datasets import Dataset, concatenate_datasets
from huggingface_hub import HfApi, snapshot_download
from requests import HTTPError
@@ -68,6 +68,7 @@ from lerobot.datasets.utils import (
LEGACY_EPISODES_STATS_PATH,
LEGACY_TASKS_PATH,
cast_stats_to_numpy,
embed_images,
flatten_dict,
get_file_size_in_mb,
get_parquet_file_size_in_mb,
@@ -174,25 +175,33 @@ def convert_tasks(root, new_root):
write_tasks(df_tasks, new_root)
def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
# TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
dataframes = [pd.read_parquet(file) for file in paths_to_cat]
# Concatenate all DataFrames along rows
concatenated_df = pd.concat(dataframes, ignore_index=True)
def concat_data_files(
paths_to_cat: list[Path], new_root: Path, chunk_idx: int, file_idx: int, image_keys: list[str]
):
"""Concatenate multiple parquet data files into a single file.
Args:
paths_to_cat: List of parquet file paths to concatenate
new_root: Root directory for the new dataset
chunk_idx: Chunk index for the output file
file_idx: File index within the chunk
image_keys: List of feature keys that contain images
"""
datasets_list: list[Dataset] = [Dataset.from_parquet(str(file)) for file in paths_to_cat]
concatenated_ds: Dataset = concatenate_datasets(datasets_list)
if len(image_keys) > 0:
logging.debug(f"Embedding {len(image_keys)} image features for optimal training performance")
concatenated_ds = embed_images(concatenated_ds)
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
path.parent.mkdir(parents=True, exist_ok=True)
if len(image_keys) > 0:
schema = pa.Schema.from_pandas(concatenated_df)
features = Features.from_arrow_schema(schema)
for key in image_keys:
features[key] = Image()
schema = features.arrow_schema
else:
schema = None
concatenated_df.to_parquet(path, index=False, schema=schema)
table = concatenated_ds.with_format("arrow")[:]
writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
writer.write_table(table)
writer.close()
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):