Custom Data Loading#
Create custom data loading pipelines using Curator. This guide shows how to build modular stages that run on Curator's distributed processing backend.
How It Works#
For custom data loading, Curator uses the same four-step pipeline pattern described in Data Acquisition Concepts. Each step is defined by an abstract base class with a corresponding processing stage, and the stages compose into pipelines.
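All of these base classes, along with the composite stage that ties them together, live in nemo_curator.stages.text.download; the snippet below previews the imports used throughout the steps that follow.

from nemo_curator.stages.text.download import (
    URLGenerator,                  # step 1: enumerate what to fetch
    DocumentDownloader,            # step 2: download raw files to local storage
    DocumentIterator,              # step 3: iterate over records within each file
    DocumentExtractor,             # step 4 (optional): transform records into their final form
    DocumentDownloadExtractStage,  # composite stage that wires the four together
)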
Architecture Overview#
For detailed information about the core components and data flow, see Data Acquisition Concepts and Data Loading Concepts.
Implementation Guide#
1. Create Directory Structure#
your_data_source/
├── __init__.py
├── stage.py # Main composite stage
├── url_generation.py # URL generation logic
├── download.py # Download implementation
├── iterator.py # File iteration logic
└── extract.py # Data extraction logic (optional)
2. Build Core Components#
URL Generator (url_generation.py)#
from dataclasses import dataclass

from nemo_curator.stages.text.download import URLGenerator


@dataclass
class CustomURLGenerator(URLGenerator):
    def generate_urls(self) -> list[str]:
        """Generate list of URLs to download."""
        # Your URL generation logic here
        return [
            "https://example.com/dataset1.zip",
            "https://example.com/dataset2.zip",
        ]
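The example above is a dataclass, so source-specific configuration can be added as fields. A minimal sketch, assuming a hypothetical sharded layout described by base_url and shard_count (reuses the imports above):

@dataclass
class ShardedURLGenerator(URLGenerator):
    base_url: str = "https://example.com/shards"  # hypothetical source layout
    shard_count: int = 10

    def generate_urls(self) -> list[str]:
        # One URL per shard, for example https://example.com/shards/shard-0000.zip
        return [f"{self.base_url}/shard-{i:04d}.zip" for i in range(self.shard_count)]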
Document Downloader (download.py)#
from nemo_curator.stages.text.download import DocumentDownloader


class CustomDownloader(DocumentDownloader):
    def __init__(self, download_dir: str):
        super().__init__(download_dir=download_dir)

    def _get_output_filename(self, url: str) -> str:
        """Extract filename from URL."""
        return url.split("/")[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        """Download file from URL to local path."""
        # Custom download logic
        # Return (success_bool, error_message)
        try:
            # ... download implementation ...
            return True, None
        except Exception as e:
            return False, str(e)
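For a plain HTTP(S) source, the download hook can stream the response to disk. A minimal sketch of the placeholder body above, assuming requests is installed; swap in an S3, FTP, or other client as your source requires:

import requests  # assumption: URLs are plain HTTP(S) downloads

# Inside CustomDownloader:
def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
    """Stream the URL to a local file."""
    try:
        with requests.get(url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(path, "wb") as f:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    f.write(chunk)
        return True, None
    except requests.RequestException as e:
        return False, str(e)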
Document Iterator (iterator.py)#
import json
from collections.abc import Iterator
from typing import Any

from nemo_curator.stages.text.download import DocumentIterator


class CustomIterator(DocumentIterator):
    def __init__(self, log_frequency: int = 1000):
        super().__init__()
        self._log_frequency = log_frequency

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        """Iterate over records in a file."""
        # Custom iteration logic; this example assumes a JSON Lines (.jsonl) input file
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                record = json.loads(line)
                yield {
                    "content": record.get("content", ""),
                    "metadata": record.get("metadata", {}),
                    "id": record.get("id", ""),
                }

    def output_columns(self) -> list[str]:
        """Define output columns."""
        return ["content", "metadata", "id"]
Document Extractor (extract.py)#
import hashlib
from typing import Any

from nemo_curator.stages.text.download import DocumentExtractor


class CustomExtractor(DocumentExtractor):
    def __init__(self):
        super().__init__()

    def extract(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Transform raw record to final format."""
        # Skip invalid records
        if not record.get("content"):
            return None

        # Extract and clean text
        cleaned_text = self._clean_text(record["content"])

        # Generate unique ID if not present
        doc_id = record.get("id", self._generate_id(cleaned_text))

        return {
            "text": cleaned_text,
            "id": doc_id,
            "source": record.get("metadata", {}).get("source", "unknown"),
        }

    def input_columns(self) -> list[str]:
        return ["content", "metadata", "id"]

    def output_columns(self) -> list[str]:
        return ["text", "id", "source"]

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Your text cleaning logic here
        return text.strip()

    def _generate_id(self, text: str) -> str:
        """Generate unique ID for text."""
        return hashlib.md5(text.encode()).hexdigest()[:16]
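The extractor can be exercised the same way; the record below is a made-up example matching the iterator's output columns:

extractor = CustomExtractor()
result = extractor.extract({
    "content": "  Hello, Curator!  ",
    "metadata": {"source": "example.com"},
})
print(result)
# {'text': 'Hello, Curator!', 'id': '<16-char md5 prefix>', 'source': 'example.com'}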
3. Create Composite Stage (stage.py)#
from nemo_curator.stages.text.download import DocumentDownloadExtractStage
from nemo_curator.stages.base import ProcessingStage

from .url_generation import CustomURLGenerator
from .download import CustomDownloader
from .iterator import CustomIterator
from .extract import CustomExtractor


class CustomDataStage(DocumentDownloadExtractStage):
    """Custom data loading stage combining all components."""

    def __init__(
        self,
        download_dir: str = "./custom_downloads",
        url_limit: int | None = None,
        record_limit: int | None = None,
        add_filename_column: bool | str = True,
    ):
        self.url_generator = CustomURLGenerator()
        self.downloader = CustomDownloader(download_dir=download_dir)
        self.iterator = CustomIterator()
        self.extractor = CustomExtractor()

        # Initialize the parent composite stage
        super().__init__(
            url_generator=self.url_generator,
            downloader=self.downloader,
            iterator=self.iterator,
            extractor=self.extractor,  # Optional - remove if not needed
            url_limit=url_limit,
            record_limit=record_limit,
            add_filename_column=add_filename_column,
        )
        self.name = "custom_data"

    def decompose(self) -> list[ProcessingStage]:
        """Decompose this composite stage into its constituent stages."""
        return self.stages

    def get_description(self) -> str:
        """Get a description of this composite stage."""
        return "Custom data"
Usage Examples#
Basic Pipeline#
from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from your_data_source.stage import CustomDataStage
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter


def main():
    # Initialize Ray client
    ray_client = RayClient()
    ray_client.start()

    # Create pipeline
    pipeline = Pipeline(
        name="custom_data_pipeline",
        description="Load and process custom dataset",
    )

    # Create custom data loading stage
    data_stage = CustomDataStage(...)
    pipeline.add_stage(data_stage)

    # Save the results to JSONL
    pipeline.add_stage(JsonlWriter(...))

    # Run pipeline
    print("Starting pipeline...")
    results = pipeline.run()

    # Stop Ray client
    ray_client.stop()


if __name__ == "__main__":
    main()
For executor options and configuration, refer to Pipeline Execution Backends.
Parameters Reference#
| Parameter | Type | Description | Default |
|---|---|---|---|
| url_generator | URLGenerator | Custom URL generation implementation | Required |
| downloader | DocumentDownloader | Custom download implementation | Required |
| iterator | DocumentIterator | Custom file iteration implementation | Required |
| extractor | DocumentExtractor \| None | Optional extraction/transformation step | None |
| url_limit | int \| None | Maximum number of URLs to process | None |
| record_limit | int \| None | Maximum records per file | None |
| add_filename_column | bool \| str | Add a filename column to the output; if a string, it is used as the column name (default name: "file_name") | True |
Output Format#
Processed data flows through the pipeline as DocumentBatch tasks containing Pandas DataFrames or PyArrow Tables:
Example Output Schema#
{
    "text": "This is the processed document text",
    "id": "unique-document-id",
    "source": "example.com",
    "file_name": "dataset1.jsonl"  # If add_filename_column=True (default column name)
}
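Once the pipeline has written its JSONL output, the result can be inspected with pandas; the path below is hypothetical and depends on how JsonlWriter is configured:

import pandas as pd

df = pd.read_json("custom_output/part.0.jsonl", lines=True)  # hypothetical output path
print(df[["text", "id", "source", "file_name"]].head())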