mirror of
https://github.com/huggingface/lerobot.git
synced 2026-05-31 10:51:35 +00:00
* docs(benchmarks): add benchmark integration guide and standardize benchmark docs Add a comprehensive guide for adding new benchmarks to LeRobot, and refactor the existing LIBERO and Meta-World docs to follow the new standardized template. Made-with: Cursor * refactor(envs): move dispatch logic from factory into EnvConfig subclasses Replace hardcoded if/elif chains in factory.py with create_envs() and get_env_processors() methods on EnvConfig. New benchmarks now only need to register a config subclass — no factory.py edits required. Net -23 lines: factory.py shrinks from ~200 to ~70 lines of logic. Made-with: Cursor * docs(benchmarks): clean up adding-benchmarks guide for clarity Rewrite for simpler language, better structure, and easier navigation. Move quick-reference table to the top, fold eval explanation into architecture section, condense the doc template to a bulleted outline. Made-with: Cursor * fix link * fix task count * fix: enable SmolVLA eval on LIBERO with custom camera mappings - Thread camera_name_mapping from LiberoEnv config through to gym envs - Sync features_map with camera_name_mapping in LiberoEnv.__post_init__ - Fix render() to use first available camera instead of hardcoded "image" - Handle non-dict final_info in rollout by falling back to info["is_success"] - Add use_peft legacy field to SmolVLAConfig for checkpoint compat - Add defaults to GR00TN15Config init=False fields for transformers 5.3 Made-with: Cursor * fix: use direct AutoresetMode import for gymnasium compat Made-with: Cursor * fix: handle gymnasium < 1.0 without AutoresetMode Made-with: Cursor * refactor: revert policy changes, keep env-only camera mapping fixes - Revert GR00T N1.5 default_factory/default changes (transformers compat) - Revert SmolVLA use_peft legacy field - Apply ruff formatting fixes - camera_name_mapping stays entirely in env/eval layer (no policy changes) Made-with: Cursor * Update docs/source/env_processor.mdx Co-authored-by: Khalil Meftah <khalil.meftah@huggingface.co> Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com> * feat(envs): lazy env init + AsyncVectorEnv as default for n_envs > 1 LiberoEnv and MetaworldEnv previously allocated GPU resources (EGL context, OpenGL framebuffer) in __init__, before AsyncVectorEnv's fork(). Worker processes inherited stale GPU handles, causing EGL_BAD_CONTEXT crashes on first render. Fix: defer OffScreenRenderEnv / MT1 construction to _ensure_env(), called on first reset() or step() inside the worker subprocess. Each worker creates its own clean context after fork(). Also fixes lerobot_eval.py:170 (add_envs_task TODO): replace with env.call("task") which works with both SyncVectorEnv and AsyncVectorEnv. AsyncVectorEnv is now the default for n_envs > 1; auto-downgraded to SyncVectorEnv when n_envs=1 (no benefit, less overhead). Expected speedup: ~15-20x for LIBERO Spatial with batch_size=50. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: close envs between tasks to prevent worker process accumulation eval_policy_all never closed environments after each task completed, causing AsyncVectorEnv worker processes to accumulate (N_tasks × n_envs). This led to OOM, BrokenPipeError and EOFError on multi-task benchmarks. Also fixes: - AsyncVectorEnv compat in envs/utils.py (use get_attr/call instead of .envs) - Tuple task handling in tokenizer_processor and lerobot_eval - _LazyAsyncVectorEnv for deferred worker spawning in LIBERO Made-with: Cursor * fix(eval): use task_description instead of task for language conditioning env.call("task") returns the LIBERO task name with underscores (e.g. "pick_up_the_black_bowl_...") instead of the natural language description ("pick up the black bowl ..."). The VLM tokenizes these completely differently, causing 0.0 reward across all episodes. Made-with: Cursor * docs: update adding_benchmarks for async env changes - Replace add_envs_task reference with env.call("task_description") - Update use_async_envs default to True - Add note about lazy GPU init for AsyncVectorEnv compatibility Made-with: Cursor * feat(eval): batch_size=auto + faster env loading - batch_size=0 (default) auto-tunes based on CPU cores, capped by n_episodes and 64. Removes the need for users to guess the right value. The old batch_size > n_episodes error is replaced by silently clamping to n_episodes. - _LazyAsyncVectorEnv accepts pre-computed spaces so only one temp env is created per suite (not per task). For libero_spatial (10 tasks) this avoids 9 redundant LiberoEnv instantiations during env setup. Made-with: Cursor * docs: add evaluation guide and update benchmarks doc - New docs/source/evaluation.mdx covering lerobot-eval usage, batch_size auto-tuning, AsyncVectorEnv performance, tuning tips, output format, multi-task evaluation, and programmatic usage. - Add evaluation page to _toctree.yml under Benchmarks section. - Update adding_benchmarks.mdx to reference batch_size auto default and link to the evaluation guide. Made-with: Cursor * docs(evaluation): remove benchmark table, rename section header Made-with: Cursor * perf(eval): shared memory, observation passthrough, task prefetch - AsyncVectorEnv now uses shared_memory=True for zero-copy observation transfer - LiberoEnvConfig.gym_kwargs passes observation_height/width to the env - eval_policy_all prefetches next task's workers while current task runs Made-with: Cursor * style: ruff format Made-with: Cursor * chore: revert env_processor.mdx changes (not part of this PR) Made-with: Cursor * ci(benchmarks): add isolated integration tests for libero and metaworld Each benchmark gets its own Docker image (lerobot[libero] / lerobot[metaworld] only) so incompatible dep trees cannot collide. A 1-episode smoke eval runs per benchmark on GPU runners. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * ci(benchmarks): pin action hashes and use uv sync --locked Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * ci(benchmarks): trigger only on envs/ or lerobot_eval.py changes Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(ci): set LIBERO_DATA_FOLDER to bypass interactive stdin prompt libero/__init__.py calls input() to ask about a custom dataset path, which raises EOFError when stdin is closed inside Docker. Setting LIBERO_DATA_FOLDER skips the prompt entirely. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * docs(benchmarks): add CI smoke test step to adding_benchmarks guide Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(ci): pre-create libero config in Dockerfile to bypass stdin prompt libero/__init__.py calls input() when ~/.libero/config.yaml is missing. We write the config at image build time (without importing libero) so the prompt never fires at runtime. Also trigger CI on pyproject.toml changes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(ci): use shell to create libero config instead of multiline python -c The multiline RUN python -c "..." was being parsed as Dockerfile instructions. Use printf to write ~/.libero/config.yaml directly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(ci): point libero config to bundled package init_files The config was pointing to /tmp/libero_init which doesn't exist. Use importlib.util.find_spec to locate the hf-libero package directory and write paths to the actual bundled bddl_files/init_files/assets. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(ci): add smolvla extra to benchmark Dockerfiles num2words (required by SmolVLM processor) is declared in lerobot[smolvla], not lerobot[libero/metaworld]. Install both extras together. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(eval): render_frame covers _LazyAsyncVectorEnv isinstance(env, AsyncVectorEnv) silently skipped _LazyAsyncVectorEnv, causing video rendering to produce no frames on the default async path. Switch to hasattr(env, "call") so any async-compatible env (including _LazyAsyncVectorEnv) hits the call("render") branch. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(envs): remove unused _get_sub_env_attr helper _get_sub_env_attr was defined but never called anywhere in the codebase. _sub_env_has_attr (its sibling) is kept — it is actively used in utils.py. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * chore: apply prettier formatting to docs Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * docs(env_processor): remove deprecated add_envs_task from pipeline example add_envs_task is replaced by env.call("task_description") in this PR. Remove it from the pipeline walkthrough and renumber the steps (8→7). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(envs): remove __del__ from _LazyAsyncVectorEnv __del__ is unreliable as a cleanup mechanism. close() is already called explicitly in the eval loop's finally block, so the finalizer is redundant. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(eval): prefetch next task's workers after close to avoid GPU memory overlap Previously, next task's AsyncVectorEnv workers were spawned while the current task was still running, causing both tasks' GPU contexts to coexist. Moving the prefetch start into the finally block (after env.close()) ensures workers for task N+1 only spin up once task N has released GPU memory. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(envs): move _LazyAsyncVectorEnv to utils and apply to metaworld _LazyAsyncVectorEnv lived in libero.py but metaworld had the same OOM problem: all tasks' AsyncVectorEnv workers were spawned eagerly, wasting GPU memory for tasks not yet running. Move the class to envs/utils.py so both environments share it, then apply the same is_async + lazy wrapping pattern in create_metaworld_envs. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * chore: remove out-of-scope benchmark/CI/docs files from PR Benchmark CI workflow, Dockerfiles, benchmark docs, evaluation smoke-test doc, and dispatch tests belong in a separate PR. Scope this PR to the async env init changes only. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * chore: restore adding_benchmarks + test_dispatch, drop env_processor changes - Restore docs/source/adding_benchmarks.mdx (belongs in this PR) - Restore tests/envs/test_dispatch.py (belongs in this PR) - Revert docs/source/env_processor.mdx to main (out of scope for this PR) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * docs(adding_benchmarks): remove CI smoke test step (coming in separate PR) Step 7 (Dockerfile + benchmark_tests.yml CI job) and its table rows are out of scope for this PR. The CI infrastructure will be added on top in a follow-up PR. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(envs): remove unused add_envs_task Replaced by env.call("task_description") in lerobot_eval.py. No callers remain in the codebase. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * style: fix prettier formatting in env_processor.mdx Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(eval): catch AttributeError and NotImplementedError explicitly for task description Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(envs): use forkserver context and close envs in test to prevent deadlock AsyncVectorEnv with default fork context leaks worker processes between test_policy parametrized cases; subsequent env creation deadlocks because new forked workers inherit stale pipe FDs from previous test's leaked workers. - configs.py: pass context="forkserver" to AsyncVectorEnv (matches _LazyAsyncVectorEnv) - test_policies.py: call close_envs(envs) at end of test_policy to clean up workers Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(envs): default use_async_envs=False in create_envs and make_env Tests that call make_env(n_envs=2) without passing use_async_envs were getting AsyncVectorEnv, whose forked workers can't resolve gym namespaces registered at runtime. Default to False (sync) so existing tests pass. lerobot_eval.py explicitly passes cfg.eval.use_async_envs, so the CLI async behaviour (controlled by EvalConfig.use_async_envs) is unchanged. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Signed-off-by: Pepijn <138571049+pkooij@users.noreply.github.com> Co-authored-by: Khalil Meftah <khalil.meftah@huggingface.co> Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1529 lines
58 KiB
Python
1529 lines
58 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Tests for the TokenizerProcessorStep class.
|
|
"""
|
|
|
|
import tempfile
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
import torch
|
|
|
|
from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
|
|
from lerobot.processor import DataProcessorPipeline, TokenizerProcessorStep
|
|
from lerobot.processor.converters import create_transition, identity_transition
|
|
from lerobot.types import TransitionKey
|
|
from lerobot.utils.constants import (
|
|
ACTION,
|
|
OBS_IMAGE,
|
|
OBS_LANGUAGE,
|
|
OBS_LANGUAGE_SUBTASK_ATTENTION_MASK,
|
|
OBS_LANGUAGE_SUBTASK_TOKENS,
|
|
OBS_STATE,
|
|
)
|
|
from tests.utils import require_package
|
|
|
|
|
|
class MockTokenizer:
|
|
"""Mock tokenizer for testing that mimics transformers tokenizer interface."""
|
|
|
|
def __init__(self, vocab_size: int = 1000):
|
|
self.vocab_size = vocab_size
|
|
|
|
def __call__(
|
|
self,
|
|
text: str | list[str],
|
|
max_length: int = 512,
|
|
truncation: bool = True,
|
|
padding: str = "max_length",
|
|
padding_side: str = "right",
|
|
return_tensors: str = "pt",
|
|
**kwargs,
|
|
) -> dict[str, torch.Tensor]:
|
|
"""Mock tokenization that returns deterministic tokens based on text."""
|
|
texts = [text] if isinstance(text, str) else text
|
|
|
|
batch_size = len(texts)
|
|
|
|
# Create mock input_ids and attention_mask
|
|
input_ids = torch.zeros(batch_size, max_length, dtype=torch.long)
|
|
attention_mask = torch.zeros(batch_size, max_length, dtype=torch.long)
|
|
|
|
for i, txt in enumerate(texts):
|
|
# Simple mock: use hash of text to generate deterministic tokens
|
|
text_hash = hash(txt) % self.vocab_size
|
|
seq_len = min(len(txt.split()), max_length)
|
|
|
|
# Fill input_ids with simple pattern based on text
|
|
for j in range(seq_len):
|
|
input_ids[i, j] = (text_hash + j) % self.vocab_size
|
|
|
|
# Set attention mask for non-padded positions
|
|
attention_mask[i, :seq_len] = 1
|
|
|
|
result = {
|
|
"input_ids": input_ids,
|
|
"attention_mask": attention_mask,
|
|
}
|
|
|
|
# Return single sequence for single input to match transformers behavior
|
|
if len(texts) == 1:
|
|
result = {k: v.squeeze(0) for k, v in result.items()}
|
|
|
|
return result
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_tokenizer():
|
|
"""Provide a mock tokenizer for testing."""
|
|
return MockTokenizer(vocab_size=100)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_basic_tokenization(mock_auto_tokenizer):
|
|
"""Test basic string tokenization functionality."""
|
|
# Mock AutoTokenizer.from_pretrained to return our mock tokenizer
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "pick up the red cube"},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that original task is preserved
|
|
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "pick up the red cube"
|
|
|
|
# Check that tokens were added to observation
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
assert f"{OBS_LANGUAGE}.tokens" in observation
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in observation
|
|
|
|
# Check token structure
|
|
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]
|
|
assert isinstance(tokens, torch.Tensor)
|
|
assert isinstance(attention_mask, torch.Tensor)
|
|
assert tokens.shape == (10,)
|
|
assert attention_mask.shape == (10,)
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_basic_tokenization_with_tokenizer_object():
|
|
"""Test basic string tokenization functionality using tokenizer object directly."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "pick up the red cube"},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that original task is preserved
|
|
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == "pick up the red cube"
|
|
|
|
# Check that tokens were added to observation
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
assert f"{OBS_LANGUAGE}.tokens" in observation
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in observation
|
|
|
|
# Check token structure
|
|
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]
|
|
assert isinstance(tokens, torch.Tensor)
|
|
assert isinstance(attention_mask, torch.Tensor)
|
|
assert tokens.shape == (10,)
|
|
assert attention_mask.shape == (10,)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_list_of_strings_tokenization(mock_auto_tokenizer):
|
|
"""Test tokenization of a list of strings."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": ["pick up cube", "place on table"]},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that original task is preserved
|
|
assert result[TransitionKey.COMPLEMENTARY_DATA]["task"] == ["pick up cube", "place on table"]
|
|
|
|
# Check that tokens were added to observation
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]
|
|
assert tokens.shape == (2, 8) # batch_size=2, seq_len=8
|
|
assert attention_mask.shape == (2, 8)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_tuple_of_strings_tokenization(mock_auto_tokenizer):
|
|
"""Test tokenization of a tuple of strings (returned by VectorEnv.call())."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": ("pick up cube", "place on table")},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]
|
|
assert tokens.shape == (2, 8)
|
|
assert attention_mask.shape == (2, 8)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_custom_keys(mock_auto_tokenizer):
|
|
"""Test using custom task_key."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", task_key="instruction", max_length=5)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"instruction": "move forward"},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that tokens are stored in observation regardless of task_key
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
assert f"{OBS_LANGUAGE}.tokens" in observation
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in observation
|
|
|
|
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
|
|
assert tokens.shape == (5,)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_none_complementary_data(mock_auto_tokenizer):
|
|
"""Test handling of None complementary_data."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")
|
|
|
|
transition = create_transition(observation={}, complementary_data=None)
|
|
|
|
# create_transition converts None complementary_data to empty dict, so task key is missing
|
|
with pytest.raises(KeyError, match="task"):
|
|
processor(transition)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_missing_task_key(mock_auto_tokenizer):
|
|
"""Test handling when task key is missing."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")
|
|
|
|
transition = create_transition(observation={}, complementary_data={"other_field": "some value"})
|
|
|
|
with pytest.raises(KeyError, match="task"):
|
|
processor(transition)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_none_task_value(mock_auto_tokenizer):
|
|
"""Test handling when task value is None."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")
|
|
|
|
transition = create_transition(observation={}, complementary_data={"task": None})
|
|
|
|
with pytest.raises(ValueError, match="Task extracted from Complementary data is None"):
|
|
processor(transition)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_unsupported_task_type(mock_auto_tokenizer):
|
|
"""Test handling of unsupported task types."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")
|
|
|
|
# Test with integer task - get_task returns None, observation raises ValueError
|
|
transition = create_transition(observation={}, complementary_data={"task": 123})
|
|
|
|
with pytest.raises(ValueError, match="Task cannot be None"):
|
|
processor(transition)
|
|
|
|
# Test with mixed list - get_task returns None, observation raises ValueError
|
|
transition = create_transition(observation={}, complementary_data={"task": ["text", 123, "more text"]})
|
|
|
|
with pytest.raises(ValueError, match="Task cannot be None"):
|
|
processor(transition)
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_no_tokenizer_error():
|
|
"""Test that ValueError is raised when neither tokenizer nor tokenizer_name is provided."""
|
|
with pytest.raises(ValueError, match="Either 'tokenizer' or 'tokenizer_name' must be provided"):
|
|
TokenizerProcessorStep()
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_invalid_tokenizer_name_error():
|
|
"""Test that error is raised when invalid tokenizer_name is provided."""
|
|
with patch("lerobot.processor.tokenizer_processor.AutoTokenizer") as mock_auto_tokenizer:
|
|
# Mock import error
|
|
mock_auto_tokenizer.from_pretrained.side_effect = Exception("Model not found")
|
|
|
|
with pytest.raises(Exception, match="Model not found"):
|
|
TokenizerProcessorStep(tokenizer_name="invalid-tokenizer")
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_get_config_with_tokenizer_name(mock_auto_tokenizer):
|
|
"""Test configuration serialization when using tokenizer_name."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(
|
|
tokenizer_name="test-tokenizer",
|
|
max_length=256,
|
|
task_key="instruction",
|
|
padding="longest",
|
|
truncation=False,
|
|
)
|
|
|
|
config = processor.get_config()
|
|
|
|
expected = {
|
|
"tokenizer_name": "test-tokenizer",
|
|
"max_length": 256,
|
|
"task_key": "instruction",
|
|
"padding_side": "right",
|
|
"padding": "longest",
|
|
"truncation": False,
|
|
}
|
|
|
|
assert config == expected
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_config_with_tokenizer_object():
|
|
"""Test configuration serialization when using tokenizer object."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
|
|
processor = TokenizerProcessorStep(
|
|
tokenizer=mock_tokenizer,
|
|
max_length=256,
|
|
task_key="instruction",
|
|
padding="longest",
|
|
truncation=False,
|
|
)
|
|
|
|
config = processor.get_config()
|
|
|
|
# tokenizer_name should not be in config when tokenizer object is used
|
|
expected = {
|
|
"max_length": 256,
|
|
"task_key": "instruction",
|
|
"padding_side": "right",
|
|
"padding": "longest",
|
|
"truncation": False,
|
|
}
|
|
|
|
assert config == expected
|
|
assert "tokenizer_name" not in config
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_state_dict_methods(mock_auto_tokenizer):
|
|
"""Test state_dict and load_state_dict methods."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")
|
|
|
|
# Should return empty dict
|
|
state = processor.state_dict()
|
|
assert state == {}
|
|
|
|
# load_state_dict should not raise error
|
|
processor.load_state_dict({})
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_reset_method(mock_auto_tokenizer):
|
|
"""Test reset method."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")
|
|
|
|
# Should not raise error
|
|
processor.reset()
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_integration_with_robot_processor(mock_auto_tokenizer):
|
|
"""Test integration with RobotProcessor."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
tokenizer_processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=6)
|
|
robot_processor = DataProcessorPipeline(
|
|
[tokenizer_processor], to_transition=identity_transition, to_output=identity_transition
|
|
)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "test task"},
|
|
)
|
|
|
|
result = robot_processor(transition)
|
|
|
|
# Check that observation exists and tokenization was applied
|
|
assert TransitionKey.OBSERVATION in result
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
assert f"{OBS_LANGUAGE}.tokens" in observation
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in observation
|
|
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]
|
|
assert tokens.shape == (6,)
|
|
assert attention_mask.shape == (6,)
|
|
|
|
# Check that other data is preserved
|
|
assert torch.equal(
|
|
result[TransitionKey.OBSERVATION]["state"], transition[TransitionKey.OBSERVATION]["state"]
|
|
)
|
|
assert torch.equal(result[TransitionKey.ACTION], transition[TransitionKey.ACTION])
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_save_and_load_pretrained_with_tokenizer_name(mock_auto_tokenizer):
|
|
"""Test saving and loading processor with tokenizer_name."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
original_processor = TokenizerProcessorStep(
|
|
tokenizer_name="test-tokenizer", max_length=32, task_key="instruction"
|
|
)
|
|
|
|
robot_processor = DataProcessorPipeline(
|
|
[original_processor], to_transition=identity_transition, to_output=identity_transition
|
|
)
|
|
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
# Save processor
|
|
robot_processor.save_pretrained(temp_dir)
|
|
|
|
# Load processor - tokenizer will be recreated from saved config
|
|
loaded_processor = DataProcessorPipeline.from_pretrained(
|
|
temp_dir,
|
|
config_filename="dataprocessorpipeline.json",
|
|
to_transition=identity_transition,
|
|
to_output=identity_transition,
|
|
)
|
|
|
|
# Test that loaded processor works
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"instruction": "test instruction"},
|
|
)
|
|
|
|
result = loaded_processor(transition)
|
|
assert TransitionKey.OBSERVATION in result
|
|
assert f"{OBS_LANGUAGE}.tokens" in result[TransitionKey.OBSERVATION]
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in result[TransitionKey.OBSERVATION]
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_save_and_load_pretrained_with_tokenizer_object():
|
|
"""Test saving and loading processor with tokenizer object using overrides."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
|
|
original_processor = TokenizerProcessorStep(
|
|
tokenizer=mock_tokenizer, max_length=32, task_key="instruction"
|
|
)
|
|
|
|
robot_processor = DataProcessorPipeline(
|
|
[original_processor], to_transition=identity_transition, to_output=identity_transition
|
|
)
|
|
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
# Save processor
|
|
robot_processor.save_pretrained(temp_dir)
|
|
|
|
# Load processor with tokenizer override (since tokenizer object wasn't saved)
|
|
loaded_processor = DataProcessorPipeline.from_pretrained(
|
|
temp_dir,
|
|
config_filename="dataprocessorpipeline.json",
|
|
overrides={"tokenizer_processor": {"tokenizer": mock_tokenizer}},
|
|
to_transition=identity_transition,
|
|
to_output=identity_transition,
|
|
)
|
|
|
|
# Test that loaded processor works
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"instruction": "test instruction"},
|
|
)
|
|
|
|
result = loaded_processor(transition)
|
|
assert TransitionKey.OBSERVATION in result
|
|
assert f"{OBS_LANGUAGE}.tokens" in result[TransitionKey.OBSERVATION]
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in result[TransitionKey.OBSERVATION]
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_registry_functionality():
|
|
"""Test that the processor is properly registered."""
|
|
from lerobot.processor import ProcessorStepRegistry
|
|
|
|
# Check that the processor is registered
|
|
assert "tokenizer_processor" in ProcessorStepRegistry.list()
|
|
|
|
# Check that we can retrieve it
|
|
retrieved_class = ProcessorStepRegistry.get("tokenizer_processor")
|
|
assert retrieved_class is TokenizerProcessorStep
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_features_basic():
|
|
"""Test basic feature contract functionality."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=128)
|
|
|
|
input_features = {
|
|
PipelineFeatureType.OBSERVATION: {OBS_STATE: PolicyFeature(type=FeatureType.STATE, shape=(10,))},
|
|
PipelineFeatureType.ACTION: {ACTION: PolicyFeature(type=FeatureType.ACTION, shape=(5,))},
|
|
}
|
|
|
|
output_features = processor.transform_features(input_features)
|
|
|
|
# Check that original features are preserved
|
|
assert OBS_STATE in output_features[PipelineFeatureType.OBSERVATION]
|
|
assert ACTION in output_features[PipelineFeatureType.ACTION]
|
|
|
|
# Check that tokenized features are added
|
|
assert f"{OBS_LANGUAGE}.tokens" in output_features[PipelineFeatureType.OBSERVATION]
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in output_features[PipelineFeatureType.OBSERVATION]
|
|
|
|
# Check feature properties
|
|
tokens_feature = output_features[PipelineFeatureType.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask_feature = output_features[PipelineFeatureType.OBSERVATION][
|
|
f"{OBS_LANGUAGE}.attention_mask"
|
|
]
|
|
|
|
assert tokens_feature.type == FeatureType.LANGUAGE
|
|
assert tokens_feature.shape == (128,)
|
|
assert attention_mask_feature.type == FeatureType.LANGUAGE
|
|
assert attention_mask_feature.shape == (128,)
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_features_with_custom_max_length():
|
|
"""Test feature contract with custom max_length."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=64)
|
|
|
|
input_features = {PipelineFeatureType.OBSERVATION: {}}
|
|
output_features = processor.transform_features(input_features)
|
|
|
|
# Check that features use correct max_length
|
|
assert f"{OBS_LANGUAGE}.tokens" in output_features[PipelineFeatureType.OBSERVATION]
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in output_features[PipelineFeatureType.OBSERVATION]
|
|
|
|
tokens_feature = output_features[PipelineFeatureType.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask_feature = output_features[PipelineFeatureType.OBSERVATION][
|
|
f"{OBS_LANGUAGE}.attention_mask"
|
|
]
|
|
|
|
assert tokens_feature.shape == (64,)
|
|
assert attention_mask_feature.shape == (64,)
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_features_existing_features():
|
|
"""Test feature contract when tokenized features already exist."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=256)
|
|
|
|
input_features = {
|
|
PipelineFeatureType.OBSERVATION: {
|
|
f"{OBS_LANGUAGE}.tokens": PolicyFeature(type=FeatureType.LANGUAGE, shape=(100,)),
|
|
f"{OBS_LANGUAGE}.attention_mask": PolicyFeature(type=FeatureType.LANGUAGE, shape=(100,)),
|
|
}
|
|
}
|
|
|
|
output_features = processor.transform_features(input_features)
|
|
|
|
# Should not overwrite existing features
|
|
assert output_features[PipelineFeatureType.OBSERVATION][f"{OBS_LANGUAGE}.tokens"].shape == (
|
|
100,
|
|
) # Original shape preserved
|
|
assert output_features[PipelineFeatureType.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"].shape == (100,)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_tokenization_parameters(mock_auto_tokenizer):
|
|
"""Test that tokenization parameters are correctly passed to tokenizer."""
|
|
|
|
# Create a custom mock that tracks calls
|
|
class TrackingMockTokenizer:
|
|
def __init__(self):
|
|
self.last_call_args = None
|
|
self.last_call_kwargs = None
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
self.last_call_args = args
|
|
self.last_call_kwargs = kwargs
|
|
# Return minimal valid output
|
|
return {
|
|
"input_ids": torch.zeros(16, dtype=torch.long),
|
|
"attention_mask": torch.ones(16, dtype=torch.long),
|
|
}
|
|
|
|
tracking_tokenizer = TrackingMockTokenizer()
|
|
mock_auto_tokenizer.from_pretrained.return_value = tracking_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(
|
|
tokenizer_name="test-tokenizer",
|
|
max_length=16,
|
|
padding="longest",
|
|
truncation=False,
|
|
padding_side="left",
|
|
)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "test task"},
|
|
)
|
|
|
|
processor(transition)
|
|
|
|
# Check that parameters were passed correctly (task is converted to list)
|
|
assert tracking_tokenizer.last_call_args == (["test task"],)
|
|
assert tracking_tokenizer.last_call_kwargs["max_length"] == 16
|
|
assert tracking_tokenizer.last_call_kwargs["padding"] == "longest"
|
|
assert tracking_tokenizer.last_call_kwargs["padding_side"] == "left"
|
|
assert tracking_tokenizer.last_call_kwargs["truncation"] is False
|
|
assert tracking_tokenizer.last_call_kwargs["return_tensors"] == "pt"
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_preserves_other_complementary_data(mock_auto_tokenizer):
|
|
"""Test that other complementary data fields are preserved."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer")
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={
|
|
"task": "test task",
|
|
"episode_id": 123,
|
|
"timestamp": 456.789,
|
|
"other_field": {"nested": "data"},
|
|
},
|
|
)
|
|
|
|
result = processor(transition)
|
|
comp_data = result[TransitionKey.COMPLEMENTARY_DATA]
|
|
|
|
# Check that all original fields are preserved
|
|
assert comp_data["task"] == "test task"
|
|
assert comp_data["episode_id"] == 123
|
|
assert comp_data["timestamp"] == 456.789
|
|
assert comp_data["other_field"] == {"nested": "data"}
|
|
|
|
# Check that tokens were added to observation
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
assert f"{OBS_LANGUAGE}.tokens" in observation
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in observation
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_deterministic_tokenization(mock_auto_tokenizer):
|
|
"""Test that tokenization is deterministic for the same input."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "consistent test"},
|
|
)
|
|
|
|
result1 = processor(transition)
|
|
result2 = processor(transition)
|
|
|
|
tokens1 = result1[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask1 = result1[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
tokens2 = result2[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask2 = result2[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
|
|
# Results should be identical
|
|
assert torch.equal(tokens1, tokens2)
|
|
assert torch.equal(attention_mask1, attention_mask2)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_empty_string_task(mock_auto_tokenizer):
|
|
"""Test handling of empty string task."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=8)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": ""},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Should still tokenize (mock tokenizer handles empty strings)
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
assert f"{OBS_LANGUAGE}.tokens" in observation
|
|
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
|
|
assert tokens.shape == (8,)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_very_long_task(mock_auto_tokenizer):
|
|
"""Test handling of very long task strings."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=5, truncation=True)
|
|
|
|
long_task = " ".join(["word"] * 100) # Very long task
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": long_task},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Should be truncated to max_length
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
tokens = observation[f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = observation[f"{OBS_LANGUAGE}.attention_mask"]
|
|
assert tokens.shape == (5,)
|
|
assert attention_mask.shape == (5,)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_custom_padding_side(mock_auto_tokenizer):
|
|
"""Test using custom padding_side parameter."""
|
|
|
|
# Create a mock tokenizer that tracks padding_side calls
|
|
class PaddingSideTrackingTokenizer:
|
|
def __init__(self):
|
|
self.padding_side_calls = []
|
|
|
|
def __call__(
|
|
self,
|
|
text,
|
|
max_length=512,
|
|
truncation=True,
|
|
padding="max_length",
|
|
padding_side="right",
|
|
return_tensors="pt",
|
|
**kwargs,
|
|
):
|
|
self.padding_side_calls.append(padding_side)
|
|
# Return minimal valid output
|
|
return {
|
|
"input_ids": torch.zeros(max_length, dtype=torch.long),
|
|
"attention_mask": torch.ones(max_length, dtype=torch.long),
|
|
}
|
|
|
|
tracking_tokenizer = PaddingSideTrackingTokenizer()
|
|
mock_auto_tokenizer.from_pretrained.return_value = tracking_tokenizer
|
|
|
|
# Test left padding
|
|
processor_left = TokenizerProcessorStep(
|
|
tokenizer_name="test-tokenizer", max_length=10, padding_side="left"
|
|
)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "test task"},
|
|
)
|
|
processor_left(transition)
|
|
|
|
assert tracking_tokenizer.padding_side_calls[-1] == "left"
|
|
|
|
# Test right padding (default)
|
|
processor_right = TokenizerProcessorStep(
|
|
tokenizer_name="test-tokenizer", max_length=10, padding_side="right"
|
|
)
|
|
|
|
processor_right(transition)
|
|
|
|
assert tracking_tokenizer.padding_side_calls[-1] == "right"
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_device_detection_cpu():
|
|
"""Test that tokenized tensors stay on CPU when other tensors are on CPU."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
# Create transition with CPU tensors
|
|
observation = {OBS_STATE: torch.randn(10)} # CPU tensor
|
|
action = torch.randn(5) # CPU tensor
|
|
transition = create_transition(
|
|
observation=observation, action=action, complementary_data={"task": "test task"}
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that tokenized tensors are on CPU
|
|
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
|
|
assert tokens.device.type == "cpu"
|
|
assert attention_mask.device.type == "cpu"
|
|
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
|
@require_package("transformers")
|
|
def test_device_detection_cuda():
|
|
"""Test that tokenized tensors are moved to CUDA when other tensors are on CUDA."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
# Create transition with CUDA tensors
|
|
observation = {OBS_STATE: torch.randn(10).cuda()} # CUDA tensor
|
|
action = torch.randn(5).cuda() # CUDA tensor
|
|
transition = create_transition(
|
|
observation=observation, action=action, complementary_data={"task": "test task"}
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that tokenized tensors are on CUDA
|
|
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
|
|
assert tokens.device.type == "cuda"
|
|
assert attention_mask.device.type == "cuda"
|
|
assert tokens.device.index == 0 # Should be on same device as input
|
|
|
|
|
|
@pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Requires at least 2 GPUs")
|
|
@require_package("transformers")
|
|
def test_device_detection_multi_gpu():
|
|
"""Test that tokenized tensors match device in multi-GPU setup."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
# Test with tensors on cuda:1
|
|
device = torch.device("cuda:1")
|
|
observation = {OBS_STATE: torch.randn(10).to(device)}
|
|
action = torch.randn(5).to(device)
|
|
transition = create_transition(
|
|
observation=observation, action=action, complementary_data={"task": "multi gpu test"}
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that tokenized tensors are on cuda:1
|
|
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
|
|
assert tokens.device == device
|
|
assert attention_mask.device == device
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_device_detection_no_tensors():
|
|
"""Test that tokenized tensors stay on CPU when no other tensors exist."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
# Create transition with no tensors
|
|
transition = create_transition(
|
|
observation={"metadata": {"key": "value"}}, # No tensors
|
|
complementary_data={"task": "no tensor test"},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that tokenized tensors are on CPU (default)
|
|
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
|
|
assert tokens.device.type == "cpu"
|
|
assert attention_mask.device.type == "cpu"
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_device_detection_mixed_devices():
|
|
"""Test device detection when tensors are on different devices (uses first found)."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
if torch.cuda.is_available():
|
|
# Create transition with mixed devices
|
|
observation = {
|
|
"observation.cpu": torch.randn(10), # CPU
|
|
"observation.cuda": torch.randn(10).cuda(), # CUDA
|
|
}
|
|
transition = create_transition(
|
|
observation=observation, complementary_data={"task": "mixed device test"}
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# The device detection should use the first tensor found
|
|
# (iteration order depends on dict, but result should be consistent)
|
|
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
|
|
# Both should be on the same device
|
|
assert tokens.device == attention_mask.device
|
|
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
|
@require_package("transformers")
|
|
def test_device_detection_from_action():
|
|
"""Test that device is detected from action tensor when no observation tensors exist."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
# Create transition with action on CUDA but no observation tensors
|
|
observation = {"metadata": {"key": "value"}} # No tensors in observation
|
|
action = torch.randn(5).cuda()
|
|
transition = create_transition(
|
|
observation=observation, action=action, complementary_data={"task": "action device test"}
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that tokenized tensors match action's device
|
|
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
|
|
assert tokens.device.type == "cuda"
|
|
assert attention_mask.device.type == "cuda"
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_device_detection_preserves_dtype():
|
|
"""Test that device detection doesn't affect dtype of tokenized tensors."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
# Create transition with float tensor (to test dtype isn't affected)
|
|
observation = {OBS_STATE: torch.randn(10, dtype=torch.float16)}
|
|
transition = create_transition(observation=observation, complementary_data={"task": "dtype test"})
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that tokenized tensors have correct dtypes (not affected by input dtype)
|
|
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
|
|
assert tokens.dtype == torch.long # Should remain long
|
|
assert attention_mask.dtype == torch.bool # Should be bool (converted in processor)
|
|
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_integration_with_device_processor(mock_auto_tokenizer):
|
|
"""Test that TokenizerProcessorStep works correctly with DeviceProcessorStep in pipeline."""
|
|
from lerobot.processor import DeviceProcessorStep
|
|
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
# Create pipeline with TokenizerProcessorStep then DeviceProcessorStep
|
|
tokenizer_processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=6)
|
|
device_processor = DeviceProcessorStep(device="cuda:0")
|
|
robot_processor = DataProcessorPipeline(
|
|
[tokenizer_processor, device_processor],
|
|
to_transition=identity_transition,
|
|
to_output=identity_transition,
|
|
)
|
|
|
|
# Start with CPU tensors
|
|
transition = create_transition(
|
|
observation={OBS_STATE: torch.randn(10)}, # CPU
|
|
action=torch.randn(5), # CPU
|
|
complementary_data={"task": "pipeline test"},
|
|
)
|
|
|
|
result = robot_processor(transition)
|
|
|
|
# All tensors should end up on CUDA (moved by DeviceProcessorStep)
|
|
assert result[TransitionKey.OBSERVATION][OBS_STATE].device.type == "cuda"
|
|
assert result[TransitionKey.ACTION].device.type == "cuda"
|
|
|
|
# Tokenized tensors should also be on CUDA
|
|
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
assert tokens.device.type == "cuda"
|
|
assert attention_mask.device.type == "cuda"
|
|
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
|
@require_package("transformers")
|
|
def test_simulated_accelerate_scenario():
|
|
"""Test scenario simulating Accelerate with data already on GPU."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
# Simulate Accelerate scenario: batch already on GPU
|
|
device = torch.device("cuda:0")
|
|
observation = {
|
|
OBS_STATE: torch.randn(1, 10).to(device), # Batched, on GPU
|
|
OBS_IMAGE: torch.randn(1, 3, 224, 224).to(device), # Batched, on GPU
|
|
}
|
|
action = torch.randn(1, 5).to(device) # Batched, on GPU
|
|
|
|
transition = create_transition(
|
|
observation=observation,
|
|
action=action,
|
|
complementary_data={"task": ["accelerate test"]}, # List for batched task
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Tokenized tensors should match GPU placement
|
|
tokens = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.tokens"]
|
|
attention_mask = result[TransitionKey.OBSERVATION][f"{OBS_LANGUAGE}.attention_mask"]
|
|
|
|
assert tokens.device == device
|
|
assert attention_mask.device == device
|
|
# MockTokenizer squeezes single-item batches, so shape is (max_length,) not (1, max_length)
|
|
assert tokens.shape == (10,) # MockTokenizer behavior for single string in list
|
|
assert attention_mask.shape == (10,)
|
|
|
|
|
|
# =============================================================================
|
|
# Tests for get_subtask method
|
|
# =============================================================================
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_subtask_missing_key():
|
|
"""Test get_subtask returns None when subtask key is missing from complementary_data."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task"}, # No "subtask" key
|
|
)
|
|
|
|
result = processor.get_subtask(transition)
|
|
assert result is None
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_subtask_none_value():
|
|
"""Test get_subtask returns None when subtask value is None."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": None},
|
|
)
|
|
|
|
result = processor.get_subtask(transition)
|
|
assert result is None
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_subtask_none_complementary_data():
|
|
"""Test get_subtask returns None when complementary_data is None."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data=None, # No complementary data
|
|
)
|
|
|
|
result = processor.get_subtask(transition)
|
|
assert result is None
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_subtask_string():
|
|
"""Test get_subtask returns list with single string when subtask is a string."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": "pick up the cube"},
|
|
)
|
|
|
|
result = processor.get_subtask(transition)
|
|
assert result == ["pick up the cube"]
|
|
assert isinstance(result, list)
|
|
assert len(result) == 1
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_subtask_list_of_strings():
|
|
"""Test get_subtask returns the list when subtask is already a list of strings."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
subtask_list = ["pick up", "move to target", "place down"]
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": subtask_list},
|
|
)
|
|
|
|
result = processor.get_subtask(transition)
|
|
assert result == subtask_list
|
|
assert isinstance(result, list)
|
|
assert len(result) == 3
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_subtask_unsupported_type_integer():
|
|
"""Test get_subtask returns None when subtask is an unsupported type (integer)."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": 123},
|
|
)
|
|
|
|
result = processor.get_subtask(transition)
|
|
assert result is None
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_subtask_unsupported_type_mixed_list():
|
|
"""Test get_subtask returns None when subtask is a list with mixed types."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": ["valid string", 123, "another string"]},
|
|
)
|
|
|
|
result = processor.get_subtask(transition)
|
|
assert result is None
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_subtask_unsupported_type_dict():
|
|
"""Test get_subtask returns None when subtask is a dictionary."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": {"key": "value"}},
|
|
)
|
|
|
|
result = processor.get_subtask(transition)
|
|
assert result is None
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_subtask_empty_string():
|
|
"""Test get_subtask with empty string returns list with empty string."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": ""},
|
|
)
|
|
|
|
result = processor.get_subtask(transition)
|
|
assert result == [""]
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_get_subtask_empty_list():
|
|
"""Test get_subtask with empty list returns empty list."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": []},
|
|
)
|
|
|
|
result = processor.get_subtask(transition)
|
|
assert result == []
|
|
|
|
|
|
# =============================================================================
|
|
# Tests for subtask tokenization in observation method
|
|
# =============================================================================
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_subtask_tokenization_when_present():
|
|
"""Test that subtask is tokenized and added to observation when present."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": "pick up the red cube"},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that subtask tokens were added to observation
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
assert OBS_LANGUAGE_SUBTASK_TOKENS in observation
|
|
assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation
|
|
|
|
# Check token structure
|
|
subtask_tokens = observation[OBS_LANGUAGE_SUBTASK_TOKENS]
|
|
subtask_attention_mask = observation[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK]
|
|
assert isinstance(subtask_tokens, torch.Tensor)
|
|
assert isinstance(subtask_attention_mask, torch.Tensor)
|
|
assert subtask_tokens.shape == (8,)
|
|
assert subtask_attention_mask.shape == (8,)
|
|
assert subtask_attention_mask.dtype == torch.bool
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_subtask_tokenization_not_added_when_none():
|
|
"""Test that subtask tokens are NOT added to observation when subtask is None."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task"}, # No subtask
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that subtask tokens were NOT added to observation
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
assert OBS_LANGUAGE_SUBTASK_TOKENS not in observation
|
|
assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in observation
|
|
|
|
# But main task tokens should still be present
|
|
assert f"{OBS_LANGUAGE}.tokens" in observation
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in observation
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_subtask_tokenization_not_added_when_subtask_value_is_none():
|
|
"""Test that subtask tokens are NOT added when subtask value is explicitly None."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": None},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that subtask tokens were NOT added to observation
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
assert OBS_LANGUAGE_SUBTASK_TOKENS not in observation
|
|
assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in observation
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_subtask_tokenization_list_of_strings():
|
|
"""Test subtask tokenization with list of strings."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": ["pick up", "place down"]},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that subtask tokens were added to observation
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
assert OBS_LANGUAGE_SUBTASK_TOKENS in observation
|
|
assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation
|
|
|
|
# Check token structure for batch
|
|
subtask_tokens = observation[OBS_LANGUAGE_SUBTASK_TOKENS]
|
|
subtask_attention_mask = observation[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK]
|
|
assert subtask_tokens.shape == (2, 8) # batch_size=2, seq_len=8
|
|
assert subtask_attention_mask.shape == (2, 8)
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_subtask_tokenization_device_cpu():
|
|
"""Test that subtask tokens are on CPU when other tensors are on CPU."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
# Create transition with CPU tensors
|
|
observation = {OBS_STATE: torch.randn(10)} # CPU tensor
|
|
action = torch.randn(5) # CPU tensor
|
|
transition = create_transition(
|
|
observation=observation,
|
|
action=action,
|
|
complementary_data={"task": "main task", "subtask": "pick up cube"},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that subtask tokens are on CPU
|
|
subtask_tokens = result[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS]
|
|
subtask_attention_mask = result[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK]
|
|
|
|
assert subtask_tokens.device.type == "cpu"
|
|
assert subtask_attention_mask.device.type == "cpu"
|
|
|
|
|
|
@pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA not available")
|
|
@require_package("transformers")
|
|
def test_subtask_tokenization_device_cuda():
|
|
"""Test that subtask tokens are moved to CUDA when other tensors are on CUDA."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
# Create transition with CUDA tensors
|
|
observation = {OBS_STATE: torch.randn(10).cuda()} # CUDA tensor
|
|
action = torch.randn(5).cuda() # CUDA tensor
|
|
transition = create_transition(
|
|
observation=observation,
|
|
action=action,
|
|
complementary_data={"task": "main task", "subtask": "pick up cube"},
|
|
)
|
|
|
|
result = processor(transition)
|
|
|
|
# Check that subtask tokens are on CUDA
|
|
subtask_tokens = result[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS]
|
|
subtask_attention_mask = result[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK]
|
|
|
|
assert subtask_tokens.device.type == "cuda"
|
|
assert subtask_attention_mask.device.type == "cuda"
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_subtask_tokenization_preserves_other_observation_data():
|
|
"""Test that subtask tokenization preserves other observation data."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
original_state = torch.tensor([1.0, 2.0, 3.0])
|
|
transition = create_transition(
|
|
observation={"state": original_state.clone()},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": "pick up cube"},
|
|
)
|
|
|
|
result = processor(transition)
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
|
|
# Check that original observation data is preserved
|
|
assert torch.equal(observation["state"], original_state)
|
|
|
|
# Check that both task and subtask tokens are present
|
|
assert f"{OBS_LANGUAGE}.tokens" in observation
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in observation
|
|
assert OBS_LANGUAGE_SUBTASK_TOKENS in observation
|
|
assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_subtask_attention_mask_dtype():
|
|
"""Test that subtask attention mask has correct dtype (bool)."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": "pick up cube"},
|
|
)
|
|
|
|
result = processor(transition)
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
|
|
subtask_attention_mask = observation[OBS_LANGUAGE_SUBTASK_ATTENTION_MASK]
|
|
assert subtask_attention_mask.dtype == torch.bool
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_subtask_tokenization_deterministic():
|
|
"""Test that subtask tokenization is deterministic for the same input."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=10)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": "consistent subtask"},
|
|
)
|
|
|
|
result1 = processor(transition)
|
|
result2 = processor(transition)
|
|
|
|
subtask_tokens1 = result1[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS]
|
|
subtask_tokens2 = result2[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_TOKENS]
|
|
subtask_mask1 = result1[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK]
|
|
subtask_mask2 = result2[TransitionKey.OBSERVATION][OBS_LANGUAGE_SUBTASK_ATTENTION_MASK]
|
|
|
|
# Results should be identical
|
|
assert torch.equal(subtask_tokens1, subtask_tokens2)
|
|
assert torch.equal(subtask_mask1, subtask_mask2)
|
|
|
|
|
|
@require_package("transformers")
|
|
@patch("lerobot.processor.tokenizer_processor.AutoTokenizer")
|
|
def test_subtask_tokenization_integration_with_pipeline(mock_auto_tokenizer):
|
|
"""Test subtask tokenization works correctly with DataProcessorPipeline."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
mock_auto_tokenizer.from_pretrained.return_value = mock_tokenizer
|
|
|
|
tokenizer_processor = TokenizerProcessorStep(tokenizer_name="test-tokenizer", max_length=6)
|
|
robot_processor = DataProcessorPipeline(
|
|
[tokenizer_processor], to_transition=identity_transition, to_output=identity_transition
|
|
)
|
|
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": "subtask instruction"},
|
|
)
|
|
|
|
result = robot_processor(transition)
|
|
|
|
# Check that observation exists and both tokenizations were applied
|
|
assert TransitionKey.OBSERVATION in result
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
|
|
# Check task tokens
|
|
assert f"{OBS_LANGUAGE}.tokens" in observation
|
|
assert f"{OBS_LANGUAGE}.attention_mask" in observation
|
|
|
|
# Check subtask tokens
|
|
assert OBS_LANGUAGE_SUBTASK_TOKENS in observation
|
|
assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK in observation
|
|
|
|
# Check shapes
|
|
assert observation[f"{OBS_LANGUAGE}.tokens"].shape == (6,)
|
|
assert observation[OBS_LANGUAGE_SUBTASK_TOKENS].shape == (6,)
|
|
|
|
|
|
@require_package("transformers")
|
|
def test_subtask_not_added_for_unsupported_types():
|
|
"""Test that subtask tokens are not added when subtask has unsupported type."""
|
|
mock_tokenizer = MockTokenizer(vocab_size=100)
|
|
processor = TokenizerProcessorStep(tokenizer=mock_tokenizer, max_length=8)
|
|
|
|
# Test with integer subtask
|
|
transition = create_transition(
|
|
observation={"state": torch.tensor([1.0, 2.0])},
|
|
action=torch.tensor([0.1, 0.2]),
|
|
complementary_data={"task": "main task", "subtask": 123},
|
|
)
|
|
|
|
result = processor(transition)
|
|
observation = result[TransitionKey.OBSERVATION]
|
|
|
|
# Subtask tokens should NOT be added for unsupported types
|
|
assert OBS_LANGUAGE_SUBTASK_TOKENS not in observation
|
|
assert OBS_LANGUAGE_SUBTASK_ATTENTION_MASK not in observation
|
|
|
|
# But main task tokens should still be present
|
|
assert f"{OBS_LANGUAGE}.tokens" in observation
|