Curate Text > Load Data
Custom Data Loading
Create custom data loading pipelines using Curator. This guide shows how to build modular stages that run on Curator’s distributed processing.
How It Works
Curator uses the same 4-step pipeline pattern described in Data Acquisition Concepts for custom data loading. Each step uses an abstract base class with corresponding processing stages that compose into pipelines.
Architecture Overview
For detailed information about the core components and data flow, see Data Acquisition Concepts and Data Loading Concepts.
Implementation Guide
1. Create Directory Structure
your_data_source/
├── __init__.py
├── stage.py           # Main composite stage
├── url_generation.py  # URL generation logic
├── download.py        # Download implementation
├── iterator.py        # File iteration logic
└── extract.py         # Data extraction logic (optional)
2. Build Core Components
URL Generator (url_generation.py)
from dataclasses import dataclass

from nemo_curator.stages.text.download import URLGenerator


@dataclass
class CustomURLGenerator(URLGenerator):
    """Produces the list of source URLs for the custom dataset."""

    def generate_urls(self) -> list[str]:
        """Return every URL this pipeline should download."""
        # Your URL generation logic here
        urls = [
            "https://example.com/dataset1.zip",
            "https://example.com/dataset2.zip",
        ]
        return urls
Document Downloader (download.py)
from nemo_curator.stages.text.download import DocumentDownloader


class CustomDownloader(DocumentDownloader):
    """Fetches dataset files from remote URLs into a local download directory."""

    def __init__(self, download_dir: str):
        super().__init__(download_dir=download_dir)

    def _get_output_filename(self, url: str) -> str:
        """Derive the local filename from the final path segment of the URL."""
        return url.rsplit("/", 1)[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        """Download ``url`` to ``path`` and report ``(success, error_message)``.

        On success the second element is ``None``; on failure it carries the
        stringified exception so the pipeline can log the reason.
        """
        # Custom download logic
        try:
            # ... download implementation ...
            return True, None
        except Exception as e:
            return False, str(e)
Document Iterator (iterator.py)
import json
from collections.abc import Iterator
from typing import Any
from nemo_curator.stages.text.download import DocumentIterator

class CustomIterator(DocumentIterator):
    """Yields one record dict per document found in a downloaded file."""

    def __init__(self, log_frequency: int = 1000):
        super().__init__()
        # How often (in records) to emit progress logs while iterating.
        self._log_frequency = log_frequency

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        """Iterate over records in a file."""
        # Custom iteration logic to load local file and return documents.
        # NOTE(review): ``load_local_file_fn`` and the ``record_*`` names are
        # placeholders — replace them with your own file-parsing logic.
        for record in load_local_file_fn(file_path):
            yield {"content": record_content, "metadata": record_metadata, "id": record_id}

    def output_columns(self) -> list[str]:
        """Define output columns."""
        # Must match the keys of the dicts yielded by ``iterate``.
        return ["content", "metadata", "id"]
Document Extractor (extract.py)
import hashlib
from typing import Any

from nemo_curator.stages.text.download import DocumentExtractor


class CustomExtractor(DocumentExtractor):
    """Cleans raw records and maps them to the final output schema."""

    def __init__(self):
        super().__init__()

    def extract(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Transform a raw record into the final format.

        Args:
            record: Raw record with "content", "metadata", and "id" keys.
                (Typed ``dict[str, Any]`` because "metadata" holds a nested
                dict, not a string.)

        Returns:
            A dict with "text", "id", and "source" keys, or ``None`` to drop
            records with no content.
        """
        # Skip invalid records
        if not record.get("content"):
            return None

        # Extract and clean text
        cleaned_text = self._clean_text(record["content"])

        # Generate a unique ID only when one is not already present —
        # avoids hashing the text for every record.
        doc_id = record.get("id")
        if doc_id is None:
            doc_id = self._generate_id(cleaned_text)

        return {
            "text": cleaned_text,
            "id": doc_id,
            "source": record.get("metadata", {}).get("source", "unknown"),
        }

    def input_columns(self) -> list[str]:
        return ["content", "metadata", "id"]

    def output_columns(self) -> list[str]:
        return ["text", "id", "source"]

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Your text cleaning logic here
        return text.strip()

    def _generate_id(self, text: str) -> str:
        """Generate a stable 16-hex-character ID from the text content."""
        # MD5 is used here as a content fingerprint, not for security.
        return hashlib.md5(text.encode()).hexdigest()[:16]
3. Create Composite Stage (stage.py)
from nemo_curator.stages.text.download import DocumentDownloadExtractStage
from nemo_curator.stages.base import ProcessingStage
from .url_generation import CustomURLGenerator
from .download import CustomDownloader
from .iterator import CustomIterator
from .extract import CustomExtractor

class CustomDataStage(DocumentDownloadExtractStage):
    """Custom data loading stage combining all components."""

    def __init__(
        self,
        download_dir: str = "./custom_downloads",
        url_limit: int | None = None,
        record_limit: int | None = None,
        add_filename_column: bool | str = True,
    ):
        # Build each pipeline component before handing them to the parent,
        # which composes them into download -> iterate -> extract stages.
        self.url_generator = CustomURLGenerator()
        self.downloader = CustomDownloader(download_dir=download_dir)
        self.iterator = CustomIterator()
        self.extractor = CustomExtractor()

        # Initialize the parent composite stage
        super().__init__(
            url_generator=self.url_generator,
            downloader=self.downloader,
            iterator=self.iterator,
            extractor=self.extractor,  # Optional - remove if not needed
            url_limit=url_limit,
            record_limit=record_limit,
            add_filename_column=add_filename_column,
        )
        # Override the default stage name after the parent sets it up.
        self.name = "custom_data"

    def decompose(self) -> list[ProcessingStage]:
        """Decompose this composite stage into its constituent stages."""
        # ``self.stages`` is populated by the parent's __init__.
        return self.stages

    def get_description(self) -> str:
        """Get a description of this composite stage."""
        return "Custom data"
Usage Examples
Basic Pipeline
from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from your_data_source.stage import CustomDataStage
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter


def main():
    """Build and run the custom data-loading pipeline on Ray."""
    # Initialize Ray client
    ray_client = RayClient()
    ray_client.start()

    try:
        # Create pipeline
        pipeline = Pipeline(
            name="custom_data_pipeline",
            description="Load and process custom dataset",
        )

        # Create custom data loading stage
        data_stage = CustomDataStage(...)
        pipeline.add_stage(data_stage)

        # Save the results to JSONL
        pipeline.add_stage(JsonlWriter(...))

        # Run pipeline
        print("Starting pipeline...")
        results = pipeline.run()
    finally:
        # Stop the Ray client even if the pipeline raises, so the
        # cluster connection is not leaked.
        ray_client.stop()


if __name__ == "__main__":
    main()
For executor options and configuration, refer to Execution Backends.
Parameters Reference
| Parameter | Type | Description | Default |
|---|---|---|---|
| url_generator | URLGenerator | Custom URL generation implementation | Required |
| downloader | DocumentDownloader | Custom download implementation | Required |
| iterator | DocumentIterator | Custom file iteration implementation | Required |
| extractor | DocumentExtractor \| None | Optional extraction/transformation step | None |
| url_limit | int \| None | Maximum number of URLs to process | None |
| record_limit | int \| None | Maximum records per file | None |
| add_filename_column | bool \| str | Add filename column to output; if str, uses it as the column name (default name: "file_name") | True |
Output Format
Processed data flows through the pipeline as DocumentBatch tasks containing Pandas DataFrames or PyArrow Tables:
Example Output Schema
{
    "text": "This is the processed document text",
    "id": "unique-document-id",
    "source": "example.com",
    "file_name": "dataset1.jsonl"  # If add_filename_column=True (default column name)
}