
Resumable Processing


This guide explains strategies to make large-scale data operations resumable.

Why Resumable Processing Matters

Long-running jobs over large datasets can be interrupted by:

  • System timeouts
  • Hardware failures
  • Network issues
  • Resource constraints
  • Scheduled maintenance

NeMo Curator provides built-in functionality for resuming operations from where they left off.

How It Works

The resumption approach works by:

  1. Examining filenames in the input directory using get_all_file_paths_under()
  2. Comparing them with filenames in the output directory
  3. Identifying unprocessed files by comparing file counts or specific file lists (as sketched below)
  4. Rerunning the pipeline on the remaining files
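
For example, steps 2 and 3 reduce to a set difference over filenames. A minimal sketch, assuming output files keep the same basenames as their inputs:

import os

from nemo_curator.utils.file_utils import get_all_file_paths_under

input_files = get_all_file_paths_under("input_directory/", keep_extensions=[".jsonl"])
output_files = get_all_file_paths_under("output_directory/", keep_extensions=[".jsonl"])

# An input file is unprocessed if no output file shares its basename
processed = {os.path.basename(f) for f in output_files}
remaining = [f for f in input_files if os.path.basename(f) not in processed]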

This approach works best when you:

  • Use consistent directory structures for input and output
  • Process files in batches using files_per_partition to manage memory usage
  • Create checkpoints by writing intermediate results to disk (see the sketch after this list)
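
For the checkpointing pattern, a long pipeline can be split into phases that each persist their results; if a later phase fails, the earlier phases never need to rerun. A minimal sketch, where your_expensive_stage, your_remaining_stage, and checkpoint_directory/ are hypothetical placeholders:

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import JsonlWriter

# Phase 1: run the expensive part of the pipeline and persist its output
phase1 = Pipeline(name="phase1_checkpoint")
phase1.add_stage(JsonlReader(file_paths="input_directory/", fields=["text", "id"]))
# phase1.add_stage(your_expensive_stage)  # placeholder for your own stage
phase1.add_stage(JsonlWriter(path="checkpoint_directory/"))
phase1.run()

# Phase 2: resume from the checkpoint instead of the raw inputs
phase2 = Pipeline(name="phase2_from_checkpoint")
phase2.add_stage(JsonlReader(file_paths="checkpoint_directory/", fields=["text", "id"]))
# phase2.add_stage(your_remaining_stage)  # placeholder for later stages
phase2.add_stage(JsonlWriter(path="output_directory/"))
phase2.run()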

Practical Patterns for Resumable Processing

1. Process remaining files using directory comparison

Use file listing utilities to identify unprocessed files and process them directly:

import os

from nemo_curator.utils.file_utils import get_all_file_paths_under
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import JsonlWriter

# Get all input files
input_files = get_all_file_paths_under(
    "input_directory/",
    recurse_subdirectories=True,
    keep_extensions=[".jsonl"],
)

# Get already processed output files
output_files = get_all_file_paths_under(
    "output_directory/",
    recurse_subdirectories=True,
    keep_extensions=[".jsonl"],
)

# Identify unprocessed inputs by filename: treat an input as processed
# when a file with the same basename already exists in the output directory
processed = {os.path.basename(f) for f in output_files}
remaining_files = [f for f in input_files if os.path.basename(f) not in processed]

if remaining_files:
    pipeline = Pipeline(name="resumable_processing")

    # Read only the files that still need processing
    reader = JsonlReader(file_paths=remaining_files, fields=["text", "id"])
    pipeline.add_stage(reader)

    # Add your processing stages here
    # pipeline.add_stage(your_processing_stage)

    # Write results
    writer = JsonlWriter(path="output_directory/")
    pipeline.add_stage(writer)

    # Execute pipeline on the remaining files only
    pipeline.run()
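
Note that this comparison assumes the writer preserves input basenames in its output. If your output naming differs, compare on whatever stable key your pipeline carries (for example, a document or shard ID) rather than the raw filename.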

2. Batch processing with file partitioning

Control memory usage and enable checkpoint creation by using NeMo Curator’s built-in file partitioning:

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import JsonlWriter

# Process files in smaller batches using files_per_partition
pipeline = Pipeline(name="batch_processing")

# JsonlReader automatically handles file partitioning
reader = JsonlReader(
    file_paths="input_directory/",
    files_per_partition=64,  # Process 64 files at a time
    fields=["text", "id"],
)
pipeline.add_stage(reader)

# Add your processing stages here
# pipeline.add_stage(your_processing_stage)

# Write results
writer = JsonlWriter(path="output_directory/")
pipeline.add_stage(writer)

# Execute pipeline - processes files in batches automatically
pipeline.run()
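
Batch processing pairs naturally with pattern 1: when intermediate results reach disk batch by batch, an interrupted run can be resumed by recomputing the list of unprocessed inputs and rerunning the pipeline over only those files.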