Read Existing Data#
Use Curator’s JsonlReader and ParquetReader to read existing datasets into a pipeline, then optionally add processing stages.
Example: Read JSONL and Filter
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.modules import ScoreFilter
from nemo_curator.stages.text.filters import WordCountFilter
# Create pipeline for processing existing JSONL files
pipeline = Pipeline(name="jsonl_data_processing")
# Read JSONL files
reader = JsonlReader(
    file_paths="/path/to/data/*.jsonl",
    files_per_partition=4,
    fields=["text", "url"]  # Only read specific columns
)
pipeline.add_stage(reader)
# Add filtering stage
word_filter = ScoreFilter(
    filter_obj=WordCountFilter(min_words=50, max_words=1000),
    text_field="text"
)
pipeline.add_stage(word_filter)
# Execute pipeline
results = pipeline.run()
Example: Read Parquet and Filter
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import ParquetReader
from nemo_curator.stages.text.modules import ScoreFilter
from nemo_curator.stages.text.filters import WordCountFilter
# Create pipeline for processing existing Parquet files
pipeline = Pipeline(name="parquet_data_processing")
# Read Parquet files with PyArrow engine
reader = ParquetReader(
    file_paths="/path/to/data",
    files_per_partition=4,
    fields=["text", "metadata"]  # Only read specific columns
)
pipeline.add_stage(reader)
# Add filtering stage
word_filter = ScoreFilter(
    filter_obj=WordCountFilter(min_words=50, max_words=1000),
    text_field="text"
)
pipeline.add_stage(word_filter)
# Execute pipeline
results = pipeline.run()
Reader Configuration#
Common Parameters#
Both JsonlReader and ParquetReader support these configuration options:
| Parameter | Type | Description | Default |
|---|---|---|---|
| file_paths | str \| list[str] | File paths or glob patterns to read | Required |
| files_per_partition | int \| None | Number of files per partition | None |
| blocksize | int \| str \| None | Target partition size (e.g., "128MB") | None |
| fields | list[str] \| None | Column names to read (column selection) | None (all columns) |
| read_kwargs | dict[str, Any] \| None | Extra arguments for the underlying reader | None |
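To make the table concrete, here is a minimal sketch of a JsonlReader with each common parameter set and annotated. The paths and values are placeholders, and which read_kwargs keys the underlying reader honors depends on your Curator version, so verify them before relying on this.

from nemo_curator.stages.text.io.reader import JsonlReader

# Sketch: every common parameter from the table above, with placeholder values
reader = JsonlReader(
    file_paths=["/path/to/data/part-000.jsonl", "/path/to/shards/*.jsonl"],  # paths or glob patterns
    files_per_partition=4,   # group this many files into each partition
    blocksize=None,          # or a size target such as "128MB" instead of files_per_partition
    fields=["text", "url"],  # column selection; None reads all columns
    read_kwargs=None,        # extra keyword arguments passed to the underlying reader
)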
Parquet-Specific Features#
ParquetReader provides these optimizations:

- PyArrow Engine: Uses the pyarrow engine by default for better performance
- Storage Options: Supports cloud storage via storage_options in read_kwargs (see the sketch below)
- Schema Handling: Automatic schema inference and validation
- Columnar Efficiency: Optimized for reading specific columns
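The storage options can be sketched as follows for S3: fsspec-style storage_options are passed through read_kwargs to the underlying filesystem. The bucket, prefix, and option keys here are placeholder assumptions (s3fs conventions), so adjust them to your storage backend.

from nemo_curator.stages.text.io.reader import ParquetReader

# Read Parquet directly from cloud storage; credentials and options go to the filesystem layer
reader = ParquetReader(
    file_paths="s3://my-bucket/curated-data/",  # placeholder bucket and prefix
    fields=["text", "metadata"],                # column selection keeps reads cheap
    read_kwargs={
        "storage_options": {"anon": False},     # assumed s3fs-style options; adjust per backend
    },
)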
Performance Tips#
- Use the fields parameter to read only the required columns for better performance
- Set files_per_partition based on your cluster size and memory constraints
- For cloud storage, configure storage_options in read_kwargs
- Use blocksize for fine-grained control over partition sizes (see the sketch after this list)
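As a rough illustration of the partitioning tips, the sketch below contrasts the two knobs: a fixed files_per_partition when the input is many small files, and a blocksize target when files are large. The specific values are illustrative assumptions, not Curator recommendations.

from nemo_curator.stages.text.io.reader import JsonlReader

# Many small files: group several files into each partition
small_files_reader = JsonlReader(
    file_paths="/path/to/many-small-files/*.jsonl",
    files_per_partition=16,  # illustrative value; tune to cluster size and memory
    fields=["text"],
)

# Fewer, larger files: target a partition size in bytes instead
large_files_reader = JsonlReader(
    file_paths="/path/to/large-files/*.jsonl",
    blocksize="256MB",       # illustrative target partition size
    fields=["text"],
)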
Output Integration#
Both readers produce DocumentBatch tasks that integrate seamlessly with:

- Processing Stages: Apply filters, transformations, and quality checks
- Writer Stages: Export to JSONL, Parquet, or other formats (see the sketch below)
- Analysis Tools: Convert to Pandas/PyArrow for inspection and debugging
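As an end-to-end sketch of that integration, the example below appends a writer stage after the filter and then inspects the results as pandas data. The ParquetWriter import path and arguments, the shape of the value returned by pipeline.run(), and the to_pandas() helper are assumptions here; check the writer and task APIs shipped with your Curator version.

from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.text.io.reader import JsonlReader
from nemo_curator.stages.text.io.writer import ParquetWriter  # assumed import path
from nemo_curator.stages.text.modules import ScoreFilter
from nemo_curator.stages.text.filters import WordCountFilter

pipeline = Pipeline(name="read_filter_write")
pipeline.add_stage(JsonlReader(file_paths="/path/to/data/*.jsonl", fields=["text"]))
pipeline.add_stage(
    ScoreFilter(filter_obj=WordCountFilter(min_words=50, max_words=1000), text_field="text")
)
pipeline.add_stage(ParquetWriter(path="/path/to/output"))  # assumed writer signature

results = pipeline.run()

# Inspect a few surviving documents; assumes results is a list of DocumentBatch tasks
for batch in results:
    print(batch.to_pandas().head())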