> Read and write interleaved image-text datasets between WebDataset tar shards and Parquet using InterleavedParquetReader, InterleavedWebdatasetReader, and matching writers.

# Interleaved IO

Read and write interleaved image-text data between WebDataset tar shards and Parquet. All four combinations work out of the box:

```text
WDS tar  ⇄  InterleavedBatch  ⇄  Parquet
```

## Understanding the Round-Trip

### Storage Formats

Interleaved samples can live in two on-disk formats; in memory both materialize as `InterleavedBatch`:

| Format                    | Layout                                                                  | When to Use                                                       |
| ------------------------- | ----------------------------------------------------------------------- | ----------------------------------------------------------------- |
| **WebDataset tar shards** | One tar per shard, one file per item; `sample_id` encoded in member key | Streaming reads, S3-friendly, MINT-1T-compatible                  |
| **Parquet rows**          | One row per item (text/image/metadata), grouped by `sample_id`          | Random access, column projection, ~5× faster reads in benchmarks |

Choose based on the consumer: training loops that stream large datasets often prefer WDS tars; analytics or sample-level inspection prefers Parquet.

### When to Convert

A common workflow is "curate once in Parquet, train from tars":

1. **Curate in Parquet** — fast reads, easy filtering, column projection.
2. **Convert to WDS** — once curation is done, write the final dataset as MINT-1T-style tars for the training loop.

The IO stages on this page support that workflow without intermediate copies.

## Readers

### `InterleavedParquetReader`

Reads Parquet files into `InterleavedBatch`. Each row maps to one item (text, image, or metadata) with a `sample_id` that groups items into samples.

```python
from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader

reader = InterleavedParquetReader(
    file_paths="s3://bucket/interleaved/*.parquet",
    fields=("source_url", "license"),  # passthrough columns kept alongside reserved ones
    max_batch_bytes=512 * 1024 * 1024, # 512 MiB per output batch
)
```

Key behaviors:

* **`fields=` passthrough**: keep additional columns alongside the reserved interleaved schema. Missing columns are null-filled in a single pass.
* **Push-down column projection**: the reader uses `pq.read_schema()` per file and asks Parquet for only the columns it needs.
* **`max_batch_bytes` splitting**: large file groups are split into multiple batches by accumulated size; per-split lineage is preserved on the output `source_files` field.
* **Schema control**: pass `schema=` for strict alignment to a target Arrow schema, or `schema_overrides=` for partial type overrides on top of `INTERLEAVED_SCHEMA`. Specifying both warns and prefers `schema=`.
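
A minimal sketch of the `schema_overrides=` form; the column name and target type are placeholders, so substitute a column from your own data:

```python
import pyarrow as pa

from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader

# "crawl_date" is a placeholder passthrough column used for illustration;
# every column without an override keeps its canonical or file-provided type.
reader = InterleavedParquetReader(
    file_paths="s3://bucket/interleaved/*.parquet",
    fields=("source_url", "crawl_date"),
    schema_overrides={"crawl_date": pa.timestamp("ms")},
)
```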

### `InterleavedWebdatasetReader`

Reads MINT-1T-style WebDataset tar shards. Same `fields=` and `max_batch_bytes` semantics as the Parquet reader.

```python
from nemo_curator.stages.interleaved.io.reader import InterleavedWebdatasetReader

reader = InterleavedWebdatasetReader(
    file_paths="s3://bucket/mint1t/*.tar",
    fields=("source_url",),
    sample_id_field="sample_id",
    image_extensions=["png", "jpg", "jpeg"],
    json_extensions=["json"],
    texts_field="texts",
    images_field="images",
)
```

Configurable extensions and member-field names (`image_extensions`, `json_extensions`, `texts_field`, `images_field`, `image_member_field`) let the reader handle non-standard tar layouts.

### Reader Parameters

| Parameter             | Type                            | Default  | Description                                                                                                       |
| --------------------- | ------------------------------- | -------- | ----------------------------------------------------------------------------------------------------------------- |
| `file_paths`          | str \| list\[str]               | Required | Glob, list, or single path. Supports local, S3, GCS via fsspec.                                                   |
| `files_per_partition` | int \| None                     | `None`   | Group input files into partitions of this size.                                                                   |
| `blocksize`           | int \| str \| None              | `None`   | Optional bytes-per-partition target (e.g., `"512MiB"`).                                                           |
| `max_batch_bytes`     | int \| None                     | `None`   | Split large partitions into multiple `InterleavedBatch` outputs; `None` means no splitting.                       |
| `fields`              | tuple\[str, ...] \| None        | `None`   | Passthrough columns to keep alongside the reserved `INTERLEAVED_SCHEMA`.                                          |
| `read_kwargs`         | dict                            | `{}`     | Forwarded to the underlying read (`pyarrow.parquet.read_table`, etc.).                                            |
| `schema`              | pa.Schema \| None               | `None`   | Strict alignment target. Reserved columns get canonical types; passthrough columns surface overflow.              |
| `schema_overrides`    | dict\[str, pa.DataType] \| None | `None`   | Partial type overrides on top of `INTERLEAVED_SCHEMA`. Warns when both `schema=` and `schema_overrides=` are set. |

## Writers

### `InterleavedParquetWriter`

Writes `InterleavedBatch` to Parquet. Inherits `schema=` / `schema_overrides=` and the materialization-error policy (see below).
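
A minimal usage sketch; the import path assumes the Parquet writer sits next to the WebDataset writer under `io.writers`, so check your installed module layout:

```python
# Module path assumed by analogy with the WebDataset writer; verify it in your install.
from nemo_curator.stages.interleaved.io.writers.parquet import InterleavedParquetWriter

writer = InterleavedParquetWriter(
    output_dir="./curated_parquet",
    on_materialize_error="warn",  # optional; see the materialization-error policy below
)
pipeline.add_stage(writer)
```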

### `InterleavedWebdatasetWriterStage`

Writes `InterleavedBatch` to MINT-1T-style WebDataset tar shards.

```python
from nemo_curator.stages.interleaved.io.writers.webdataset import InterleavedWebdatasetWriterStage

writer = InterleavedWebdatasetWriterStage(
    output_dir="./out",
    file_extension="tar",
)
pipeline.add_stage(writer)
```

Implementation notes:

* **`urllib.parse.quote(sample_id, safe="")`** is used as the tar member key — injective and roundtrip-safe with `sample_id_field="sample_id"` on the matching reader.
* **Single-pass `groupby`**: rows are grouped by `sample_id` once instead of filtered per sample (O(n) instead of O(n × m)).
* **Sparse positions**: gaps in the position field are preserved as `None` entries; the WDS reader skips them on the way back.
* **Supported modalities**: `metadata`, `text`, and `image`. Any other modality raises `ValueError` at write time.
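
A quick illustration of why percent-encoding makes the member key both injective and roundtrip-safe, even when the sample ID contains `/`, spaces, or other tar-hostile characters:

```python
from urllib.parse import quote, unquote

sample_id = "s3://bucket/crawl/2024-01/doc 42"
member_key = quote(sample_id, safe="")    # every reserved character is percent-escaped

assert "/" not in member_key              # usable as a flat tar member name
assert unquote(member_key) == sample_id   # decoding recovers the ID exactly
```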

## Schema Utilities

`utils/schema.py` ships the shared helpers used by every Arrow-based reader and writer.

### `reconcile_schema()`

Apply canonical types to reserved columns; preserve passthrough column types as-is; avoid unsafe large↔small downcasts; unwrap Parquet dictionary encoding from passthrough columns.

### `align_table()`

Pad, reorder, and cast an Arrow table to a target schema. Reserved columns use `safe=False` for predictable casts; passthrough columns use `safe=True` so overflow surfaces rather than silently corrupting data. Does **not** re-reconcile the user-provided target.
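
The difference matters when a passthrough value doesn't fit the target type. A standalone PyArrow sketch of the two cast modes (not the helper itself):

```python
import pyarrow as pa

values = pa.array([1, 300], type=pa.int64())

# safe=False: the cast always succeeds, and out-of-range values wrap silently.
print(values.cast(pa.int8(), safe=False))   # -> [1, 44]

# safe=True: the overflow surfaces as an error instead of corrupting the column.
try:
    values.cast(pa.int8(), safe=True)
except pa.ArrowInvalid as exc:
    print(f"cast rejected: {exc}")
```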

### `resolve_schema()`

Merge `schema=` and `schema_overrides=` arguments with the canonical `INTERLEAVED_SCHEMA`; warn when both are provided; return `None` when neither is set. Used internally by readers and writers.
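
An illustrative sketch of those documented semantics, not the library's implementation:

```python
import warnings

import pyarrow as pa


def resolve_schema_sketch(base: pa.Schema, schema=None, schema_overrides=None):
    """Mirror the documented precedence: explicit schema wins, overrides patch the base."""
    if schema is not None and schema_overrides is not None:
        warnings.warn("Both schema= and schema_overrides= given; using schema=.")
    if schema is not None:
        return schema                       # strict alignment target
    if schema_overrides is not None:
        patched = [
            pa.field(f.name, schema_overrides.get(f.name, f.type)) for f in base
        ]
        return pa.schema(patched)           # base schema with per-column type patches
    return None                             # neither set: no alignment requested
```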

## Materialization Error Policy

Writers expose `on_materialize_error=`, controlling what happens when a fetch step (or upstream stage) sets an error on a row:

| Value           | Behavior                                                | Use Case                                                        |
| --------------- | ------------------------------------------------------- | --------------------------------------------------------------- |
| `"error"`       | Raise immediately (default for strict pipelines).       | Production pipelines where an error indicates a real problem.   |
| `"warn"`        | Emit a warning and keep the row.                        | Development; want visibility without halting.                   |
| `"drop_row"`    | Drop the offending row but keep the rest of the sample. | Resilient pipelines where one bad item shouldn't kill a sample. |
| `"drop_sample"` | Drop the entire sample if any row in it errored.        | Strict cleanliness; one bad row taints the whole sample.        |

The policy is applied after fetch, so it covers errors raised by the materialization step itself and by upstream stages.
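
For example, a development run might keep erroring rows visible while a production run discards the whole sample. A minimal sketch, assuming the policy is passed at writer construction time:

```python
from nemo_curator.stages.interleaved.io.writers.webdataset import InterleavedWebdatasetWriterStage

# Development: emit a warning for each errored row but keep writing.
dev_writer = InterleavedWebdatasetWriterStage(output_dir="./out", on_materialize_error="warn")

# Strict cleanliness: any errored row drops its entire sample.
prod_writer = InterleavedWebdatasetWriterStage(output_dir="./out", on_materialize_error="drop_sample")
```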

## Mixed-Backend Path Handling

The internal helper `_build_global_range_index()` groups paths by filesystem object so a single batch that mixes S3 and local paths no longer fails silently — each backend's range queries run against the right filesystem. This works automatically; you don't need to configure it.

## Performance

Benchmarks on 80 NVMe shards of MINT-1T PDF data (6,818 samples, 9.9 GB WDS / 6.0 GB Parquet) with an aspect-ratio filter applied:

| Path                  | Wall-clock | Samples/sec | Notes                          |
| --------------------- | ---------- | ----------- | ------------------------------ |
| WDS → Parquet         | 76.8 s     | 88.8        | Tar parsing dominates          |
| WDS → WDS             | 75.4 s     | 90.4        | Tar parsing dominates          |
| **Parquet → Parquet** | **15.7 s** | **435.0**   | Parquet column projection wins |
| **Parquet → WDS**     | **18.5 s** | **368.4**   | Parquet read + tar write       |

Parquet-sourced paths are roughly **5× faster** than WDS-sourced paths because tar parsing dominates the WDS read cost.

## Complete IO Pipeline Examples

### Curate-Once-in-Parquet, Train-from-Tars

```python
from nemo_curator.pipeline import Pipeline
from nemo_curator.backends.xenna import XennaExecutor
from nemo_curator.stages.interleaved.io.reader import InterleavedParquetReader
from nemo_curator.stages.interleaved.io.writers.webdataset import (
    InterleavedWebdatasetWriterStage,
)
from nemo_curator.stages.interleaved.filter.blur_filter import InterleavedBlurFilterStage

pipeline = Pipeline(name="parquet_to_wds")

# 1. Read curated Parquet (faster random access during filtering)
pipeline.add_stage(
    InterleavedParquetReader(file_paths="s3://bucket/curated/*.parquet")
)

# 2. Filter
pipeline.add_stage(InterleavedBlurFilterStage(score_threshold=100.0))

# 3. Write final WDS tars (training-loop friendly)
pipeline.add_stage(InterleavedWebdatasetWriterStage(output_dir="./final_tars"))

executor = XennaExecutor()
pipeline.run(executor)
```

### Format Conversion (No Filtering)

```python
# WDS → Parquet for analytics / ad-hoc inspection
from nemo_curator.pipeline import Pipeline
from nemo_curator.stages.interleaved.io.reader import InterleavedWebdatasetReader

# Parquet writer import path assumed to mirror the WebDataset writer module layout.
from nemo_curator.stages.interleaved.io.writers.parquet import InterleavedParquetWriter

pipeline = Pipeline(name="wds_to_parquet")
pipeline.add_stage(
    InterleavedWebdatasetReader(file_paths="s3://bucket/mint1t/*.tar")
)
pipeline.add_stage(InterleavedParquetWriter(output_dir="./parquet_copy"))
```

## Best Practices

* **Curate in Parquet, ship in WDS**: Parquet is ~5× faster for filtering operations; convert to WDS only for the final training-loop format.
* **Pass through provenance fields**: list source-tracking columns (`source_url`, `license`, `crawl_date`) in `fields=` so they survive into the output. Columns not listed are silently dropped.
* **Use `max_batch_bytes` for large shards**: without splitting, a single 5 GB Parquet file becomes one giant batch. Set `max_batch_bytes=512 * 1024 * 1024` for memory-friendly batches.
* **Pick the right `on_materialize_error`**: `"error"` for production, `"warn"` for development, `"drop_sample"` for strict cleanliness. The default raises — set it explicitly when you want anything else.

## Related Topics

* **[Interleaved Filters](/curate-text/process-data/interleaved/filters)** — sample-level quality filters that operate on `InterleavedBatch`.
* **[Nemotron-Parse PDF Pipeline](/curate-text/load-data/nemotron-parse-pdf)** — produces interleaved Parquet output from PDF inputs.