Files
lerobot-clone/src/lerobot/processor/tokenizer_processor.py
Steven Palma e881fb6678 refactor(pipeline): feature contract now categorizes between OBS or Action (#1867)
* refactor(processor): signature of transform_features

* refactor(processor): remove prefixes + processor respect new transform_features signature + update test accordingly

* refactor(processor): rename now is only for visual

* refactor(processor): update normalize processor

* refactor(processor): update vanilla processor features

* refactor(processor): feature contract now uses its own enum

* chore(processor): rename renameprocessor

* chore(processor): minor changes

* refactor(processor): add create & change aggregate

* refactor(processor): update aggregate

* refactor(processor): simplify to functions, fix features contracts and rename function

* test(processor): remove to converter tests as now they are very simple

* chore(docs): recover docs joint observations processor

* fix(processor): update RKP

* fix(tests): recv diff test_pipeline

* chore(tests): add docs to test

* chore(processor): leave obs language constant untouched

* fix(processor): correct new shape of feature in crop image processor
2025-09-09 18:27:30 +02:00

274 lines
10 KiB
Python

#!/usr/bin/env python
# Copyright 2025 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This script defines a processor for tokenizing natural language instructions from an environment transition.
It uses a tokenizer from the Hugging Face `transformers` library to convert task descriptions (text) into
token IDs and attention masks, which are then added to the observation dictionary.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import torch
from lerobot.configs.types import FeatureType, PipelineFeatureType, PolicyFeature
from lerobot.constants import OBS_LANGUAGE_ATTENTION_MASK, OBS_LANGUAGE_TOKENS
from lerobot.utils.import_utils import _transformers_available
from .core import EnvTransition, TransitionKey
from .pipeline import ObservationProcessorStep, ProcessorStepRegistry
# Conditional import for type checking and lazy loading
if TYPE_CHECKING or _transformers_available:
from transformers import AutoTokenizer
else:
AutoTokenizer = None
@dataclass
@ProcessorStepRegistry.register(name="tokenizer_processor")
class TokenizerProcessorStep(ObservationProcessorStep):
"""
Processor step to tokenize a natural language task description.
This step extracts a task string from the `complementary_data` of an `EnvTransition`,
tokenizes it using a Hugging Face `transformers` tokenizer, and adds the resulting
token IDs and attention mask to the `observation` dictionary.
Requires the `transformers` library to be installed.
Attributes:
tokenizer_name: The name of a pretrained tokenizer from the Hugging Face Hub (e.g., "bert-base-uncased").
tokenizer: A pre-initialized tokenizer object. If provided, `tokenizer_name` is ignored.
max_length: The maximum length to pad or truncate sequences to.
task_key: The key in `complementary_data` where the task string is stored.
padding_side: The side to pad on ('left' or 'right').
padding: The padding strategy ('max_length', 'longest', etc.).
truncation: Whether to truncate sequences longer than `max_length`.
input_tokenizer: The internal tokenizer instance, loaded during initialization.
"""
tokenizer_name: str | None = None
tokenizer: Any | None = None # Use `Any` for compatibility without a hard dependency
max_length: int = 512
task_key: str = "task"
padding_side: str = "right"
padding: str = "max_length"
truncation: bool = True
# Internal tokenizer instance (not part of the config)
input_tokenizer: Any = field(default=None, init=False, repr=False)
def __post_init__(self):
"""
Initializes the tokenizer after the dataclass is created.
It checks for the availability of the `transformers` library and loads the tokenizer
either from a provided object or by name from the Hugging Face Hub.
Raises:
ImportError: If the `transformers` library is not installed.
ValueError: If neither `tokenizer` nor `tokenizer_name` is provided.
"""
if not _transformers_available:
raise ImportError(
"The 'transformers' library is not installed. "
"Please install it with `pip install 'lerobot[transformers-dep]'` to use TokenizerProcessorStep."
)
if self.tokenizer is not None:
# Use provided tokenizer object directly
self.input_tokenizer = self.tokenizer
elif self.tokenizer_name is not None:
if AutoTokenizer is None:
raise ImportError("AutoTokenizer is not available")
self.input_tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_name)
else:
raise ValueError(
"Either 'tokenizer' or 'tokenizer_name' must be provided. "
"Pass a tokenizer object directly or a tokenizer name to auto-load."
)
def get_task(self, transition: EnvTransition) -> list[str] | None:
"""
Extracts the task description(s) from the transition's complementary data.
Args:
transition: The environment transition.
Returns:
A list of task strings, or None if the task key is not found or the value is None.
"""
complementary_data = transition.get(TransitionKey.COMPLEMENTARY_DATA)
if complementary_data is None:
return None
if self.task_key not in complementary_data:
return None
task = complementary_data[self.task_key]
if task is None:
return None
# Standardize to a list of strings for the tokenizer
if isinstance(task, str):
return [task]
elif isinstance(task, list) and all(isinstance(t, str) for t in task):
return task
return None
def observation(self, observation: dict[str, Any]) -> dict[str, Any]:
"""
Tokenizes the task description and adds it to the observation dictionary.
This method retrieves the task, tokenizes it, moves the resulting tensors to the
same device as other data in the transition, and updates the observation.
Args:
observation: The original observation dictionary.
Returns:
The updated observation dictionary including token IDs and an attention mask.
"""
task = self.get_task(self.transition)
if task is None:
return observation
# Tokenize the task (this will create CPU tensors)
tokenized_prompt = self._tokenize_text(task)
# Detect the device from existing tensors in the transition to ensure consistency
target_device = self._detect_device(self.transition)
# Move new tokenized tensors to the detected device
if target_device is not None:
tokenized_prompt = {
k: v.to(target_device) if isinstance(v, torch.Tensor) else v
for k, v in tokenized_prompt.items()
}
# Create a new observation dict to avoid modifying the original in place
new_observation = dict(observation)
# Add tokenized data to the observation
new_observation[OBS_LANGUAGE_TOKENS] = tokenized_prompt["input_ids"]
new_observation[OBS_LANGUAGE_ATTENTION_MASK] = tokenized_prompt["attention_mask"].to(dtype=torch.bool)
return new_observation
def _detect_device(self, transition: EnvTransition) -> torch.device | None:
"""
Detects the torch.device from existing tensors in the transition.
It checks tensors in the observation dictionary first, then the action tensor.
Args:
transition: The environment transition.
Returns:
The detected `torch.device`, or None if no tensors are found.
"""
# Check observation tensors first (most likely place to find tensors)
observation = transition.get(TransitionKey.OBSERVATION)
if observation:
for value in observation.values():
if isinstance(value, torch.Tensor):
return value.device
# Fallback to checking the action tensor
action = transition.get(TransitionKey.ACTION)
if isinstance(action, torch.Tensor):
return action.device
return None # No tensors found, default will be CPU
def _tokenize_text(self, text: str | list[str]) -> dict[str, torch.Tensor]:
"""
A wrapper around the tokenizer call.
Args:
text: A string or list of strings to tokenize.
Returns:
A dictionary containing tokenized 'input_ids' and 'attention_mask' as PyTorch tensors.
"""
return self.input_tokenizer(
text,
max_length=self.max_length,
truncation=self.truncation,
padding=self.padding,
padding_side=self.padding_side,
return_tensors="pt",
)
def get_config(self) -> dict[str, Any]:
"""
Returns the serializable configuration of the processor.
Note: The tokenizer object itself is not serialized. If the processor was initialized
with a tokenizer name, that name will be included in the config.
Returns:
A dictionary with the processor's configuration parameters.
"""
config = {
"max_length": self.max_length,
"task_key": self.task_key,
"padding_side": self.padding_side,
"padding": self.padding,
"truncation": self.truncation,
}
# Only save tokenizer_name if it was used to create the tokenizer
if self.tokenizer_name is not None and self.tokenizer is None:
config["tokenizer_name"] = self.tokenizer_name
return config
def transform_features(
self, features: dict[PipelineFeatureType, dict[str, PolicyFeature]]
) -> dict[PipelineFeatureType, dict[str, PolicyFeature]]:
"""
Adds feature definitions for the language tokens and attention mask.
This updates the policy features dictionary to include the new data added to the
observation, ensuring downstream components are aware of their shape and type.
Args:
features: The dictionary of existing policy features.
Returns:
The updated dictionary of policy features.
"""
# Add a feature for the token IDs if it doesn't already exist
if OBS_LANGUAGE_TOKENS not in features[PipelineFeatureType.OBSERVATION]:
features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_TOKENS] = PolicyFeature(
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
# Add a feature for the attention mask if it doesn't already exist
if OBS_LANGUAGE_ATTENTION_MASK not in features[PipelineFeatureType.OBSERVATION]:
features[PipelineFeatureType.OBSERVATION][OBS_LANGUAGE_ATTENTION_MASK] = PolicyFeature(
type=FeatureType.LANGUAGE, shape=(self.max_length,)
)
return features