nemo_curator.stages.deduplication.exact.workflow


Module Contents

Classes

Name | Description
ExactDeduplicationWorkflow | A pipeline that performs exact deduplication of a dataset.

Data

ID_GENERATOR_OUTPUT_FILENAME

API

class nemo_curator.stages.deduplication.exact.workflow.ExactDeduplicationWorkflow(
output_path: str,
input_path: str | list[str] | None = None,
input_filetype: typing.Literal['jsonl', 'parquet'] = 'parquet',
input_blocksize: str | int = '2GiB',
identification_batchsize: int = 1,
input_file_extensions: list[str] | None = None,
read_kwargs: dict[str, typing.Any] | None = None,
write_kwargs: dict[str, typing.Any] | None = None,
assign_id: bool = True,
id_field: str | None = None,
text_field: str = 'text',
perform_removal: bool = False,
total_nparts: int | None = None,
rmm_pool_size: int | typing.Literal['auto'] | None = 'auto',
spill_memory_limit: int | typing.Literal['auto'] | None = 'auto',
env_vars: dict[str, typing.Any] | None = None
)

Bases: WorkflowBase

A pipeline that performs exact deduplication of a dataset. It consists of the following stages:

  • FilePartitioningStage: Groups input files into smaller groups that can be processed in parallel.
  • ExactDuplicateIdentification: Finds exact duplicates in a given column by hashing the column.
  • Removal (optional): Currently not implemented.
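The two implemented stages can be illustrated with a minimal, single-process sketch in plain Python. This is not the library's code: the real stages run distributed through a Ray-based executor. The details below are assumptions made for illustration only. These include greedy size-based packing of files, MD5 as the hash, keeping the first record of each hash group, and the hypothetical helper names `partition_files` and `find_exact_duplicates`.

```python
import hashlib
from collections import defaultdict


def partition_files(file_sizes: dict[str, int], blocksize: int) -> list[list[str]]:
    """Hypothetical stand-in for file partitioning: greedily pack files
    (sorted by path) into groups whose total size stays at or below blocksize.
    A file larger than blocksize ends up in a group of its own."""
    groups: list[list[str]] = []
    current: list[str] = []
    current_size = 0
    for path, size in sorted(file_sizes.items()):
        # Close the current group if adding this file would exceed the block size.
        if current and current_size + size > blocksize:
            groups.append(current)
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        groups.append(current)
    return groups


def find_exact_duplicates(records: list[dict], text_field: str = "text",
                          id_field: str = "_id") -> list:
    """Hypothetical stand-in for exact duplicate identification: hash the text
    column, group IDs by hash, and report every ID after the first in each
    group (assumed keep-first policy)."""
    ids_by_hash: dict[str, list] = defaultdict(list)
    for record in records:
        # MD5 is an assumption here; any deterministic hash of the text works.
        digest = hashlib.md5(record[text_field].encode("utf-8")).hexdigest()
        ids_by_hash[digest].append(record[id_field])
    duplicates = []
    for ids in ids_by_hash.values():
        duplicates.extend(ids[1:])
    return duplicates


if __name__ == "__main__":
    sizes = {"a.parquet": 3, "b.parquet": 2, "c.parquet": 4}
    print(partition_files(sizes, blocksize=5))  # [['a.parquet', 'b.parquet'], ['c.parquet']]

    docs = [
        {"_id": 0, "text": "hello"},
        {"_id": 1, "text": "world"},
        {"_id": 2, "text": "hello"},
    ]
    print(find_exact_duplicates(docs))  # [2]
```

Because identical text always yields the same digest, grouping by hash finds exact duplicates without comparing documents pairwise. Only the duplicate IDs are produced here. This matches the workflow above, where removal is a separate step that is not yet implemented.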
executor_config
nemo_curator.stages.deduplication.exact.workflow.ExactDeduplicationWorkflow._create_identification_pipeline(
num_input_tasks: int
) -> nemo_curator.pipeline.Pipeline
nemo_curator.stages.deduplication.exact.workflow.ExactDeduplicationWorkflow._create_input_filegroups() -> nemo_curator.pipeline.Pipeline
nemo_curator.stages.deduplication.exact.workflow.ExactDeduplicationWorkflow._validate_initial_tasks(
initial_tasks: list[nemo_curator.tasks.FileGroupTask] | None
) -> None
nemo_curator.stages.deduplication.exact.workflow.ExactDeduplicationWorkflow._validate_inputs() -> None
nemo_curator.stages.deduplication.exact.workflow.ExactDeduplicationWorkflow.run(
initial_tasks: list[nemo_curator.tasks.FileGroupTask] | None = None,
executor: nemo_curator.backends.experimental.ray_actor_pool.RayActorPoolExecutor | None = None
) -> nemo_curator.pipeline.workflow.WorkflowRunResult

Run the deduplication pipeline.

Parameters:

initial_tasks: list[FileGroupTask] | None, defaults to None.

executor: RayActorPoolExecutor | None, defaults to None. Executor to use for the pipeline. If not provided, the default RayActorPoolExecutor is used.

Returns: WorkflowRunResult

A WorkflowRunResult object containing the results and timing information.

nemo_curator.stages.deduplication.exact.workflow.ID_GENERATOR_OUTPUT_FILENAME = 'exact_id_generator.json'