nemo_curator.tasks.interleaved


Interleaved task type and schema for row-wise interleaved multimodal records.

Schema columns fall into two categories:

Reserved columns (RESERVED_COLUMNS) — managed by pipeline stages:

================== ============= =========== ===============================================
Column             Type          Category    Description
================== ============= =========== ===============================================
sample_id          string (req)  Identity    Unique document/sample identifier
position           int32 (req)   Identity    Position within sample (-1 for metadata rows)
modality           string (req)  Identity    Row modality. Built-in values are text, image,
                                             and metadata; extensible to audio, table,
                                             generated_image, etc.
content_type       string        Content     MIME type (e.g. text/plain, image/jpeg)
text_content       string        Content     Text payload for text rows
binary_content     large_binary  Content     Image bytes (populated by materialization)
source_ref         string        Internal    JSON locator {path, member, byte_offset,
                                             byte_size, frame_index}. path alone =
                                             direct/remote read; member = tar extract;
                                             byte_offset/size = range read (fastest).
                                             path accepts local or remote (s3://) URIs.
materialize_error  string        Internal    Error message if materialization failed
================== ============= =========== ===============================================

User columns (passthrough) — extra fields from source data added via the fields parameter on the reader. These flow through the pipeline untouched.

Module Contents

Classes

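================ ==========================================
Name             Description
================ ==========================================
InterleavedBatch Task carrying row-wise multimodal records.
================ ==========================================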

Data

INTERLEAVED_SCHEMA

RESERVED_COLUMNS

API

class nemo_curator.tasks.interleaved.InterleavedBatch(
task_id: str,
dataset_name: str,
data: pyarrow.Table | pandas.DataFrame = (lambda: pa.Table.from_pyli...,
_stage_perf: list[nemo_curator.utils.performance_utils.StagePerfStats] = list(),
_metadata: dict[str, typing.Any] = dict(),
REQUIRED_COLUMNS: frozenset[str] = frozenset(name for name, f ...
)
Dataclass

Bases: Task[Table | DataFrame]

Task carrying row-wise multimodal records.

See module docstring for the full schema reference (reserved vs user columns).

REQUIRED_COLUMNS
frozenset[str]
data
Table | DataFrame
num_items
int

Number of unique samples (distinct sample_id values).
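Because the data is row-wise, the number of samples is generally smaller than the number of rows. The pandas sketch below shows the distinction on made-up data; it illustrates the idea only and is not the library's implementation of num_items.

```python
import pandas as pd

# Made-up rows: sample "a" has three rows, sample "b" has two.
df = pd.DataFrame({
    "sample_id": ["a", "a", "a", "b", "b"],
    "position": [-1, 0, 1, 0, 1],
    "modality": ["metadata", "text", "image", "text", "image"],
})

num_rows = len(df)                       # every row counts once
num_samples = df["sample_id"].nunique()  # distinct sample_id values
```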

nemo_curator.tasks.interleaved.InterleavedBatch.add_rows(
rows: pyarrow.Table | pandas.DataFrame | list[dict],
sample_id: str | None = None,
auto_position: bool = True
) -> nemo_curator.tasks.interleaved.InterleavedBatch

Add rows to this task.

Parameters:

rows
pa.Table | pd.DataFrame | list[dict]

New rows to append. Must contain required columns unless overridden by sample_id / auto_position.

sample_id
str | None. Defaults to None

If provided, assign this sample_id to all new rows.

auto_position
bool. Defaults to True

If True, auto-assign position values continuing from the existing maximum per sample.
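One plausible reading of "continuing from the existing maximum per sample" is sketched below in pandas. The helper `assign_positions` is hypothetical and not part of the library. Starting previously unseen samples at position 0 is an assumption.

```python
import pandas as pd

def assign_positions(existing: pd.DataFrame, new_rows: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: number new rows after each sample's current maximum."""
    current_max = existing.groupby("sample_id")["position"].max()
    out = new_rows.copy()
    # Offset of each new row within its own sample: 0, 1, 2, ...
    offset = out.groupby("sample_id").cumcount()
    # Assumption: samples with no existing rows start at 0 (maximum treated as -1).
    start = out["sample_id"].map(current_max).fillna(-1).astype("int64") + 1
    out["position"] = (start + offset).astype("int32")
    return out

existing = pd.DataFrame({
    "sample_id": ["a", "a", "b"],
    "position": [0, 1, 0],
    "modality": ["text", "image", "text"],
})
new_rows = pd.DataFrame({
    "sample_id": ["a", "b", "b", "c"],
    "modality": ["text", "image", "text", "text"],
})

numbered = assign_positions(existing, new_rows)
# Sample "a" continues at 2, sample "b" at 1 and 2, new sample "c" starts at 0.
```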

nemo_curator.tasks.interleaved.InterleavedBatch.build_source_ref(
path: str | None,
member: str | None,
byte_offset: int | None = None,
byte_size: int | None = None,
frame_index: int | None = None
) -> str
staticmethod

Build a source_ref JSON locator string.
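To make the locator format concrete, here is a minimal sketch that serializes the five documented keys with the standard json module. The function `build_locator` is a hypothetical stand-in, not the library's implementation. Whether the real method keeps unset keys as null or omits them is not stated in this reference, so keeping them is an assumption. The paths are made up.

```python
import json

def build_locator(path, member=None, byte_offset=None, byte_size=None, frame_index=None) -> str:
    """Hypothetical stand-in for a source_ref builder."""
    # Assumption: unset fields are kept and serialized as JSON null.
    return json.dumps({
        "path": path,
        "member": member,
        "byte_offset": byte_offset,
        "byte_size": byte_size,
        "frame_index": frame_index,
    })

# path + member: the payload is a member of a tar archive.
tar_ref = build_locator("s3://bucket/shard-000.tar", member="doc-0.jpg")

# byte_offset + byte_size: the payload can be fetched with a range read.
range_ref = build_locator("shard-000.tar", member="doc-0.jpg",
                          byte_offset=1024, byte_size=2048)
```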

nemo_curator.tasks.interleaved.InterleavedBatch.count(
modality: str | None = None
) -> int

Return row count, optionally filtered by modality.

Examples::

    task.count()                  # total rows
    task.count(modality="image")  # image rows only
    task.count(modality="text")   # text rows only

Delete rows where mask is True.

Parameters:

mask
pd.Series

Boolean Series aligned to the data. True marks a row for deletion.
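The mask convention (True marks a row for deletion) can be illustrated with plain pandas. This sketch shows the semantics on made-up data and does not call the library. Resetting the index afterwards is a choice made for the example.

```python
import pandas as pd

df = pd.DataFrame({
    "sample_id": ["a", "a", "a", "b"],
    "position": [0, 1, 2, 0],
    "modality": ["text", "image", "text", "image"],
})

# True marks a row for deletion; the mask shares the DataFrame's index.
mask = df["modality"] == "image"

# Keep the complement of the mask; the original DataFrame is left unchanged.
kept = df.loc[~mask].reset_index(drop=True)
```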

nemo_curator.tasks.interleaved.InterleavedBatch.get_columns() -> list[str]

nemo_curator.tasks.interleaved.InterleavedBatch.parse_source_ref(
source_value: str | None
) -> dict[str, str | int | None]
staticmethod

Parse a source_ref JSON string into a locator dict.
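A minimal sketch of the parsing direction follows, using only the standard json module. `parse_locator` is a hypothetical stand-in. Returning all five keys with None for anything absent, and treating a None or empty input as an empty locator, are assumptions consistent with the documented return type but not confirmed by this reference.

```python
import json

LOCATOR_KEYS = ("path", "member", "byte_offset", "byte_size", "frame_index")

def parse_locator(source_value):
    """Hypothetical stand-in for a source_ref parser."""
    # Assumption: None or an empty string yields a locator with every key set to None.
    parsed = json.loads(source_value) if source_value else {}
    return {key: parsed.get(key) for key in LOCATOR_KEYS}

empty = parse_locator(None)
ranged = parse_locator('{"path": "shard-000.tar", "byte_offset": 10, "byte_size": 20}')
```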

nemo_curator.tasks.interleaved.InterleavedBatch.to_pandas() -> pandas.DataFrame

nemo_curator.tasks.interleaved.InterleavedBatch.to_pyarrow() -> pyarrow.Table

nemo_curator.tasks.interleaved.InterleavedBatch.validate() -> bool

nemo_curator.tasks.interleaved.InterleavedBatch.with_parsed_source_ref_columns(
prefix: str = '_src_'
) -> pandas.DataFrame

Return a DataFrame copy with parsed source_ref columns added.

Columns: {prefix}path, {prefix}member, {prefix}byte_offset, {prefix}byte_size, {prefix}frame_index.
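The effect of adding prefixed locator columns can be approximated in pandas as shown below. `add_parsed_columns` is a hypothetical helper written for this example, not the library's implementation, and the input data is made up.

```python
import json
import pandas as pd

LOCATOR_KEYS = ("path", "member", "byte_offset", "byte_size", "frame_index")

def add_parsed_columns(df: pd.DataFrame, prefix: str = "_src_") -> pd.DataFrame:
    """Hypothetical helper: return a copy with one column per locator key."""
    out = df.copy()
    # Rows without a usable source_ref string get an empty locator.
    parsed = [json.loads(v) if isinstance(v, str) and v else {} for v in out["source_ref"]]
    for key in LOCATOR_KEYS:
        out[f"{prefix}{key}"] = [p.get(key) for p in parsed]
    return out

df = pd.DataFrame({
    "sample_id": ["a", "a"],
    "modality": ["text", "image"],
    "source_ref": [None, '{"path": "shard-000.tar", "member": "a.jpg"}'],
})

expanded = add_parsed_columns(df)
```

Because the helper works on a copy, the input DataFrame keeps its original columns.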

nemo_curator.tasks.interleaved.INTERLEAVED_SCHEMA = pa.schema([pa.field('sample_id', pa.string(), nullable=False), pa.field('positio...
nemo_curator.tasks.interleaved.RESERVED_COLUMNS: frozenset[str] = frozenset(INTERLEAVED_SCHEMA.names)
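
Since RESERVED_COLUMNS is the set of schema field names, separating reserved from user (passthrough) columns reduces to a set-membership test. The sketch below hard-codes the names from the column table instead of importing the library. The user columns `url` and `license` are hypothetical.

```python
# Reserved names copied from the column table in the module description.
reserved = frozenset({
    "sample_id", "position", "modality", "content_type", "text_content",
    "binary_content", "source_ref", "materialize_error",
})

# Columns of some incoming table; "url" and "license" are made-up user fields.
columns = ["sample_id", "position", "modality", "text_content", "url", "license"]

# Anything outside the reserved set is treated as a passthrough column.
user_columns = [name for name in columns if name not in reserved]

# Reserved columns that this particular table does not carry.
missing_reserved = sorted(reserved - set(columns))
```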