stages.deduplication.fuzzy.connected_components
Module Contents
Classes
ConnectedComponentsStage: Base class for all processing stages. Processing stages operate on Task objects (or subclasses like DocumentBatch). Each stage type can declare what type of Task it processes as input (X) and what type it produces as output (Y). Stages can return either a single task, a list of tasks, or None.
API
class stages.deduplication.fuzzy.connected_components.ConnectedComponentsStage(
    output_path: str,
    source_field: str = f'{CURATOR_DEDUP_ID_STR}_x',
    destination_field: str = f'{CURATOR_DEDUP_ID_STR}_y',
    read_kwargs: dict | None = None,
    write_kwargs: dict | None = None,
)
Bases: nemo_curator.stages.base.ProcessingStage[nemo_curator.tasks.file_group.FileGroupTask, nemo_curator.tasks.file_group.FileGroupTask], nemo_curator.stages.deduplication.io_utils.DeduplicationIO
Base class for all processing stages. Processing stages operate on Task objects (or subclasses like DocumentBatch). Each stage type can declare what type of Task it processes as input (X) and what type it produces as output (Y). Stages can return either:
A single task (typical for transformations)
A list of tasks (for stages that split work, like readers)
None (for filtered out tasks)
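As a rough illustration of the three return shapes listed above, a caller could normalize a stage's result into a flat list of tasks. The helper name `normalize_stage_output` is hypothetical and not part of NeMo Curator; this is a sketch of the convention, not the library's executor code.

```python
from typing import Any


def normalize_stage_output(result: Any) -> list:
    """Flatten a stage's return value into a list of tasks.

    Hypothetical helper: mirrors the documented convention that a stage
    may return a single task, a list of tasks, or None.
    """
    if result is None:
        # The task was filtered out, so nothing flows downstream.
        return []
    if isinstance(result, list):
        # 1-to-many stage (for example, a reader that splits work).
        return result
    # 1-to-1 transformation: wrap the single task.
    return [result]
```

With this shape, downstream code can always iterate over the result without special-casing filtered or split tasks.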
Initialization
Args:
output_path: The path to write the resulting connected components to.
source_field: The field name containing the document ids of the source of the edge.
destination_field: The field name containing the document ids of the destination of the edge.
read_kwargs: Keyword arguments to pass for reading the input files.
write_kwargs: Keyword arguments to pass for writing the output files.
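A minimal construction sketch, assuming the class is importable from `nemo_curator.stages.deduplication.fuzzy.connected_components` (inferred from the module path and base classes on this page) and that NeMo Curator with its GPU dependencies is installed. The helper `build_connected_components_stage` and the example path are hypothetical; the field names and read/write kwargs are left at their documented defaults.

```python
def build_connected_components_stage(output_path: str):
    """Create the stage with default field names and I/O kwargs."""
    # Imported lazily so this module can be loaded on machines without
    # NeMo Curator; the import path is an assumption based on this page.
    from nemo_curator.stages.deduplication.fuzzy.connected_components import (
        ConnectedComponentsStage,
    )

    # Only output_path is required; source_field, destination_field,
    # read_kwargs, and write_kwargs fall back to their defaults.
    return ConnectedComponentsStage(output_path=output_path)


# Usage (requires NeMo Curator; the path is a placeholder):
# stage = build_connected_components_stage("/path/to/connected_components")
```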
process(
    task: nemo_curator.tasks.file_group.FileGroupTask,
)
Process a task and return the result.
Args:
task (X): Input task to process
Returns (Y | list[Y]):
Single task: For 1-to-1 transformations
List of tasks: For 1-to-many transformations (e.g., readers)
None: If the task should be filtered out
process_batch(
    tasks: list[nemo_curator.tasks.file_group.FileGroupTask],
)
Process a batch of input files containing edges between documents. Compute the weakly connected components of the graph and write a mapping of document ids to their connected component id.
Parameters
tasks: list[FileGroupTask]
    A list of FileGroupTasks containing the input files.
Returns
list[FileGroupTask]
    A list of FileGroupTasks containing the output mapping of document ids to connected component ids.
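To make the output mapping concrete, here is a CPU-only sketch that turns an edge list into a document id to component id mapping using union-find. It is not the stage's implementation, which runs on GPU; the function name `connected_component_ids` and the choice of the smallest document id as the component label are assumptions made for illustration.

```python
def connected_component_ids(edges):
    """Map each document id to a component id via union-find.

    Illustrative only: the component label here is the smallest id in
    the component, which is an assumption, not the stage's labelling.
    """
    parent = {}

    def find(node):
        parent.setdefault(node, node)
        # Walk up to the root of this node's tree.
        root = node
        while parent[root] != root:
            root = parent[root]
        # Path compression: point every visited node at the root.
        while parent[node] != root:
            parent[node], node = root, parent[node]
        return root

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            # Attach the larger root under the smaller one, so the root
            # of a component is always its smallest id.
            parent[max(root_src, root_dst)] = min(root_src, root_dst)

    return {node: find(node) for node in parent}


mapping = connected_component_ids([(1, 2), (2, 3), (5, 4)])
print(mapping)  # {1: 1, 2: 1, 3: 1, 5: 4, 4: 4}
```

Documents that share a component id belong to the same group of near-duplicates linked by the input edges.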
ray_stage_spec() -> dict[str, Any]
Get Ray configuration for this stage. Note: This is only used for Ray Data, which is an experimental backend. The keys are defined in RayStageSpecKeys in backends/experimental/ray_data/utils.py.
Returns (dict[str, Any]): Dictionary containing Ray-specific configuration
setup(_worker_metadata: WorkerMetadata | None = None) -> None
Setup method called once before processing begins. Override this method to perform any initialization that should happen once per worker.
Args:
_worker_metadata (WorkerMetadata, optional): Information about the worker (provided by some backends)
weakly_connected_components(
    df: cudf.DataFrame,
    src_col: str,
    dst_col: str,
)
Compute the weakly connected components of a graph.
This method takes a chunk of the graph's edges, creates a cuGraph graph object, and computes the weakly connected components using cuGraph's MGGraph (multi-GPU graph) class.
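Parameters
df: cudf.DataFrame
    The edge list of the graph.
src_col: str
    The name of the column containing the source document ids.
dst_col: str
    The name of the column containing the destination document ids.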
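The word "weakly" means edge direction is ignored when grouping nodes. The following pure-Python sketch shows that behaviour on rows keyed by a source and destination column, using breadth-first search over an undirected adjacency map. It stands in for the GPU computation only conceptually: `weak_components` and the plain list-of-dicts input are hypothetical, not the cuDF or cuGraph API.

```python
from collections import defaultdict, deque


def weak_components(rows, src_col, dst_col):
    """Group nodes into weakly connected components with BFS.

    Hypothetical CPU stand-in: rows is a list of dicts, each holding
    one edge under the src_col and dst_col keys.
    """
    adjacency = defaultdict(set)
    for row in rows:
        src, dst = row[src_col], row[dst_col]
        adjacency[src].add(dst)
        adjacency[dst].add(src)  # reverse link: direction is ignored

    seen = set()
    components = []
    for start in adjacency:
        if start in seen:
            continue
        seen.add(start)
        queue = deque([start])
        component = set()
        while queue:
            node = queue.popleft()
            component.add(node)
            for neighbor in adjacency[node]:
                if neighbor not in seen:
                    seen.add(neighbor)
                    queue.append(neighbor)
        components.append(component)
    return components


edges = [
    {"src": "a", "dst": "b"},
    {"src": "c", "dst": "b"},  # no directed path from a to c
    {"src": "x", "dst": "y"},
]
groups = weak_components(edges, "src", "dst")
```

Here `a` and `c` land in the same component even though neither can reach the other along directed edges, because both connect to `b`.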