nemo_curator.stages.interleaved.io.readers.webdataset

View as Markdown

Module Contents

Classes

| Name | Description |
| --- | --- |
| `WebdatasetReaderStage` | Read MINT1T-style WebDataset shards into a row-wise multimodal task. |
| `_ReadContext` | Per-tar state shared across all members in a single tar archive. |
| `_SampleContext` | Per-sample state passed to row builder methods. |

API

class nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage(
read_kwargs: dict[str, typing.Any] = dict(),
name: str = 'webdataset_reader',
materialize_on_read: bool = False,
max_batch_bytes: int | None = None,
json_extensions: tuple[str, ...] = DEFAULT_JSON_EXTENSIONS,
image_extensions: tuple[str, ...] = (lambda: DEFAULT_IMAGE_EXTE...,
source_id_field: str = '',
sample_id_field: str | None = None,
texts_field: str = 'texts',
images_field: str = 'images',
image_member_field: str | None = None,
fields: tuple[str, ...] | None = None,
per_image_fields: tuple[str, ...] = (),
per_text_fields: tuple[str, ...] = ()
)
Dataclass

Bases: BaseInterleavedReader

Read MINT1T-style WebDataset shards into a row-wise multimodal task.

fields
tuple[str, ...] | None = None
image_extensions
tuple[str, ...]
image_member_field
str | None = None
images_field
str = 'images'
json_extensions
tuple[str, ...] = DEFAULT_JSON_EXTENSIONS
materialize_on_read
bool = False
max_batch_bytes
int | None = None
name
str = 'webdataset_reader'
per_image_fields
tuple[str, ...] = ()
per_text_fields
tuple[str, ...] = ()
sample_id_field
str | None = None
source_id_field
str = ''
texts_field
str = 'texts'
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage.__post_init__() -> None
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._apply_per_modality_fields(
row: dict[str, typing.Any],
passthrough: dict[str, list[typing.Any]],
index: int
) -> None
staticmethod
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._build_passthrough_row(
sample: dict[str, typing.Any]
) -> dict[str, typing.Any]
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._build_row(
ctx: nemo_curator.stages.interleaved.io.readers.webdataset._SampleContext,
row_fields: dict[str, typing.Any]
) -> dict[str, typing.Any]
staticmethod
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._build_source_ref(
ctx: nemo_curator.stages.interleaved.io.readers.webdataset._SampleContext,
content_key: str | None,
frame_index: int | None = None
) -> str
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._empty_output_schema() -> pyarrow.Schema
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._extract_per_modality_fields(
sample: dict[str, typing.Any],
field_names: tuple[str, ...]
) -> dict[str, list[typing.Any]]
staticmethod
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._extract_tar_member(
tf: tarfile.TarFile,
member_name: str,
cache: dict[str, bytes | None]
) -> bytes | None
staticmethod
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._image_rows(
ctx: nemo_curator.stages.interleaved.io.readers.webdataset._SampleContext
) -> list[dict[str, typing.Any]]
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._metadata_row(
ctx: nemo_curator.stages.interleaved.io.readers.webdataset._SampleContext
) -> dict[str, typing.Any]
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._reconcile_schema(
inferred: pyarrow.Schema
) -> pyarrow.Schema
staticmethod

Build a schema with canonical types for reserved columns and inferred types for passthrough.

nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._resolve_default_image_member_name(
sample_id: str,
sample: dict[str, typing.Any],
images: list[object] | None,
member_names: set[str]
) -> str | None
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._resolve_image_content_key(
image_token: object,
default_image_member_name: str | None,
member_names: set[str]
) -> str | None
staticmethod
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._rows_from_member(
tf: tarfile.TarFile,
member: tarfile.TarInfo,
read_ctx: nemo_curator.stages.interleaved.io.readers.webdataset._ReadContext
) -> list[dict[str, typing.Any]]
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._rows_from_sample(
ctx: nemo_curator.stages.interleaved.io.readers.webdataset._SampleContext
) -> list[dict[str, typing.Any]]
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._text_rows(
ctx: nemo_curator.stages.interleaved.io.readers.webdataset._SampleContext
) -> list[dict[str, typing.Any]]
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage._warn_per_modality_length_mismatch(
sample_id: str,
passthrough: dict[str, list[typing.Any]],
actual_count: int,
modality: str
) -> None
staticmethod
nemo_curator.stages.interleaved.io.readers.webdataset.WebdatasetReaderStage.process(
task: nemo_curator.tasks.FileGroupTask
) -> nemo_curator.tasks.InterleavedBatch | list[nemo_curator.tasks.InterleavedBatch]
class nemo_curator.stages.interleaved.io.readers.webdataset._ReadContext(
tar_path: str,
member_names: set[str],
member_info: dict[str, tarfile.TarInfo],
storage_options: dict[str, object],
byte_cache: dict[str, bytes | None]
)
Dataclass

Per-tar state shared across all members in a single tar archive.

byte_cache
dict[str, bytes | None]
member_info
dict[str, TarInfo]
member_names
set[str]
storage_options
dict[str, object]
tar_path
str
class nemo_curator.stages.interleaved.io.readers.webdataset._SampleContext(
sample_id: str,
sample: dict[str, typing.Any],
tar_path: str,
json_member_name: str,
member_names: set[str],
member_info: dict[str, tarfile.TarInfo] | None,
passthrough: dict[str, typing.Any],
per_image_passthrough: dict[str, list[typing.Any]],
per_text_passthrough: dict[str, list[typing.Any]]
)
Dataclass

Per-sample state passed to row builder methods.

json_member_name
str
member_info
dict[str, TarInfo] | None
member_names
set[str]
passthrough
dict[str, Any]
per_image_passthrough
dict[str, list[Any]]
per_text_passthrough
dict[str, list[Any]]
sample
dict[str, Any]
sample_id
str
tar_path
str