Data Acquisition Concepts#
This guide covers the core concepts for acquiring and processing text data from remote sources in NeMo Curator. Data acquisition focuses on downloading, extracting, and converting remote data sources into the DocumentBatch format for further processing.
Overview#
Data acquisition in NeMo Curator follows a four-stage architecture:
Generate URLs: Discover and generate download URLs from minimal input
Download: Retrieve raw data files from remote sources
Iterate: Extract individual records from downloaded containers
Extract: Convert raw content to clean, structured text
This process transforms diverse remote data sources into a standardized DocumentBatch that can be used throughout the text curation pipeline.
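As a rough sketch of how the stages fit together (the stand-in functions and sample values below are hypothetical; the real components are the classes described in the next section, and the composite DocumentDownloadExtractStage wires them together for you):

# Illustrative sketch of the four-stage flow using plain functions as stand-ins
def generate_urls() -> list[str]:
    # 1. Generate URLs: discover what to fetch from minimal input
    return ["https://example.com/dump-00.jsonl"]

def download(url: str) -> str:
    # 2. Download: fetch the remote file and return its local path (faked here)
    return "/tmp/dump-00.jsonl"

def iterate(path: str):
    # 3. Iterate: yield one dict per record found in the downloaded container
    yield {"content": "<p>raw html</p>"}

def extract(record: dict) -> dict:
    # 4. Extract: turn raw content into clean fields (trivially here)
    return {"text": record["content"].strip()}

for url in generate_urls():
    for record in iterate(download(url)):
        print(extract(record))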
Core Components#
The data acquisition framework consists of four abstract base classes that define the acquisition workflow:
URLGenerator#
Generates download URLs from a minimal input configuration. Override generate_urls, which returns the list of URLs to download.
Example Implementation:
from dataclasses import dataclass

from nemo_curator.stages.text.download import URLGenerator


@dataclass
class CustomURLGenerator(URLGenerator):
    def generate_urls(self) -> list[str]:
        # Custom URL generation logic
        urls = []
        ...
        return urls
DocumentDownloader#
Connects to remote repositories and downloads data. You must override _get_output_filename and _download_to_path; both are called by the base class's download method, which is designed to make downloads idempotent.
Example Implementation:
from nemo_curator.stages.text.download import DocumentDownloader


class CustomDownloader(DocumentDownloader):
    def __init__(self, download_dir: str):
        super().__init__(download_dir=download_dir)

    def _get_output_filename(self, url: str) -> str:
        # Custom logic to extract the filename from the URL
        return url.split("/")[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        # Custom download logic
        # Return (success_bool, error_message)
        try:
            # ... download implementation ...
            return True, None
        except Exception as e:
            return False, str(e)
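As a concrete variant, a downloader for plain HTTP sources might implement _download_to_path with urllib from the Python standard library. The class name and the choice of HTTP client below are illustrative assumptions, not part of the Curator API:

import urllib.request

from nemo_curator.stages.text.download import DocumentDownloader


class HTTPDownloader(DocumentDownloader):
    """Hypothetical downloader that fetches files over HTTP with the standard library."""

    def __init__(self, download_dir: str):
        super().__init__(download_dir=download_dir)

    def _get_output_filename(self, url: str) -> str:
        return url.split("/")[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        try:
            urllib.request.urlretrieve(url, path)  # write the remote file to `path`
            return True, None
        except Exception as e:
            return False, str(e)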
DocumentIterator#
Extracts individual records from downloaded containers. Override iterate and output_columns: iterate loads the local file and yields one dictionary per record, and output_columns lists the keys those dictionaries contain. The resulting list of dictionaries is converted to a pandas DataFrame that is passed to the extractor.
Example Implementation:
from collections.abc import Iterator
from typing import Any

from nemo_curator.stages.text.download import DocumentIterator


class CustomIterator(DocumentIterator):
    def __init__(self, log_frequency: int = 1000):
        super().__init__()
        self._log_frequency = log_frequency

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        # Custom iteration logic: load the local file and yield one dict per record
        # (load_local_file_fn, record_content, and record_metadata are placeholders)
        for record in load_local_file_fn(file_path):
            yield {"content": record_content, "metadata": record_metadata}

    def output_columns(self) -> list[str]:
        return ["content", "metadata"]
DocumentExtractor (Optional)#
DocumentExtractor is optional. It operates on the pandas DataFrame produced by the iterator, calling extract once per record to produce clean, structured fields. Override extract, input_columns, and output_columns.
Example Implementation:
from typing import Any

from nemo_curator.stages.text.download import DocumentExtractor


class CustomExtractor(DocumentExtractor):
    def __init__(self):
        super().__init__()

    def extract(self, record: dict[str, str]) -> dict[str, Any] | None:
        # Custom extraction logic (clean_content and detect_language are placeholders)
        cleaned_text = clean_content(record["content"])
        detected_lang = detect_language(cleaned_text)
        return {"text": cleaned_text, "language": detected_lang}

    def input_columns(self) -> list[str]:
        return ["content", "metadata"]

    def output_columns(self) -> list[str]:
        return ["text", "language"]
Supported Data Sources#
NeMo Curator provides built-in support for major public text datasets:
Download and extract web archive data from Common Crawl
Download and extract scientific papers from arXiv
Download and extract Wikipedia articles from Wikipedia dumps
Implement a download and extract pipeline for a custom data source
Integration with Pipeline Architecture#
The data acquisition process seamlessly integrates with NeMo Curator’s pipeline-based architecture. The DocumentDownloadExtractStage handles parallel processing through the distributed computing framework.
Acquisition Workflow#
from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.download import DocumentDownloadExtractStage
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter
from nemo_curator.stages.base import ProcessingStage


# Create composite stage
class CustomDownloadExtractStage(DocumentDownloadExtractStage):
    def __init__(
        self,
        download_dir: str = "./custom_downloads",
        url_limit: int | None = None,
        record_limit: int | None = None,
        add_filename_column: bool | str = True,
    ):
        # Create the URL generator
        self.url_generator = CustomURLGenerator()
        # Create the downloader
        self.downloader = CustomDownloader(download_dir=download_dir)
        # Create the iterator
        self.iterator = CustomIterator()
        # Create the extractor
        self.extractor = CustomExtractor()

        # Initialize the parent composite stage
        super().__init__(
            url_generator=self.url_generator,
            downloader=self.downloader,
            iterator=self.iterator,
            extractor=self.extractor,
            url_limit=url_limit,
            record_limit=record_limit,
            add_filename_column=add_filename_column,
        )
        self.name = "custom_pipeline"

    def decompose(self) -> list[ProcessingStage]:
        """Decompose this composite stage into its constituent stages."""
        return self.stages

    def get_description(self) -> str:
        """Get a description of this composite stage."""
        return "Custom pipeline"


# Initialize Ray client
ray_client = RayClient()
ray_client.start()

# Define acquisition pipeline
pipeline = Pipeline(name="data_acquisition")

# Create download and extract stage with custom components
custom_download_extract_stage = CustomDownloadExtractStage(...)
pipeline.add_stage(custom_download_extract_stage)

# Write the results
pipeline.add_stage(JsonlWriter(...))

# Execute acquisition pipeline
results = pipeline.run()

# Stop Ray client
ray_client.stop()
Performance Optimization#
Parallel Processing#
Data acquisition leverages distributed computing frameworks for scalable processing:
Parallel Downloads: Each URL in the generated list is downloaded by a separate worker
Concurrent Extraction: Downloaded files are iterated and extracted in parallel across workers
Memory Management: Large files are processed as streams to limit memory use
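During development it is often useful to bound how much work the acquisition stage does. The url_limit and record_limit parameters shown in the workflow above can be used for this; the values below are illustrative:

# Keep test runs small: download a handful of URLs and cap records per file
stage = CustomDownloadExtractStage(
    download_dir="./custom_downloads",
    url_limit=5,        # process only the first 5 generated URLs
    record_limit=100,   # cap the number of records taken from each downloaded file
)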
Integration with Data Loading#
Data acquisition produces a standardized output that integrates seamlessly with Curator’s Data Loading Concepts:
Note
Data acquisition includes basic content-level deduplication during extraction (such as removing duplicate HTML content within individual web pages). This is separate from the main deduplication pipeline stages (exact, fuzzy, and semantic deduplication) that operate on the full dataset after acquisition.
from nemo_curator.stages.text.io.writer import ParquetWriter
# Create acquisition pipeline with all stages including writer
acquisition_pipeline = Pipeline(name="data_acquisition")
# ... add acquisition stages ...
# Add writer to save results directly
writer = ParquetWriter(path="acquired_data/")
acquisition_pipeline.add_stage(writer)
# Run pipeline to acquire and save data in one execution
results = acquisition_pipeline.run()
# Later: Load using pipeline-based data loading
from nemo_curator.stages.text.io.reader import ParquetReader
load_pipeline = Pipeline(name="load_acquired_data")
reader = ParquetReader(file_paths="acquired_data/")
load_pipeline.add_stage(reader)
This enables you to:
Separate acquisition from processing for better workflow management
Cache acquired data to avoid re-downloading
Mix acquired and local data in the same processing pipeline
Use standard loading patterns regardless of data origin