About NeMo CuratorConceptsText ConceptsData

Data Acquisition Concepts

View as Markdown

This guide covers the core concepts for acquiring and processing text data from remote sources in NeMo Curator. Data acquisition focuses on downloading, extracting, and converting remote data sources into the DocumentBatch format for further processing.

Overview

Data acquisition in NeMo Curator follows a four-stage architecture:

  1. Generate URLs: Discover and generate download URLs from minimal input
  2. Download: Retrieve raw data files from remote sources
  3. Iterate: Extract individual records from downloaded containers
  4. Extract: Convert raw content to clean, structured text

This process transforms diverse remote data sources into a standardized DocumentBatch that can be used throughout the text curation pipeline.

Core Components

The data acquisition framework consists of four abstract base classes that define the acquisition workflow:

URLGenerator

Generates URLs for downloading from minimal input configuration. You need to override generate_urls which generates a bunch of URLs that user wants to download.

Example Implementation:

1from dataclasses import dataclass
2from nemo_curator.stages.text.download import URLGenerator
3
4@dataclass
5class CustomURLGenerator(URLGenerator):
6 def generate_urls(self):
7 # Custom URL generation logic
8 urls = []
9 ...
10 return urls

DocumentDownloader

Connects to and downloads data from remote repositories. You must override _get_output_filename and _download_to_path which are called by an underlying function called download which tries to be idempotent.

Example Implementation:

1from nemo_curator.stages.text.download import DocumentDownloader
2
3class CustomDownloader(DocumentDownloader):
4 def __init__(self, download_dir: str):
5 super().__init__(download_dir=download_dir)
6
7 def _get_output_filename(self, url: str) -> str:
8 # Custom logic to extract filename from URL
9 return url.split("/")[-1]
10
11 def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
12 # Custom download logic
13 # Return (success_bool, error_message)
14 try:
15 # ... download implementation ...
16 return True, None
17 except Exception as e:
18 return False, str(e)

DocumentIterator

Extracts individual records from downloaded containers. You should only override iterate and output_columns where iterate must have logic to load the local file path and return bunch of documents. The list[dict] is finally considered to a Pandas DataFrame which is passed to Extractor.

Example Implementation:

1from collections.abc import Iterator
2from typing import Any
3from nemo_curator.stages.text.download import DocumentIterator
4
5class CustomIterator(DocumentIterator):
6 def __init__(self, log_frequency: int = 1000):
7 super().__init__()
8 self._log_frequency = log_frequency
9
10 def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
11 # Custom iteration logic to load local file and return documents
12 for record in load_local_file_fn(file_path):
13 yield {"content": record_content, "metadata": record_metadata}
14
15 def output_columns(self) -> list[str]:
16 return ["content", "metadata"]

DocumentExtractor (Optional)

DocumentExtractor works on a Pandas DataFrame and is optional.

Example Implementation:

1from typing import Any
2from nemo_curator.stages.text.download import DocumentExtractor
3
4class CustomExtractor(DocumentExtractor):
5 def __init__(self):
6 super().__init__()
7
8 def extract(self, record: dict[str, str]) -> dict[str, Any] | None:
9 # Custom extraction logic
10 cleaned_text = clean_content(record["content"])
11 detected_lang = detect_language(cleaned_text)
12 return {"text": cleaned_text, "language": detected_lang}
13
14 def input_columns(self):
15 return ["content", "metadata"]
16
17 def output_columns(self):
18 return ["text", "language"]

Supported Data Sources

NeMo Curator provides built-in support for major public text datasets:

Integration with Pipeline Architecture

The data acquisition process seamlessly integrates with NeMo Curator’s pipeline-based architecture. The DocumentDownloadExtractStage handles parallel processing through the distributed computing framework.

Acquisition Workflow

1from nemo_curator.core.client import RayClient
2from nemo_curator.pipeline import Pipeline
3from nemo_curator.stages.text.download import DocumentDownloadExtractStage
4from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter
5from nemo_curator.stages.base import ProcessingStage
6
7# Create composite stage
8class CustomDownloadExtractStage(DocumentDownloadExtractStage):
9 def __init__(
10 self,
11 download_dir: str = "./custom_downloads",
12 url_limit: int | None = None,
13 record_limit: int | None = None,
14 add_filename_column: bool | str = True,
15 ):
16 # Create the URL generator
17 self.url_generator = CustomURLGenerator()
18
19 # Create the downloader
20 self.downloader = CustomDownloader(download_dir=download_dir)
21
22 # Create the iterator
23 self.iterator = CustomIterator()
24
25 # Create the extractor
26 self.extractor = CustomExtractor()
27
28 # Initialize the parent composite stage
29 super().__init__(
30 url_generator=self.url_generator,
31 downloader=self.downloader,
32 iterator=self.iterator,
33 extractor=self.extractor,
34 url_limit=url_limit,
35 record_limit=record_limit,
36 add_filename_column=add_filename_column,
37 )
38 self.name = "custom_pipeline"
39
40 def decompose(self) -> list[ProcessingStage]:
41 """Decompose this composite stage into its constituent stages."""
42 return self.stages
43
44 def get_description(self) -> str:
45 """Get a description of this composite stage."""
46 return "Custom pipeline"
47
48# Initialize Ray client
49ray_client = RayClient()
50ray_client.start()
51
52# Define acquisition pipeline
53pipeline = Pipeline(name="data_acquisition")
54
55# Create download and extract stage with custom components
56custom_download_extract_stage = CustomDownloadExtractStage(...)
57pipeline.add_stage(custom_download_extract_stage)
58
59# Write the results
60pipeline.add_stage(JsonlWriter(...))
61
62# Execute acquisition pipeline
63results = pipeline.run()
64
65# Stop Ray client
66ray_client.stop()

Performance Optimization

Parallel Processing

Data acquisition leverages distributed computing frameworks for scalable processing:

  • Parallel Downloads: Each URL in the generated list downloads through separate workers
  • Concurrent Extraction: Files process in parallel across workers
  • Memory Management: Streaming processing for large files

Integration with Data Loading

Data acquisition produces a standardized output that integrates seamlessly with Curator’s Data Loading Concepts :

Data acquisition includes basic content-level deduplication during extraction (such as removing duplicate HTML content within individual web pages). This is separate from the main deduplication pipeline stages (exact, fuzzy, and semantic deduplication) that operate on the full dataset after acquisition.

1from nemo_curator.stages.text.io.writer import ParquetWriter
2
3# Create acquisition pipeline with all stages including writer
4acquisition_pipeline = Pipeline(name="data_acquisition")
5# ... add acquisition stages ...
6
7# Add writer to save results directly
8writer = ParquetWriter(path="acquired_data/")
9acquisition_pipeline.add_stage(writer)
10
11# Run pipeline to acquire and save data in one execution
12results = acquisition_pipeline.run()
13
14# Later: Load using pipeline-based data loading
15from nemo_curator.stages.text.io.reader import ParquetReader
16
17load_pipeline = Pipeline(name="load_acquired_data")
18reader = ParquetReader(file_paths="acquired_data/")
19load_pipeline.add_stage(reader)

This enables you to:

  • Separate acquisition from processing for better workflow management
  • Cache acquired data to avoid re-downloading
  • Mix acquired and local data in the same processing pipeline
  • Use standard loading patterns regardless of data origin