stages.text.models.model

Module Contents

Classes

ModelStage

Base class for Hugging Face model inference.

API

class stages.text.models.model.ModelStage(
model_identifier: str,
cache_dir: str | None = None,
hf_token: str | None = None,
model_inference_batch_size: int = 256,
has_seq_order: bool = True,
padding_side: Literal['left', 'right'] = 'right',
unpack_inference_batch: bool = False,
autocast: bool = True,
)

Bases: nemo_curator.stages.base.ProcessingStage[nemo_curator.tasks.DocumentBatch, nemo_curator.tasks.DocumentBatch]

Base class for Hugging Face model inference.

Args:

- model_identifier: The identifier of the Hugging Face model.
- cache_dir: The Hugging Face cache directory. Defaults to None.
- hf_token: Hugging Face token for downloading the model, if needed. Defaults to None.
- model_inference_batch_size: The size of the batch for model inference. Defaults to 256.
- has_seq_order: Whether to sort the input data by the length of the input tokens. Sorting is encouraged because it improves inference performance. Defaults to True.
- padding_side: The side on which to pad the input tokens. Defaults to "right".
- unpack_inference_batch: Whether to unpack the inference batch with **kwargs when calling the model. Defaults to False.
- autocast: Whether to use autocast. When True, a small amount of accuracy is traded for faster inference. Defaults to True.
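The sketch below illustrates the general idea behind has_seq_order and padding_side using plain Python lists. It is not the library's implementation: the helper names (sort_by_length, restore_order, pad_batch) and the pad_id value are hypothetical, and lists of integers stand in for token tensors. Sorting by length groups sequences of similar size so that each batch needs less padding, and the saved positions allow results to be returned in the original row order.

```python
def sort_by_length(token_lists):
    """Return the sequences sorted by length plus their original positions."""
    order = sorted(range(len(token_lists)), key=lambda i: len(token_lists[i]))
    return [token_lists[i] for i in order], order


def restore_order(sorted_results, order):
    """Place each result back at the position its input originally held."""
    restored = [None] * len(order)
    for sorted_pos, original_pos in enumerate(order):
        restored[original_pos] = sorted_results[sorted_pos]
    return restored


def pad_batch(batch, pad_id=0, padding_side="right"):
    """Pad every sequence in the batch to the length of the longest one."""
    width = max(len(seq) for seq in batch)
    padded = []
    for seq in batch:
        fill = [pad_id] * (width - len(seq))
        padded.append(seq + fill if padding_side == "right" else fill + seq)
    return padded


docs = [[5, 6, 7, 8], [1], [2, 3]]
sorted_docs, order = sort_by_length(docs)

# Stand-in for model inference: one "score" per sequence (here, its length).
scores = [len(seq) for seq in sorted_docs]
restored_scores = restore_order(scores, order)
```

With right padding the pad tokens follow the real tokens, and with left padding they precede them; which side is appropriate depends on the model being run.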

Initialization

collect_outputs(
processed_outputs: list[dict[str, numpy.ndarray]],
) → dict[str, numpy.ndarray]

create_output_dataframe(
df_cpu: pandas.DataFrame,
collected_output: dict[str, numpy.ndarray],
) → pandas.DataFrame

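This page does not document collect_outputs or create_output_dataframe, so the following is only a guess at their roles inferred from the signatures: merging per-batch output dictionaries key by key, then attaching the merged columns to the input rows. Plain lists stand in for numpy arrays and a list of dicts stands in for the DataFrame; the function create_output_rows and the "score" and "text" column names are hypothetical.

```python
def collect_outputs(processed_outputs):
    """Concatenate per-batch outputs key by key (lists stand in for arrays)."""
    collected = {}
    for batch_output in processed_outputs:
        for key, values in batch_output.items():
            collected.setdefault(key, []).extend(values)
    return collected


def create_output_rows(rows, collected_output):
    """Attach each collected column value to the matching input row."""
    return [
        {**row, **{key: values[i] for key, values in collected_output.items()}}
        for i, row in enumerate(rows)
    ]


# Two inference batches, of sizes 2 and 1, each producing a "score" column.
per_batch = [{"score": [0.1, 0.9]}, {"score": [0.5]}]
collected = collect_outputs(per_batch)
rows = create_output_rows([{"text": "a"}, {"text": "b"}, {"text": "c"}], collected)
```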
inputs() → tuple[list[str], list[str]]

Define stage input requirements.

Returns (tuple[list[str], list[str]]): Tuple of (required_attributes, required_columns) where:

- required_top_level_attributes: List of task attributes that must be present
- required_data_attributes: List of attributes within the data that must be present

outputs() → tuple[list[str], list[str]]

Define stage output specification.

Returns (tuple[list[str], list[str]]): Tuple of (output_attributes, output_columns) where:

- output_top_level_attributes: List of task attributes this stage adds/modifies
- output_data_attributes: List of attributes within the data that this stage adds/modifies
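To make the two-element return shape of inputs() and outputs() concrete, here is a minimal sketch. HypotheticalScoreStage is a stand-in class that does not subclass the real ModelStage, and the attribute name "data" and column name "score" are assumptions chosen for illustration. The input_ids and attention_mask columns are the ones mentioned in the yield_next_batch description.

```python
class HypotheticalScoreStage:
    """Stand-in class; it does not subclass the real ModelStage."""

    def inputs(self):
        # (required task attributes, required data columns)
        return ["data"], ["input_ids", "attention_mask"]

    def outputs(self):
        # (task attributes added or modified, data columns added or modified)
        return ["data"], ["score"]


def missing_columns(stage, available_columns):
    """List the required data columns that the incoming data lacks."""
    _, required_columns = stage.inputs()
    return [col for col in required_columns if col not in available_columns]


stage = HypotheticalScoreStage()
missing = missing_columns(stage, ["text", "input_ids"])
```

A check like missing_columns is one way a pipeline could validate that an upstream tokenization step has run before inference starts.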

process(
batch: nemo_curator.tasks.DocumentBatch,
) → nemo_curator.tasks.DocumentBatch

Process a task and return the result.

Args:

- task (X): Input task to process

Returns (Y | list[Y]):

- Single task: For 1-to-1 transformations
- List of tasks: For 1-to-many transformations (e.g., readers)
- None: If the task should be filtered out

process_model_output(
outputs: torch.Tensor,
model_input_batch: dict[str, torch.Tensor] | None = None,
) → dict[str, numpy.ndarray] | torch.Tensor

setup(
_: nemo_curator.backends.base.WorkerMetadata | None = None,
) → None

Setup method called once before processing begins. Override this method to perform any initialization that should happen once per worker.

Args:

- worker_metadata (WorkerMetadata, optional): Information about the worker (provided by some backends)

setup_on_node(
_node_info: nemo_curator.backends.base.NodeInfo | None = None,
_worker_metadata: nemo_curator.backends.base.WorkerMetadata = None,
) → None

Setup method called once per node in distributed settings. Override this method to perform node-level initialization.

Args:

- node_info (NodeInfo, optional): Information about the node (provided by some backends)
- worker_metadata (WorkerMetadata, optional): Information about the worker (provided by some backends)

teardown() → None

Teardown method called once after processing ends. Override this method to perform any cleanup.
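The lifecycle hooks described above (setup_on_node, setup, process, teardown) can be pictured with a toy single-worker driver. This is a sketch under assumptions: LoggingStage and run_single_worker are hypothetical, the real backends decide when and where each hook runs, and the call order shown is inferred only from the docstrings on this page.

```python
class LoggingStage:
    """Hypothetical stage that records which lifecycle hooks were called."""

    def __init__(self):
        self.calls = []

    def setup_on_node(self, node_info=None, worker_metadata=None):
        self.calls.append("setup_on_node")  # once per node

    def setup(self, worker_metadata=None):
        self.calls.append("setup")  # once per worker, before processing

    def process(self, task):
        self.calls.append(f"process:{task}")
        return task

    def teardown(self):
        self.calls.append("teardown")  # once, after processing ends


def run_single_worker(stage, tasks):
    """Toy driver for one node with one worker (assumed call order)."""
    stage.setup_on_node()
    stage.setup()
    try:
        results = [stage.process(task) for task in tasks]
    finally:
        # Run cleanup even if processing raised an exception.
        stage.teardown()
    return results


stage = LoggingStage()
results = run_single_worker(stage, ["a", "b"])
```

For a model stage, node-level setup is a natural place for work that should happen once per machine, and worker-level setup for work each worker must repeat; what ModelStage actually does in each hook is not stated on this page.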

yield_next_batch(
df: pandas.DataFrame,
) → collections.abc.Generator[dict[str, torch.Tensor]]

Yields model inputs one batch at a time. Only the current batch is moved to the GPU, which reduces memory overhead.

Args: df (pd.DataFrame): The Pandas DataFrame (with input_ids and attention_mask) to process.

Yields: Generator[dict[str, torch.Tensor]]: A generator of model inputs for the next batch.
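The batching pattern described for yield_next_batch can be sketched with a plain generator. This is an illustration rather than the library's code: yield_batches is a hypothetical helper, a dict of lists stands in for the DataFrame, and no tensors or devices are involved. In the real stage, the point where each slice is built is where a batch would be converted to tensors and moved to the GPU, so only one batch occupies device memory at a time.

```python
from collections.abc import Iterator


def yield_batches(columns: dict[str, list], batch_size: int) -> Iterator[dict[str, list]]:
    """Lazily yield consecutive row slices of every column."""
    n_rows = len(next(iter(columns.values())))
    for start in range(0, n_rows, batch_size):
        # Only this slice is materialized per iteration; a real stage would
        # build tensors from it and move them to the device here.
        yield {
            name: values[start:start + batch_size]
            for name, values in columns.items()
        }


columns = {
    "input_ids": [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]],
    "attention_mask": [[1, 1]] * 5,
}
batch_sizes = [len(batch["input_ids"]) for batch in yield_batches(columns, batch_size=2)]
```

The last batch is smaller than batch_size whenever the row count is not an exact multiple of it, so downstream code should not assume a fixed batch dimension.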