***
description: Create custom data loading pipelines using Curator.
categories:

* how-to-guides

tags:

* custom-data
* stages
* pipelines
* data-loading

personas:

* data-scientist-focused
* mle-focused

difficulty: advanced
content_type: how-to
modality: text-only
***

# Custom Data Loading

Create custom data loading pipelines using Curator. This guide shows how to build modular stages that run on Curator's distributed processing.

## How It Works

Curator uses the same **4-step pipeline pattern** described in [Data Acquisition Concepts](/about/concepts/text/data/acquisition) for custom data loading. Each step uses an abstract base class with corresponding processing stages that compose into pipelines.

***

## Architecture Overview

For detailed information about the core components and data flow, see [Data Acquisition Concepts](/about/concepts/text/data/acquisition) and [Data Loading Concepts](/about/concepts/text/data/loading).

***

## Implementation Guide

### 1. Create Directory Structure

```text
your_data_source/
├── __init__.py
├── stage.py            # Main composite stage
├── url_generation.py   # URL generation logic
├── download.py         # Download implementation
├── iterator.py         # File iteration logic
└── extract.py          # Data extraction logic (optional)
```

### 2. Build Core Components

#### URL Generator (`url_generation.py`)

```python
from dataclasses import dataclass

from nemo_curator.stages.text.download import URLGenerator


@dataclass
class CustomURLGenerator(URLGenerator):
    def generate_urls(self) -> list[str]:
        """Generate list of URLs to download."""
        # Your URL generation logic here
        return [
            "https://example.com/dataset1.zip",
            "https://example.com/dataset2.zip",
        ]
```

#### Document Downloader (`download.py`)

```python
from nemo_curator.stages.text.download import DocumentDownloader


class CustomDownloader(DocumentDownloader):
    def __init__(self, download_dir: str):
        super().__init__(download_dir=download_dir)

    def _get_output_filename(self, url: str) -> str:
        """Extract filename from URL."""
        return url.split("/")[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        """Download file from URL to local path."""
        # Custom download logic
        # Return (success_bool, error_message)
        try:
            # ... download implementation ...
            return True, None
        except Exception as e:
            return False, str(e)
```
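The download body above is intentionally left as a placeholder. As a minimal sketch of one way to fill it in, the hypothetical `HttpDownloader` below streams HTTP(S) URLs to disk with the `requests` library (an assumption; substitute whatever client fits your data source):

```python
import requests

from nemo_curator.stages.text.download import DocumentDownloader


class HttpDownloader(DocumentDownloader):
    """Hypothetical downloader that streams HTTP(S) URLs to disk."""

    def _get_output_filename(self, url: str) -> str:
        return url.split("/")[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        try:
            # Stream the response in chunks so large files never load fully into memory
            with requests.get(url, stream=True, timeout=60) as response:
                response.raise_for_status()
                with open(path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=1 << 20):
                        f.write(chunk)
            return True, None
        except Exception as e:
            # Report the failure back to the stage instead of raising
            return False, str(e)
```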
#### Document Iterator (`iterator.py`)

```python
import json
from collections.abc import Iterator
from typing import Any

from nemo_curator.stages.text.download import DocumentIterator


class CustomIterator(DocumentIterator):
    def __init__(self, log_frequency: int = 1000):
        super().__init__()
        self._log_frequency = log_frequency

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        """Iterate over records in a file."""
        # Custom iteration logic: load the local file and yield one dict per document.
        # This example assumes each line of the downloaded file is a JSON record.
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                record = json.loads(line)
                yield {
                    "content": record["content"],
                    "metadata": record.get("metadata", {}),
                    "id": record["id"],
                }

    def output_columns(self) -> list[str]:
        """Define output columns."""
        return ["content", "metadata", "id"]
```

#### Document Extractor (`extract.py`)

```python
from typing import Any

from nemo_curator.stages.text.download import DocumentExtractor


class CustomExtractor(DocumentExtractor):
    def __init__(self):
        super().__init__()

    def extract(self, record: dict[str, str]) -> dict[str, Any] | None:
        """Transform raw record to final format."""
        # Skip invalid records
        if not record.get("content"):
            return None

        # Extract and clean text
        cleaned_text = self._clean_text(record["content"])

        # Generate unique ID if not present
        doc_id = record.get("id", self._generate_id(cleaned_text))

        return {
            "text": cleaned_text,
            "id": doc_id,
            "source": record.get("metadata", {}).get("source", "unknown"),
        }

    def input_columns(self) -> list[str]:
        return ["content", "metadata", "id"]

    def output_columns(self) -> list[str]:
        return ["text", "id", "source"]

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Your text cleaning logic here
        return text.strip()

    def _generate_id(self, text: str) -> str:
        """Generate unique ID for text."""
        import hashlib

        return hashlib.md5(text.encode()).hexdigest()[:16]
```
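Before wiring these components into a composite stage, it can help to exercise the iterator and extractor directly on one downloaded file. The snippet below is only a sketch: the sample path is a hypothetical placeholder, and it assumes the JSON-lines layout used in the iterator example above.

```python
from your_data_source.extract import CustomExtractor
from your_data_source.iterator import CustomIterator

# Hypothetical local file matching the record layout the iterator expects
sample_file = "./custom_downloads/dataset1.jsonl"

iterator = CustomIterator()
extractor = CustomExtractor()

for raw_record in iterator.iterate(sample_file):
    extracted = extractor.extract(raw_record)
    if extracted is not None:  # the extractor returns None for skipped records
        print(extracted["id"], extracted["text"][:80])
```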
### 3. Create Composite Stage (`stage.py`)

```python
from nemo_curator.stages.text.download import DocumentDownloadExtractStage
from nemo_curator.stages.base import ProcessingStage

from .url_generation import CustomURLGenerator
from .download import CustomDownloader
from .iterator import CustomIterator
from .extract import CustomExtractor


class CustomDataStage(DocumentDownloadExtractStage):
    """Custom data loading stage combining all components."""

    def __init__(
        self,
        download_dir: str = "./custom_downloads",
        url_limit: int | None = None,
        record_limit: int | None = None,
        add_filename_column: bool | str = True,
    ):
        self.url_generator = CustomURLGenerator()
        self.downloader = CustomDownloader(download_dir=download_dir)
        self.iterator = CustomIterator()
        self.extractor = CustomExtractor()

        # Initialize the parent composite stage
        super().__init__(
            url_generator=self.url_generator,
            downloader=self.downloader,
            iterator=self.iterator,
            extractor=self.extractor,  # Optional - remove if not needed
            url_limit=url_limit,
            record_limit=record_limit,
            add_filename_column=add_filename_column,
        )
        self.name = "custom_data"

    def decompose(self) -> list[ProcessingStage]:
        """Decompose this composite stage into its constituent stages."""
        return self.stages

    def get_description(self) -> str:
        """Get a description of this composite stage."""
        return "Custom data"
```

***

## Usage Examples

### Basic Pipeline

```python
from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from your_data_source.stage import CustomDataStage
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter


def main():
    # Initialize Ray client
    ray_client = RayClient()
    ray_client.start()

    # Create pipeline
    pipeline = Pipeline(
        name="custom_data_pipeline",
        description="Load and process custom dataset",
    )

    # Create custom data loading stage
    data_stage = CustomDataStage(...)
    pipeline.add_stage(data_stage)

    # Save the results to JSONL
    pipeline.add_stage(JsonlWriter(...))

    # Run pipeline
    print("Starting pipeline...")
    results = pipeline.run()

    # Stop Ray client
    ray_client.stop()


if __name__ == "__main__":
    main()
```

For executor options and configuration, refer to [Execution Backends](/reference/infra/execution-backends).

***

## Parameters Reference

| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| `url_generator` | URLGenerator | Custom URL generation implementation | Required |
| `downloader` | DocumentDownloader | Custom download implementation | Required |
| `iterator` | DocumentIterator | Custom file iteration implementation | Required |
| `extractor` | DocumentExtractor \| None | Optional extraction/transformation step | None |
| `url_limit` | int \| None | Maximum number of URLs to process | None |
| `record_limit` | int \| None | Maximum records per file | None |
| `add_filename_column` | bool \| str | Add filename column to output; if str, uses it as the column name (default name: "file_name") | True |

***

## Output Format

Processed data flows through the pipeline as `DocumentBatch` tasks containing Pandas DataFrames or PyArrow Tables:

### Example Output Schema

```python
{
    "text": "This is the processed document text",
    "id": "unique-document-id",
    "source": "example.com",
    "file_name": "dataset1.jsonl"  # If add_filename_column=True (default column name)
}
```
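To spot-check a completed run, you can load one of the JSONL shards produced by `JsonlWriter` and confirm it matches the schema above. The shard path below is a hypothetical placeholder; adjust it to wherever your writer is configured to write.

```python
import pandas as pd

# Hypothetical path to one shard written by JsonlWriter
df = pd.read_json("./custom_output/part_0.jsonl", lines=True)

print(df.columns.tolist())  # expect ["text", "id", "source", "file_name"]
print(df.head())
```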