Custom Data Loading#
Load and process your own custom datasets using NeMo Curator’s extensible framework. This guide explains how to implement custom data loaders that integrate with NeMo Curator’s distributed processing capabilities.
How it Works#
NeMo Curator’s custom data loading process:
1. Downloads data from your source using a custom DocumentDownloader
2. Iterates through the downloaded data using a custom DocumentIterator
3. Extracts text using a custom DocumentExtractor
4. Outputs the processed data in JSONL or Parquet format
Usage#
Here’s how to implement and use custom data loaders:
```python
from nemo_curator import get_client
from nemo_curator.download import download_and_extract
from my_custom_module import MyCustomDownloader, MyCustomIterator, MyCustomExtractor


def main():
    # Initialize a Dask client
    client = get_client(cluster_type="cpu")

    # Create instances of your custom components
    downloader = MyCustomDownloader()
    iterator = MyCustomIterator()
    extractor = MyCustomExtractor()

    # Use them with NeMo Curator's framework
    dataset = download_and_extract(
        urls=[url1, url2, url3],
        output_paths=[output_path1, output_path2, output_path3],
        downloader=downloader,
        iterator=iterator,
        extractor=extractor,
        output_format={"text": str, "id": str},
        output_type="jsonl",
        keep_raw_download=False,
        force_download=False,
        filename_col="file_name",
        record_limit=None,
    )

    # Process the dataset
    dataset.to_json(output_path="/output/folder", write_to_filename=True)


if __name__ == "__main__":
    main()
```
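To store the processed dataset as Parquet instead, set output_type="parquet" in the call above and write with to_parquet. This is a minimal sketch that assumes DocumentDataset.to_parquet accepts the same arguments as the to_json call shown here:

```python
# Assumption: to_parquet mirrors the to_json call used above.
dataset.to_parquet(output_path="/output/folder", write_to_filename=True)
```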
Alternatively, you can run the same pipeline from the command line. First, create a configuration YAML file:
```yaml
# custom_config.yaml
download_module: my_custom_module.MyCustomDownloader
download_params:
  param1: value1
  param2: value2
iterator_module: my_custom_module.MyCustomIterator
iterator_params:
  param3: value3
extract_module: my_custom_module.MyCustomExtractor
extract_params:
  param4: value4
```
Then run the command-line tool:
```bash
# Note: Use the actual script name from nemo_curator/scripts/
python -m nemo_curator.scripts.download_and_extract \
    --input-url-file=./my_urls.txt \
    --builder-config-file=./custom_config.yaml \
    --output-json-dir=/output/folder
```
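The --input-url-file argument points to a plain-text file listing one source per line. A hypothetical my_urls.txt (placeholder URLs) might look like:

```text
https://example.com/data/dump-001.jsonl.gz
https://example.com/data/dump-002.jsonl.gz
https://example.com/data/dump-003.jsonl.gz
```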
Parameters#
| Parameter | Type | Description | Default |
|---|---|---|---|
| urls | List[str] | List of URLs or paths to download from | Required |
| output_paths | List[str] | List of paths where downloaded files will be stored | Required |
| downloader | DocumentDownloader | Custom downloader implementation | Required |
| iterator | DocumentIterator | Custom iterator implementation | Required |
| extractor | DocumentExtractor | Custom extractor implementation | Required |
| output_format | Dict[str, type] | Schema for output data | Required |
| output_type | Literal["jsonl", "parquet"] | Output file format | "jsonl" |
| keep_raw_download | bool | Whether to retain raw downloaded files after extraction | False |
| force_download | bool | Whether to re-download and re-extract existing files | False |
| filename_col | str | Name of the column for storing filenames in the dataset | "file_name" |
| record_limit | int \| None | Maximum number of records to extract from each file | None |
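For quick smoke tests during development, it can be convenient to cap extraction per file and force a clean re-run. The call below is a hypothetical variation of the earlier example that exercises those parameters:

```python
# Hypothetical smoke-test run: re-download and re-extract everything,
# but keep at most 100 records from each file.
dataset = download_and_extract(
    urls=[url1],
    output_paths=[output_path1],
    downloader=downloader,
    iterator=iterator,
    extractor=extractor,
    output_format={"text": str, "id": str},
    force_download=True,
    record_limit=100,
)
```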
Output Format#
The processed data can be stored in either JSONL or Parquet format:
JSONL Format#
```json
{
    "text": "This is a sample text document",
    "id": "unique-id-123",
    "metadata": {
        "source": "example",
        "timestamp": "2024-03-21"
    }
}
```
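Because each JSONL file is newline-delimited JSON, you can read the output back for inspection with standard tooling. A minimal sketch using Dask (an assumption about your inspection workflow, not part of the curation pipeline itself):

```python
import dask.dataframe as dd

# Read every JSONL shard in the output folder and inspect the schema.
df = dd.read_json("/output/folder/*.jsonl", lines=True)
print(df.columns)
print(df.head())
```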
Parquet Format#
Parquet files maintain the same schema as JSONL files but provide:
- Efficient compression
- Fast query performance
- Column-based operations (see the sketch after this list)
- Reduced storage costs
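Column pruning is where Parquet pays off: you can load only the fields you need. A short sketch, again assuming Dask is used for inspection:

```python
import dask.dataframe as dd

# Read only the "text" column instead of the full records.
texts = dd.read_parquet("/output/folder", columns=["text"])
print(texts.head())
```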
Implementation Guide#
1. Create Custom Downloader#
```python
from nemo_curator.download.doc_builder import DocumentDownloader


class MyCustomDownloader(DocumentDownloader):
    def download(self, url):
        """Download data from url and return the path to the downloaded file"""
        # Implement download logic
        return "/path/to/downloaded/file"
```
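For plain HTTP sources, the download step can be as simple as streaming the response to a local file. The class below is one possible implementation, not part of NeMo Curator; it assumes HTTP(S) URLs and a download directory supplied to the constructor:

```python
import hashlib
import os

import requests

from nemo_curator.download.doc_builder import DocumentDownloader


class HTTPDownloader(DocumentDownloader):
    """Hypothetical downloader that streams HTTP responses to disk."""

    def __init__(self, download_dir):
        super().__init__()
        self._download_dir = download_dir
        os.makedirs(download_dir, exist_ok=True)

    def download(self, url):
        # Derive a stable local filename from the URL.
        filename = hashlib.md5(url.encode("utf-8")).hexdigest()
        output_file = os.path.join(self._download_dir, filename)

        # Skip the network call if the file is already cached locally.
        if os.path.exists(output_file):
            return output_file

        with requests.get(url, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(output_file, "wb") as f:
                for chunk in response.iter_content(chunk_size=1 << 20):
                    f.write(chunk)
        return output_file
```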
2. Create Custom Iterator#
```python
from nemo_curator.download.doc_builder import DocumentIterator


class MyCustomIterator(DocumentIterator):
    def iterate(self, file_path):
        """Iterate through documents in the downloaded file"""
        for doc in my_iterator_logic(file_path):
            metadata = {"url": doc.get("url", "")}
            content = doc.get("content", "")
            yield metadata, content
```
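If the downloaded file happens to be newline-delimited JSON, the iterator can parse one record per line. This is a sketch under that assumption; real-world formats (WARC archives, XML dumps, tarballs) need their own parsing logic:

```python
import json

from nemo_curator.download.doc_builder import DocumentIterator


class JSONLinesIterator(DocumentIterator):
    """Hypothetical iterator for newline-delimited JSON downloads."""

    def iterate(self, file_path):
        with open(file_path, "r", encoding="utf-8") as f:
            for line_number, line in enumerate(f):
                line = line.strip()
                if not line:
                    continue
                record = json.loads(line)
                # Yield (metadata, content) pairs, mirroring the pattern above.
                metadata = {"url": record.get("url", ""), "line": line_number}
                content = record.get("content", "")
                yield metadata, content
```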
3. Create Custom Extractor#
```python
from nemo_curator.download.doc_builder import DocumentExtractor


class MyCustomExtractor(DocumentExtractor):
    def extract(self, content):
        """Extract text from content and return a dictionary"""
        # Your extraction logic here
        extracted_text = process_content(content)
        unique_id = generate_unique_id(content)
        return {
            "text": extracted_text,
            "id": unique_id,
            # Add any other fields as needed
        }
```
Enhancing Custom Extraction
When implementing custom extractors, consider adding robust error handling and metadata extraction to improve the quality of your processed data. You can also implement content filtering and validation logic within your extractor.
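As one example of that advice, the hypothetical extractor below strips HTML tags, validates the result, and returns None for records it cannot use. Whether a None return actually skips the record depends on how your pipeline handles it, so treat this purely as a sketch:

```python
import hashlib
import re

from nemo_curator.download.doc_builder import DocumentExtractor


class SafeHTMLExtractor(DocumentExtractor):
    """Hypothetical extractor with basic cleaning and validation."""

    def __init__(self, min_length=50):
        super().__init__()
        self._min_length = min_length

    def extract(self, content):
        try:
            # Naive tag stripping; a production implementation would use a real HTML parser.
            text = re.sub(r"<[^>]+>", " ", content)
            text = re.sub(r"\s+", " ", text).strip()
        except Exception:
            # Assumption: the surrounding pipeline drops records that return None.
            return None

        # Filter out records that are too short to be useful.
        if len(text) < self._min_length:
            return None

        return {
            "text": text,
            "id": hashlib.md5(text.encode("utf-8")).hexdigest(),
        }
```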
Best Practices#
- Error Handling: Implement robust error handling for corrupt files and network issues
- Logging: Use Python's logging module for process visibility and debugging (see the sketch after this list)
- Metadata: Include useful metadata in extracted documents for downstream processing
- Chunking: Consider chunking large files for efficient distributed processing
- Caching: Implement caching to avoid re-downloading or re-processing data
- Parameter Validation: Validate input parameters in your custom classes
- Memory Management: Be mindful of memory usage when processing large files
- Type Annotations: Use proper type hints to improve code clarity and IDE support
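To make the error-handling and logging points concrete, here is a hedged sketch that wraps an existing downloader with Python's logging module. The wrapper class and its names are illustrative only, not part of NeMo Curator's API:

```python
import logging
import os

from nemo_curator.download.doc_builder import DocumentDownloader

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


class LoggingDownloader(DocumentDownloader):
    """Hypothetical wrapper that adds logging around another downloader."""

    def __init__(self, inner_downloader):
        super().__init__()
        self._inner = inner_downloader

    def download(self, url):
        logger.info("Downloading %s", url)
        try:
            path = self._inner.download(url)
        except Exception:
            logger.exception("Failed to download %s", url)
            raise
        logger.info("Finished %s -> %s (%d bytes)", url, path, os.path.getsize(path))
        return path
```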