Custom Data Loading

Create custom data loading pipelines using Curator. This guide shows how to build modular stages that run on Curator's distributed processing backends.

How It Works

For custom data loading, Curator uses the same 4-step pipeline pattern described in Data Acquisition Concepts. Each step is defined by an abstract base class with corresponding processing stages that compose into pipelines.


Architecture Overview

For detailed information about the core components and data flow, see Data Acquisition Concepts and Data Loading Concepts .


Implementation Guide

1. Create Directory Structure

```
your_data_source/
├── __init__.py
├── stage.py            # Main composite stage
├── url_generation.py   # URL generation logic
├── download.py         # Download implementation
├── iterator.py         # File iteration logic
└── extract.py          # Data extraction logic (optional)
```

2. Build Core Components

URL Generator (url_generation.py)

```python
from dataclasses import dataclass

from nemo_curator.stages.text.download import URLGenerator

@dataclass
class CustomURLGenerator(URLGenerator):
    def generate_urls(self) -> list[str]:
        """Generate list of URLs to download."""
        # Your URL generation logic here
        return [
            "https://example.com/dataset1.zip",
            "https://example.com/dataset2.zip",
        ]
```
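
Rather than hard-coding a list, URL generators often enumerate shard URLs programmatically. A minimal standalone sketch (the base URL, shard naming scheme, and `generate_shard_urls` helper are illustrative, not part of the Curator API):

```python
def generate_shard_urls(base_url: str, num_shards: int) -> list[str]:
    # Build one URL per shard, zero-padded so lexical and numeric order agree
    return [f"{base_url}/shard-{i:05d}.zip" for i in range(num_shards)]

urls = generate_shard_urls("https://example.com/dataset", 3)
print(urls[0])  # https://example.com/dataset/shard-00000.zip
```

The same pattern works for date-partitioned sources by iterating over a date range instead of a shard index.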

Document Downloader (download.py)

```python
import urllib.request

from nemo_curator.stages.text.download import DocumentDownloader

class CustomDownloader(DocumentDownloader):
    def __init__(self, download_dir: str):
        super().__init__(download_dir=download_dir)

    def _get_output_filename(self, url: str) -> str:
        """Extract filename from URL."""
        return url.split("/")[-1]

    def _download_to_path(self, url: str, path: str) -> tuple[bool, str | None]:
        """Download file from URL to local path.

        Returns (success_bool, error_message).
        """
        try:
            # Custom download logic; a plain HTTP fetch is shown as an example
            urllib.request.urlretrieve(url, path)
            return True, None
        except Exception as e:
            return False, str(e)
```

Document Iterator (iterator.py)

```python
import json
from collections.abc import Iterator
from typing import Any

from nemo_curator.stages.text.download import DocumentIterator

class CustomIterator(DocumentIterator):
    def __init__(self, log_frequency: int = 1000):
        super().__init__()
        self._log_frequency = log_frequency

    def iterate(self, file_path: str) -> Iterator[dict[str, Any]]:
        """Iterate over records in a file."""
        # Custom iteration logic; shown here for a JSON Lines file
        with open(file_path, encoding="utf-8") as f:
            for line in f:
                record = json.loads(line)
                yield {
                    "content": record["content"],
                    "metadata": record.get("metadata", {}),
                    "id": record["id"],
                }

    def output_columns(self) -> list[str]:
        """Define output columns."""
        return ["content", "metadata", "id"]
```
Document Extractor (extract.py)

```python
import hashlib
from typing import Any

from nemo_curator.stages.text.download import DocumentExtractor

class CustomExtractor(DocumentExtractor):
    def __init__(self):
        super().__init__()

    def extract(self, record: dict[str, Any]) -> dict[str, Any] | None:
        """Transform raw record to final format."""
        # Skip invalid records
        if not record.get("content"):
            return None

        # Extract and clean text
        cleaned_text = self._clean_text(record["content"])

        # Generate unique ID if not present
        doc_id = record.get("id", self._generate_id(cleaned_text))

        return {
            "text": cleaned_text,
            "id": doc_id,
            "source": record.get("metadata", {}).get("source", "unknown"),
        }

    def input_columns(self) -> list[str]:
        return ["content", "metadata", "id"]

    def output_columns(self) -> list[str]:
        return ["text", "id", "source"]

    def _clean_text(self, text: str) -> str:
        """Clean and normalize text."""
        # Your text cleaning logic here
        return text.strip()

    def _generate_id(self, text: str) -> str:
        """Generate unique ID for text."""
        return hashlib.md5(text.encode()).hexdigest()[:16]
```
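
The cleaning and ID-generation logic above can be exercised on its own. A standalone sketch (the sample record and the free-function forms of `_clean_text` / `_generate_id` are illustrative):

```python
import hashlib

def clean_text(text: str) -> str:
    # Same normalization as the extractor's _clean_text
    return text.strip()

def generate_id(text: str) -> str:
    # Same content-hash ID as the extractor's _generate_id
    return hashlib.md5(text.encode()).hexdigest()[:16]

record = {"content": "  Hello world  ", "metadata": {"source": "example.com"}}
text = clean_text(record["content"])
doc = {
    "text": text,
    "id": generate_id(text),
    "source": record["metadata"].get("source", "unknown"),
}
print(doc["text"])  # Hello world
```

Because the ID is a hash of the cleaned text, re-running extraction on the same content yields the same ID, which helps with idempotent re-runs and deduplication downstream.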

3. Create Composite Stage (stage.py)

```python
from nemo_curator.stages.base import ProcessingStage
from nemo_curator.stages.text.download import DocumentDownloadExtractStage

from .download import CustomDownloader
from .extract import CustomExtractor
from .iterator import CustomIterator
from .url_generation import CustomURLGenerator

class CustomDataStage(DocumentDownloadExtractStage):
    """Custom data loading stage combining all components."""

    def __init__(
        self,
        download_dir: str = "./custom_downloads",
        url_limit: int | None = None,
        record_limit: int | None = None,
        add_filename_column: bool | str = True,
    ):
        self.url_generator = CustomURLGenerator()
        self.downloader = CustomDownloader(download_dir=download_dir)
        self.iterator = CustomIterator()
        self.extractor = CustomExtractor()

        # Initialize the parent composite stage
        super().__init__(
            url_generator=self.url_generator,
            downloader=self.downloader,
            iterator=self.iterator,
            extractor=self.extractor,  # Optional - remove if not needed
            url_limit=url_limit,
            record_limit=record_limit,
            add_filename_column=add_filename_column,
        )
        self.name = "custom_data"

    def decompose(self) -> list[ProcessingStage]:
        """Decompose this composite stage into its constituent stages."""
        return self.stages

    def get_description(self) -> str:
        """Get a description of this composite stage."""
        return "Custom data"
```

Usage Examples

Basic Pipeline

```python
from nemo_curator.core.client import RayClient
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.writer.jsonl import JsonlWriter

from your_data_source.stage import CustomDataStage

def main():
    # Initialize Ray client
    ray_client = RayClient()
    ray_client.start()

    # Create pipeline
    pipeline = Pipeline(
        name="custom_data_pipeline",
        description="Load and process custom dataset",
    )

    # Create custom data loading stage
    data_stage = CustomDataStage(...)
    pipeline.add_stage(data_stage)

    # Save the results to JSONL
    pipeline.add_stage(JsonlWriter(...))

    # Run pipeline
    print("Starting pipeline...")
    results = pipeline.run()

    # Stop Ray client
    ray_client.stop()

if __name__ == "__main__":
    main()
```

For executor options and configuration, refer to Execution Backends.


Parameters Reference

| Parameter | Type | Description | Default |
|---|---|---|---|
| `url_generator` | `URLGenerator` | Custom URL generation implementation | Required |
| `downloader` | `DocumentDownloader` | Custom download implementation | Required |
| `iterator` | `DocumentIterator` | Custom file iteration implementation | Required |
| `extractor` | `DocumentExtractor \| None` | Optional extraction/transformation step | `None` |
| `url_limit` | `int \| None` | Maximum number of URLs to process | `None` |
| `record_limit` | `int \| None` | Maximum records per file | `None` |
| `add_filename_column` | `bool \| str` | Add filename column to output; if a string, it is used as the column name (default name: `file_name`) | `True` |

Output Format

Processed data flows through the pipeline as DocumentBatch tasks containing Pandas DataFrames or PyArrow Tables:

Example Output Schema

```python
{
    "text": "This is the processed document text",
    "id": "unique-document-id",
    "source": "example.com",
    "file_name": "dataset1.jsonl",  # If add_filename_column=True (default column name)
}
```
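
Once the pipeline has written its output, each JSONL file holds one record per line in this schema. A standalone sketch of reading such a file back (the file contents and path are illustrative; a real run would read the files produced by `JsonlWriter`):

```python
import json
import os
import tempfile

# Write a hypothetical output file matching the schema above
rows = [
    {
        "text": "This is the processed document text",
        "id": "unique-document-id",
        "source": "example.com",
        "file_name": "dataset1.jsonl",
    },
]
path = os.path.join(tempfile.mkdtemp(), "part_0.jsonl")
with open(path, "w", encoding="utf-8") as f:
    for row in rows:
        f.write(json.dumps(row) + "\n")

# Read it back: one JSON object per line
with open(path, encoding="utf-8") as f:
    loaded = [json.loads(line) for line in f]
print(loaded[0]["source"])  # example.com
```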