
Interleaved IO


Read and write interleaved image-text data between WebDataset tar shards and Parquet. All four combinations work out of the box:

WDS tar ⇄ InterleavedBatch ⇄ Parquet

Understanding the Round-Trip

Storage Formats

Interleaved samples can live in two on-disk formats; in memory both materialize as InterleavedBatch:

| Format | Layout | When to Use |
| --- | --- | --- |
| WebDataset tar shards | One tar per shard, one file per item; sample_id encoded in the member key | Streaming reads, S3-friendly, MINT-1T-compatible |
| Parquet rows | One row per item (text/image/metadata), grouped by sample_id | Random access, column projection, ~5× faster reads in benchmarks |

Choose based on the consumer: training loops that stream large datasets often prefer WDS tars; analytics or sample-level inspection prefers Parquet.

When to Convert

A common workflow is “curate once in Parquet, train from tars”:

  1. Curate in Parquet — fast reads, easy filtering, column projection.
  2. Convert to WDS — once curation is done, write the final dataset as MINT-1T-style tars for the training loop.

The IO stages on this page support that workflow without intermediate copies.

Readers

InterleavedParquetReader

Reads Parquet files into InterleavedBatch. Each row maps to one item (text, image, or metadata) with a sample_id that groups items into samples.

```python
from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader

reader = InterleavedParquetReader(
    file_paths="s3://bucket/interleaved/*.parquet",
    fields=("source_url", "license"),   # passthrough columns kept alongside reserved ones
    max_batch_bytes=512 * 1024 * 1024,  # 512 MiB per output batch
)
```

Key behaviors:

  • fields= passthrough: keep additional columns alongside the reserved interleaved schema. Missing columns are null-filled in a single pass.
  • Push-down column projection: the reader uses pq.read_schema() per file and asks Parquet for only the columns it needs.
  • max_batch_bytes splitting: large file groups are split into multiple batches by accumulated size; per-split lineage is preserved on the output source_files field.
  • Schema control: pass schema= for strict alignment to a target Arrow schema, or schema_overrides= for partial type overrides on top of INTERLEAVED_SCHEMA. Specifying both warns and prefers schema=.
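For instance, a minimal sketch of the partial-override path; the column name and Arrow type below are purely illustrative, since valid override targets depend on the columns defined in INTERLEAVED_SCHEMA:

```python
import pyarrow as pa

from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader

# Illustrative override: widen one column's type while keeping the rest of
# INTERLEAVED_SCHEMA untouched. The "texts" target is an assumption here.
reader = InterleavedParquetReader(
    file_paths="s3://bucket/interleaved/*.parquet",
    schema_overrides={"texts": pa.large_string()},
)
```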

InterleavedWebdatasetReader

Reads MINT-1T-style WebDataset tar shards. Same fields= and max_batch_bytes semantics as the Parquet reader.

```python
from nemo_curator.stages.interleaved.io.reader import InterleavedWebdatasetReader

reader = InterleavedWebdatasetReader(
    file_paths="s3://bucket/mint1t/*.tar",
    fields=("source_url",),
    sample_id_field="sample_id",
    image_extensions=["png", "jpg", "jpeg"],
    json_extensions=["json"],
    texts_field="texts",
    images_field="images",
)
```

Configurable member extensions and field names (image_extensions, json_extensions, texts_field, images_field, image_member_field) let the reader handle non-standard tar layouts.

Reader Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| file_paths | str \| list[str] | Required | Glob, list, or single path. Supports local, S3, GCS via fsspec. |
| files_per_partition | int \| None | None | Group input files into partitions of this size. |
| blocksize | int \| str \| None | None | Optional bytes-per-partition target (e.g., "512MiB"). |
| max_batch_bytes | int \| None | None | Split large partitions into multiple InterleavedBatch outputs; None means no splitting. |
| fields | tuple[str, …] \| None | None | Passthrough columns to keep alongside the reserved INTERLEAVED_SCHEMA. |
| read_kwargs | dict | {} | Forwarded to the underlying read (pyarrow.parquet.read_table, etc.). |
| schema | pa.Schema \| None | None | Strict alignment target. Reserved columns get canonical types; passthrough columns surface overflow. |
| schema_overrides | dict[str, pa.DataType] \| None | None | Partial type overrides on top of INTERLEAVED_SCHEMA. Warns when both schema= and schema_overrides= are set. |
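As a hedged sketch of the partitioning knobs working together (the concrete values are illustrative, not recommendations):

```python
from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader

# Group input files into ~512 MiB partitions, then cap each emitted batch at 256 MiB.
reader = InterleavedParquetReader(
    file_paths="s3://bucket/interleaved/*.parquet",
    blocksize="512MiB",                 # bytes-per-partition target
    max_batch_bytes=256 * 1024 * 1024,  # split oversized partitions into multiple batches
)
```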

Writers

InterleavedParquetWriter

Writes InterleavedBatch to Parquet. Inherits schema= / schema_overrides= and the materialization-error policy (see below).
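A minimal usage sketch; the import path is an assumption that mirrors the WebDataset writer's module layout, and output_dir follows the conversion example later on this page:

```python
# Import path assumed by analogy with the WebDataset writer module layout.
from nemo_curator.stages.interleaved.io.writers.parquet import InterleavedParquetWriter

writer = InterleavedParquetWriter(
    output_dir="./parquet_out",
)
pipeline.add_stage(writer)
```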

InterleavedWebdatasetWriterStage

Writes InterleavedBatch to MINT-1T-style WebDataset tar shards.

```python
from nemo_curator.stages.interleaved.io.writers.webdataset import InterleavedWebdatasetWriterStage

writer = InterleavedWebdatasetWriterStage(
    output_dir="./out",
    file_extension="tar",
)
pipeline.add_stage(writer)
```

Implementation notes:

  • urllib.parse.quote(sample_id, safe="") is used as the tar member key — injective and roundtrip-safe with sample_id_field="sample_id" on the matching reader.
  • Single-pass groupby: rows are grouped by sample_id once instead of filtered per sample (O(n) instead of O(n × m)).
  • Sparse positions: gaps in the position field are preserved as None entries; the WDS reader skips them on the way back.
  • Supported modalities: metadata, text, and image. Any other modality raises ValueError at write time.
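A quick stdlib illustration of why quoting with safe="" makes the member key injective and round-trip safe (plain urllib, independent of the writer):

```python
from urllib.parse import quote, unquote

# safe="" percent-encodes every reserved character, including "/", so two
# distinct sample_ids can never map to the same tar member key.
key = quote("s3://bucket/doc 0001#page=2", safe="")
assert unquote(key) == "s3://bucket/doc 0001#page=2"  # lossless round trip
```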

Schema Utilities

utils/schema.py ships shared helpers used by every Arrow-based reader and writer.

reconcile_schema()

Apply canonical types to reserved columns; preserve passthrough column types as-is; avoid unsafe large↔small downcasts; unwrap Parquet dictionary encoding from passthrough columns.

align_table()

Pad, reorder, and cast an Arrow table to a target schema. Reserved columns use safe=False for predictable casts; passthrough columns use safe=True so overflow surfaces rather than silently corrupting data. Does not re-reconcile the user-provided target.
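The safe/unsafe cast distinction corresponds to plain pyarrow behavior; a generic illustration of the two modes (not the helper itself):

```python
import pyarrow as pa
import pyarrow.compute as pc

big = pa.array([1, 2**40], type=pa.int64())

# safe=False (reserved-column style): the cast always succeeds, values may wrap.
pc.cast(big, pa.int32(), safe=False)

# safe=True (passthrough-column style): overflow raises instead of silently corrupting.
try:
    pc.cast(big, pa.int32(), safe=True)
except pa.ArrowInvalid as err:
    print(err)
```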

resolve_schema()

Merge schema= and schema_overrides= arguments with the canonical INTERLEAVED_SCHEMA; warn when both are provided; return None when neither is set. Used internally by readers and writers.

Materialization Error Policy

Writers expose on_materialize_error=, controlling what happens when a fetch step (or upstream stage) sets an error on a row:

| Value | Behavior | Use Case |
| --- | --- | --- |
| "error" | Raise immediately (default for strict pipelines). | Production pipelines where an error indicates a real problem. |
| "warn" | Emit a warning and keep the row. | Development; want visibility without halting. |
| "drop_row" | Drop the offending row but keep the rest of the sample. | Resilient pipelines where one bad item shouldn't kill a sample. |
| "drop_sample" | Drop the entire sample if any row in it errored. | Strict cleanliness; one bad row taints the whole sample. |

The policy is applied after fetch, so it covers errors raised by the materialization step itself and by upstream stages.
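For example, a hedged sketch of loosening the policy during development; this assumes the keyword is passed directly to the writer constructor, as the on_materialize_error= signature above suggests:

```python
from nemo_curator.stages.interleaved.io.writers.webdataset import InterleavedWebdatasetWriterStage

# "warn" keeps every row but emits a warning for each materialization error.
writer = InterleavedWebdatasetWriterStage(
    output_dir="./out",
    on_materialize_error="warn",
)
```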

Mixed-Backend Path Handling

The internal helper _build_global_range_index() groups paths by filesystem object so a single batch that mixes S3 and local paths no longer fails silently — each backend’s range queries run against the right filesystem. This works automatically; you don’t need to configure it.

Performance

Benchmarks on 80 NVMe shards of MINT-1T PDF data (6,818 samples, 9.9 GB WDS / 6.0 GB Parquet) with an aspect-ratio filter applied:

| Path | Wall-clock | Samples/sec | Notes |
| --- | --- | --- | --- |
| WDS → Parquet | 76.8 s | 88.8 | Tar parsing dominates |
| WDS → WDS | 75.4 s | 90.4 | Tar parsing dominates |
| Parquet → Parquet | 15.7 s | 435.0 | Parquet column projection wins |
| Parquet → WDS | 18.5 s | 368.4 | Parquet read + tar write |

Parquet-sourced paths are roughly 5× faster than WDS-sourced paths because tar parsing dominates the WDS read cost.

Complete IO Pipeline Examples

Curate-Once-in-Parquet, Train-from-Tars

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader
from nemo_curator.stages.interleaved.io.writers.webdataset import (
    InterleavedWebdatasetWriterStage,
)
from nemo_curator.stages.interleaved.filter.blur_filter import InterleavedBlurFilterStage

pipeline = Pipeline(name="parquet_to_wds")

# 1. Read curated Parquet (faster random access during filtering)
pipeline.add_stage(
    InterleavedParquetReader(file_paths="s3://bucket/curated/*.parquet")
)

# 2. Filter
pipeline.add_stage(InterleavedBlurFilterStage(score_threshold=100.0))

# 3. Write final WDS tars (training-loop friendly)
pipeline.add_stage(InterleavedWebdatasetWriterStage(output_dir="./final_tars"))

executor = XennaExecutor()
pipeline.run(executor)
```

Format Conversion (No Filtering)

This snippet reuses the Pipeline and executor scaffolding from the previous example; the Parquet writer's import path is an assumption mirroring the WebDataset writer layout.

```python
# WDS → Parquet for analytics / ad-hoc inspection
from nemo_curator.stages.interleaved.io.reader import InterleavedWebdatasetReader
from nemo_curator.stages.interleaved.io.writers.parquet import InterleavedParquetWriter  # path assumed

pipeline.add_stage(
    InterleavedWebdatasetReader(file_paths="s3://bucket/mint1t/*.tar")
)
pipeline.add_stage(InterleavedParquetWriter(output_dir="./parquet_copy"))
```

Best Practices

  • Curate in Parquet, ship in WDS: Parquet is ~5× faster for filtering operations; convert to WDS only for the final training-loop format.
  • Pass through provenance fields: list source-tracking columns (source_url, license, crawl_date) in fields= so they survive into the output. Columns left out of the list are silently dropped.
  • Use max_batch_bytes for large shards: without splitting, a single 5 GB Parquet file becomes one giant batch. Set max_batch_bytes=512 * 1024 * 1024 for memory-friendly batches.
  • Pick the right on_materialize_error: "error" for production, "warn" for development, "drop_sample" for strict cleanliness. The default raises — set it explicitly when you want anything else.