mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-31 19:01:28 +00:00
Compare commits
4 Commits
feat/custo
...
fix/conver
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8008cb357d | ||
|
|
ca5a4a7ae5 | ||
|
|
b5dcd70d2c | ||
|
|
f6b16f6d97 |
@@ -39,6 +39,7 @@ from lerobot.datasets.aggregate import aggregate_datasets
|
||||
from lerobot.datasets.compute_stats import aggregate_stats
|
||||
from lerobot.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata
|
||||
from lerobot.datasets.utils import (
|
||||
DATA_DIR,
|
||||
DEFAULT_CHUNK_SIZE,
|
||||
DEFAULT_DATA_FILE_SIZE_IN_MB,
|
||||
DEFAULT_DATA_PATH,
|
||||
@@ -962,28 +963,23 @@ def _copy_data_with_feature_changes(
|
||||
remove_features: list[str] | None = None,
|
||||
) -> None:
|
||||
"""Copy data while adding or removing features."""
|
||||
if dataset.meta.episodes is None:
|
||||
dataset.meta.episodes = load_episodes(dataset.meta.root)
|
||||
data_dir = dataset.root / DATA_DIR
|
||||
parquet_files = sorted(data_dir.glob("*/*.parquet"))
|
||||
|
||||
# Map file paths to episode indices to extract chunk/file indices
|
||||
file_to_episodes: dict[Path, set[int]] = {}
|
||||
for ep_idx in range(dataset.meta.total_episodes):
|
||||
file_path = dataset.meta.get_data_file_path(ep_idx)
|
||||
if file_path not in file_to_episodes:
|
||||
file_to_episodes[file_path] = set()
|
||||
file_to_episodes[file_path].add(ep_idx)
|
||||
if not parquet_files:
|
||||
raise ValueError(f"No parquet files found in {data_dir}")
|
||||
|
||||
frame_idx = 0
|
||||
|
||||
for src_path in tqdm(sorted(file_to_episodes.keys()), desc="Processing data files"):
|
||||
df = pd.read_parquet(dataset.root / src_path).reset_index(drop=True)
|
||||
for src_path in tqdm(parquet_files, desc="Processing data files"):
|
||||
df = pd.read_parquet(src_path).reset_index(drop=True)
|
||||
|
||||
# Get chunk_idx and file_idx from the source file's first episode
|
||||
episodes_in_file = file_to_episodes[src_path]
|
||||
first_ep_idx = min(episodes_in_file)
|
||||
src_ep = dataset.meta.episodes[first_ep_idx]
|
||||
chunk_idx = src_ep["data/chunk_index"]
|
||||
file_idx = src_ep["data/file_index"]
|
||||
relative_path = src_path.relative_to(dataset.root)
|
||||
chunk_dir = relative_path.parts[1]
|
||||
file_name = relative_path.parts[2]
|
||||
|
||||
chunk_idx = int(chunk_dir.split("-")[1])
|
||||
file_idx = int(file_name.split("-")[1].split(".")[0])
|
||||
|
||||
if remove_features:
|
||||
df = df.drop(columns=remove_features, errors="ignore")
|
||||
@@ -1009,7 +1005,7 @@ def _copy_data_with_feature_changes(
|
||||
df[feature_name] = feature_slice
|
||||
frame_idx = end_idx
|
||||
|
||||
# Write using the preserved chunk_idx and file_idx from source
|
||||
# Write using the same chunk/file structure as source
|
||||
dst_path = new_meta.root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
|
||||
dst_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
@@ -50,9 +50,9 @@ from typing import Any
|
||||
|
||||
import jsonlines
|
||||
import pandas as pd
|
||||
import pyarrow as pa
|
||||
import pyarrow.parquet as pq
|
||||
import tqdm
|
||||
from datasets import Dataset, Features, Image
|
||||
from datasets import Dataset, concatenate_datasets
|
||||
from huggingface_hub import HfApi, snapshot_download
|
||||
from requests import HTTPError
|
||||
|
||||
@@ -68,6 +68,7 @@ from lerobot.datasets.utils import (
|
||||
LEGACY_EPISODES_STATS_PATH,
|
||||
LEGACY_TASKS_PATH,
|
||||
cast_stats_to_numpy,
|
||||
embed_images,
|
||||
flatten_dict,
|
||||
get_file_size_in_mb,
|
||||
get_parquet_file_size_in_mb,
|
||||
@@ -174,25 +175,33 @@ def convert_tasks(root, new_root):
|
||||
write_tasks(df_tasks, new_root)
|
||||
|
||||
|
||||
def concat_data_files(paths_to_cat, new_root, chunk_idx, file_idx, image_keys):
|
||||
# TODO(rcadene): to save RAM use Dataset.from_parquet(file) and concatenate_datasets
|
||||
dataframes = [pd.read_parquet(file) for file in paths_to_cat]
|
||||
# Concatenate all DataFrames along rows
|
||||
concatenated_df = pd.concat(dataframes, ignore_index=True)
|
||||
def concat_data_files(
|
||||
paths_to_cat: list[Path], new_root: Path, chunk_idx: int, file_idx: int, image_keys: list[str]
|
||||
):
|
||||
"""Concatenate multiple parquet data files into a single file.
|
||||
|
||||
Args:
|
||||
paths_to_cat: List of parquet file paths to concatenate
|
||||
new_root: Root directory for the new dataset
|
||||
chunk_idx: Chunk index for the output file
|
||||
file_idx: File index within the chunk
|
||||
image_keys: List of feature keys that contain images
|
||||
"""
|
||||
|
||||
datasets_list: list[Dataset] = [Dataset.from_parquet(str(file)) for file in paths_to_cat]
|
||||
concatenated_ds: Dataset = concatenate_datasets(datasets_list)
|
||||
|
||||
if len(image_keys) > 0:
|
||||
logging.debug(f"Embedding {len(image_keys)} image features for optimal training performance")
|
||||
concatenated_ds = embed_images(concatenated_ds)
|
||||
|
||||
path = new_root / DEFAULT_DATA_PATH.format(chunk_index=chunk_idx, file_index=file_idx)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if len(image_keys) > 0:
|
||||
schema = pa.Schema.from_pandas(concatenated_df)
|
||||
features = Features.from_arrow_schema(schema)
|
||||
for key in image_keys:
|
||||
features[key] = Image()
|
||||
schema = features.arrow_schema
|
||||
else:
|
||||
schema = None
|
||||
|
||||
concatenated_df.to_parquet(path, index=False, schema=schema)
|
||||
table = concatenated_ds.with_format("arrow")[:]
|
||||
writer = pq.ParquetWriter(path, schema=table.schema, compression="snappy", use_dictionary=True)
|
||||
writer.write_table(table)
|
||||
writer.close()
|
||||
|
||||
|
||||
def convert_data(root: Path, new_root: Path, data_file_size_in_mb: int):
|
||||
|
||||
Reference in New Issue
Block a user