stages.file_partitioning#

Module Contents#

Classes#

FilePartitioningStage

Stage that partitions input file paths into FileGroupTasks.

API#

class stages.file_partitioning.FilePartitioningStage#

Bases: nemo_curator.stages.base.ProcessingStage[nemo_curator.tasks._EmptyTask, nemo_curator.tasks.FileGroupTask]

Stage that partitions input file paths into FileGroupTasks.

This stage runs as a dedicated processing stage (not on the driver) and creates file groups based on the partitioning strategy.

Parameters

file_paths: str | list[str] Path to the input files. files_per_partition: int | None = None Number of files per partition. If provided, the blocksize is ignored. Defaults to 1 if both files_per_partition and blocksize are not provided. blocksize: int | str | None = None Target size of the partitions. Note: For compressed files, the compressed size is used for blocksize estimation. file_extensions: list[str] | None = None File extensions to filter. storage_options: dict[str, Any] | None = None Storage options to pass to the file system. limit: int | None = None Maximum number of partitions to create.

blocksize: int | str | None#

None

file_extensions: list[str] | None#

None

file_paths: str | list[str]#

None

files_per_partition: int | None#

None

inputs() tuple[list[str], list[str]]#

Define stage input requirements.

Returns (tuple[list[str], list[str]]): Tuple of (required_attributes, required_columns) where: - required_top_level_attributes: List of task attributes that must be present - required_data_attributes: List of attributes within the data that must be present

limit: int | None#

None

outputs() tuple[list[str], list[str]]#

Define stage output specification.

Returns (tuple[list[str], list[str]]): Tuple of (output_attributes, output_columns) where: - output_top_level_attributes: List of task attributes this stage adds/modifies - output_data_attributes: List of attributes within the data that this stage adds/modifies

process(
_: nemo_curator.tasks._EmptyTask,
) list[nemo_curator.tasks.FileGroupTask]#

Process the initial task to create file group tasks.

This stage expects a simple Task with file paths information and outputs multiple FileGroupTasks for parallel processing.

ray_stage_spec() dict[str, Any]#

Ray stage specification for this stage.

storage_options: dict[str, Any] | None#

None