Create custom data loading pipelines using Curator. This guide shows how to build modular stages that run on Curator's distributed processing backend.
For custom data loading, Curator uses the same three-step pipeline pattern described in Data Acquisition Concepts: generate URLs, download files, and iterate over (and optionally extract) the records they contain. Each step maps to one or more abstract base classes, and their corresponding processing stages compose into pipelines.
For detailed information about the core components and data flow, see Data Acquisition Concepts and Data Loading Concepts.
A typical layout for a custom data source package:

your_data_source/
├── __init__.py
├── stage.py            # Main composite stage
├── url_generation.py   # URL generation logic
├── download.py         # Download implementation
├── iterator.py         # File iteration logic
└── extract.py          # Data extraction logic (optional)
URL generator (url_generation.py)
from dataclasses import dataclass

from nemo_curator.stages.text.download import URLGenerator


@dataclass
class CustomURLGenerator(URLGenerator):
    """Produce the list of remote files that the download step should fetch."""

    def generate_urls(self) -> list[str]:
        """Return the URLs to download, one entry per remote file.

        Replace the hard-coded entries below with your own URL discovery logic.
        """
        dataset_urls = (
            "https://example.com/dataset1.zip",
            "https://example.com/dataset2.zip",
        )
        return list(dataset_urls)
Downloader (download.py)
import contextlib
import os
import posixpath
import shutil
import urllib.request
from urllib.parse import urlparse

from nemo_curator.stages.text.download import DocumentDownloader

# Seconds to wait on a stalled connection before treating the download as failed.
_DOWNLOAD_TIMEOUT_S = 60


class CustomDownloader(DocumentDownloader):
    """Fetch files over HTTP(S) into ``download_dir``."""

    def __init__(self, download_dir: str):
        super().__init__(download_dir=download_dir)

    def _get_output_filename(self, url: str) -> str:
        """Derive the local filename from ``url``.

        Only the URL path is used, so query strings and fragments
        (``?token=...``, ``#section``) do not end up in the filename, and a
        trailing slash does not yield an empty name. Falls back to the host
        name when the URL has no path component.
        """
        parsed = urlparse(url)
        name = posixpath.basename(parsed.path.rstrip("/"))
        return name or parsed.netloc

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        """Download ``url`` to the local file ``path``.

        Returns:
            ``(True, None)`` on success, or ``(False, error_message)`` on failure.
        """
        try:
            response = urllib.request.urlopen(url, timeout=_DOWNLOAD_TIMEOUT_S)
        except (OSError, ValueError) as e:
            # URLError, HTTPError and socket timeouts are OSError subclasses;
            # ValueError covers malformed or unsupported URLs.
            return False, str(e)

        try:
            with response, open(path, "wb") as out:
                shutil.copyfileobj(response, out)
        except OSError as e:
            # Don't leave a truncated file behind after a failed transfer.
            with contextlib.suppress(OSError):
                os.remove(path)
            return False, str(e)
        return True, None
Iterator (iterator.py)
import json
from collections.abc import Iterator
from typing import Any

from nemo_curator.stages.text.download import DocumentIterator


class CustomIterator(DocumentIterator):
    """Yield one raw record per line of a local JSON Lines file.

    This example assumes each non-empty line is a JSON object with optional
    ``content``, ``metadata`` and ``id`` keys. Adapt ``iterate`` to your own
    file format (the example URLs point at .zip archives, for instance,
    which would need to be opened with ``zipfile`` first).
    """

    def __init__(self, log_frequency: int = 1000):
        super().__init__()
        # Not used by this minimal example; available for progress logging
        # every ``log_frequency`` records.
        self._log_frequency = log_frequency

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        """Yield records from ``file_path`` keyed by the names in ``output_columns``."""
        with open(file_path, encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue  # tolerate blank lines, e.g. a trailing newline
                record = json.loads(line)
                yield {
                    "content": record.get("content"),
                    "metadata": record.get("metadata", {}),
                    "id": record.get("id"),
                }

    def output_columns(self) -> list[str]:
        """Define output columns."""
        return ["content", "metadata", "id"]
Extractor (extract.py, optional)
import hashlib
from typing import Any

from nemo_curator.stages.text.download import DocumentExtractor


class CustomExtractor(DocumentExtractor):
    """Turn raw iterator records into ``text`` / ``id`` / ``source`` rows."""

    def __init__(self):
        super().__init__()

    def extract(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Transform one raw record into the final output format.

        Args:
            record: A row produced by the iterator, with the keys listed in
                ``input_columns``.

        Returns:
            A dict with the keys listed in ``output_columns``, or ``None`` to
            drop the record when it has no usable text.
        """
        content = record.get("content")
        # Skip missing, empty or non-string content (e.g. None/NaN from a DataFrame).
        if not isinstance(content, str):
            return None

        cleaned_text = self._clean_text(content)
        # Check after cleaning so whitespace-only content is dropped rather
        # than emitted as an empty document.
        if not cleaned_text:
            return None

        # Explicit check instead of a ``get`` default: an explicit None/"" id is
        # replaced too, and the hash is only computed when actually needed.
        doc_id = record.get("id")
        if doc_id is None or doc_id == "":
            doc_id = self._generate_id(cleaned_text)

        # Metadata may be missing, None, or not a mapping.
        metadata = record.get("metadata")
        source = metadata.get("source", "unknown") if isinstance(metadata, dict) else "unknown"

        return {
            "text": cleaned_text,
            "id": doc_id,
            "source": source,
        }

    def input_columns(self) -> list[str]:
        return ["content", "metadata", "id"]

    def output_columns(self) -> list[str]:
        return ["text", "id", "source"]

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Your text cleaning logic here
        return text.strip()

    def _generate_id(self, text: str) -> str:
        """Return a 16-hex-character content fingerprint for ``text``."""
        # md5 is used as a fingerprint, not for security; usedforsecurity=False
        # keeps it available on FIPS-restricted Python builds.
        return hashlib.md5(text.encode("utf-8"), usedforsecurity=False).hexdigest()[:16]
Composite stage (stage.py)
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.text.download import DocumentDownloadExtractStage

from .download import CustomDownloader
from .extract import CustomExtractor
from .iterator import CustomIterator
from .url_generation import CustomURLGenerator


class CustomDataStage(DocumentDownloadExtractStage):
    """Composite stage that wires the custom URL generator, downloader,
    iterator and extractor into one download-and-extract step."""

    def __init__(
        self,
        download_dir: str = "./custom_downloads",
        url_limit: int | None = None,
        record_limit: int | None = None,
        add_filename_column: bool | str = True,
    ):
        """Build the four components and hand them to the parent composite stage.

        Args:
            download_dir: Local directory passed to ``CustomDownloader``.
            url_limit: Maximum number of URLs to process.
            record_limit: Maximum number of records per file.
            add_filename_column: Add a filename column to the output; a string
                value is used as the column name.
        """
        # Components are attached to ``self`` before the parent initializer runs.
        self.url_generator = CustomURLGenerator()
        self.downloader = CustomDownloader(download_dir=download_dir)
        self.iterator = CustomIterator()
        # Optional - remove this line and the ``extractor`` argument below if not needed.
        self.extractor = CustomExtractor()

        super().__init__(
            url_generator=self.url_generator,
            downloader=self.downloader,
            iterator=self.iterator,
            extractor=self.extractor,
            url_limit=url_limit,
            record_limit=record_limit,
            add_filename_column=add_filename_column,
        )
        # Assigned after super().__init__() so it replaces any name the parent sets.
        self.name = "custom_data"

    def decompose(self) -> list[ProcessingStage]:
        """Return the constituent stages this composite expands into."""
        return self.stages

    def get_description(self) -> str:
        """Return a human-readable description of this stage."""
        return "Custom data"
from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter

from your_data_source.stage import CustomDataStage


def main():
    """Load the custom dataset with CustomDataStage and write the results to JSONL."""
    # Initialize Ray client
    ray_client = RayClient()
    ray_client.start()
    try:
        pipeline = Pipeline(
            name="custom_data_pipeline",
            description="Load and process custom dataset",
        )

        # Create the custom data loading stage; adjust the arguments for your dataset.
        pipeline.add_stage(CustomDataStage(download_dir="./custom_downloads"))

        # Save the results to JSONL
        pipeline.add_stage(JsonlWriter(path="./custom_output"))

        print("Starting pipeline...")
        pipeline.run()
    finally:
        # Stop the Ray client even if building or running the pipeline raises.
        ray_client.stop()


if __name__ == "__main__":
    main()
For executor options and configuration, refer to Execution Backends.
Parameters accepted by DocumentDownloadExtractStage (the arguments CustomDataStage forwards through super().__init__()):

| Parameter | Type | Description | Default |
|---|---|---|---|
| url_generator | URLGenerator | Custom URL generation implementation | Required |
| downloader | DocumentDownloader | Custom download implementation | Required |
| iterator | DocumentIterator | Custom file iteration implementation | Required |
| extractor | DocumentExtractor \| None | Optional extraction/transformation step | None |
| url_limit | int \| None | Maximum number of URLs to process | None |
| record_limit | int \| None | Maximum number of records per file | None |
| add_filename_column | bool \| str | If True, adds a filename column named "file_name"; if a string, uses that string as the column name | True |
Processed data flows through the pipeline as DocumentBatch tasks containing Pandas DataFrames or PyArrow Tables. With the extractor above, each output row looks like this:
{
    "text": "This is the processed document text",
    "id": "unique-document-id",
    "source": "example.com",
    "file_name": "dataset1.jsonl"  # If add_filename_column=True (default column name)
}