Resumable Processing#
This guide explains strategies to make large-scale data operations resumable.
Why Resumable Processing Matters#
Processing large datasets can be interrupted by:
System timeouts
Hardware failures
Network issues
Resource constraints
Scheduled maintenance
NeMo Curator provides built-in functionality for resuming operations from where they left off.
How it Works#
The resumption approach works by:
Examining filenames in the input directory using get_all_file_paths_under()
Comparing them with filenames in the output directory
Identifying unprocessed files by comparing file counts or specific file lists (see the sketch after this list)
Rerunning the pipeline on remaining files
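A minimal sketch of that comparison step follows. It assumes that output files keep the same base filenames as their inputs, which may not hold for every writer configuration:
import os

from nemo_curator.utils.file_utils import get_all_file_paths_under

# List input files and the base names of files already written to the output directory
input_files = get_all_file_paths_under(
    "input_directory/", recurse_subdirectories=True, keep_extensions=[".jsonl"]
)
output_names = {
    os.path.basename(path)
    for path in get_all_file_paths_under(
        "output_directory/", recurse_subdirectories=True, keep_extensions=[".jsonl"]
    )
}

# Any input file without a matching output file still needs processing
remaining_files = [f for f in input_files if os.path.basename(f) not in output_names]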
This approach works best when you:
Use consistent directory structures for input and output
Process files in batches using files_per_partition to manage memory usage
Create checkpoints by writing intermediate results to disk (a sketch follows this list)
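As an illustration of the checkpointing idea, the following sketch splits a run into two pipelines that communicate through an intermediate directory. The directory names and the commented-out processing stages are placeholders, not part of the NeMo Curator API:
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import JsonlWriter

# First pipeline: write intermediate results to a checkpoint directory
first_pass = Pipeline(name="first_pass")
first_pass.add_stage(JsonlReader(file_paths="input_directory/", fields=["text", "id"]))
# first_pass.add_stage(your_first_processing_stage)
first_pass.add_stage(JsonlWriter(path="intermediate_directory/"))
first_pass.run()

# Second pipeline: resume from the checkpoint instead of the raw input
second_pass = Pipeline(name="second_pass")
second_pass.add_stage(JsonlReader(file_paths="intermediate_directory/", fields=["text", "id"]))
# second_pass.add_stage(your_second_processing_stage)
second_pass.add_stage(JsonlWriter(path="output_directory/"))
second_pass.run()
If a later run is interrupted, it can restart from intermediate_directory/ rather than repeating the first pass.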
Practical Patterns for Resumable Processing#
1. Process remaining files using directory comparison#
Use file listing utilities to identify unprocessed files and process them directly:
from nemo_curator.utils.file_utils import get_all_file_paths_under
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import JsonlWriter
# Get all input files
input_files = get_all_file_paths_under(
    "input_directory/",
    recurse_subdirectories=True,
    keep_extensions=[".jsonl"]
)

# Get already processed output files
output_files = get_all_file_paths_under(
    "output_directory/",
    recurse_subdirectories=True,
    keep_extensions=[".jsonl"]
)
# Simple approach: if the output directory has fewer files than the input,
# rerun the pipeline over the full input set
if len(output_files) < len(input_files):
    pipeline = Pipeline(name="resumable_processing")

    # Read input files
    reader = JsonlReader(file_paths=input_files, fields=["text", "id"])
    pipeline.add_stage(reader)

    # Add your processing stages here
    # pipeline.add_stage(your_processing_stage)

    # Write results
    writer = JsonlWriter(path="output_directory/")
    pipeline.add_stage(writer)

    # Execute pipeline
    pipeline.run()
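This count-based check is deliberately coarse: when any outputs are missing, it reruns the pipeline over every input file. To resume only the files that were never written, you can instead pass the remaining_files list from the earlier comparison sketch to the reader (remaining_files is the hypothetical list built above, not a NeMo Curator built-in):
# Read only the files that have no corresponding output yet
reader = JsonlReader(file_paths=remaining_files, fields=["text", "id"])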
2. Batch processing with file partitioning#
Control memory usage and enable checkpoint creation by using NeMo Curator’s built-in file partitioning:
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import JsonlWriter
# Process files in smaller batches using files_per_partition
pipeline = Pipeline(name="batch_processing")
# JsonlReader automatically handles file partitioning
reader = JsonlReader(
    file_paths="input_directory/",
    files_per_partition=64,  # Process 64 files at a time
    fields=["text", "id"]
)
pipeline.add_stage(reader)
# Add your processing stages here
# pipeline.add_stage(your_processing_stage)
# Write results
writer = JsonlWriter(path="output_directory/")
pipeline.add_stage(writer)
# Execute pipeline - processes files in batches automatically
pipeline.run()