nemo_rl.data_plane.column_io
Column-level helpers layered on top of `DataPlaneClient`.
These are thin wrappers around `get_samples` / `put_samples` that operate on columns (named fields) of a partition; they are not tied to the driver process. The driver uses them to fetch a slice and materialize it, or to write deltas back; worker-side dispatches use the equivalent methods on `AbstractPolicyWorker` (`self._fetch(meta)` / `self._write_back`).
- `read_columns`: `get_samples` + materialize (decodes jagged and object-array fields into a `BatchedDataDict`).
- `write_columns`: pack-to-wire + `put_samples` for deltas against an existing `KVBatchMeta`.
- `kv_first_write`: pack-to-wire + `put_samples` for the rollout actor's first put of a partition. Returns a new `KVBatchMeta`.
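To make the column-level idea concrete, here is a toy in-memory store. `ToyDataPlane` is hypothetical and is not the real `DataPlaneClient`; it only mirrors the `get_samples(sample_ids, select_fields=...)` / `put_samples(sample_ids, fields=...)` call shapes quoted on this page, and it stores plain Python values instead of tensors. The point it shows is that a later writer can add one column for existing sample ids without rewriting the others, and a reader can fetch only the columns it needs.

```python
from typing import Any, Dict, List, Sequence


class ToyDataPlane:
    """Hypothetical in-memory stand-in for a data-plane client.

    Values are stored per (sample_id, field), so each column can be
    written or read independently of the others.
    """

    def __init__(self) -> None:
        self._store: Dict[str, Dict[str, Any]] = {}

    def put_samples(self, sample_ids: Sequence[str], fields: Dict[str, Sequence[Any]]) -> None:
        # Each field must hold exactly one entry per sample id, in order.
        for name, values in fields.items():
            if len(values) != len(sample_ids):
                raise ValueError(f"{name!r}: {len(values)} rows for {len(sample_ids)} ids")
            for sid, value in zip(sample_ids, values):
                self._store.setdefault(sid, {})[name] = value

    def get_samples(self, sample_ids: Sequence[str], select_fields: Sequence[str]) -> Dict[str, List[Any]]:
        # Return only the requested columns, row-aligned with sample_ids.
        return {name: [self._store[sid][name] for sid in sample_ids] for name in select_fields}


dp = ToyDataPlane()
ids = ["s0", "s1"]
# First put: the rollout writes its columns.
dp.put_samples(ids, fields={"input_ids": [[1, 2, 3], [4, 5]], "reward": [1.0, 0.0]})
# Later delta: a new column is added without rewriting the existing ones.
dp.put_samples(ids, fields={"advantages": [0.5, -0.5]})
cols = dp.get_samples(ids, select_fields=["reward", "advantages"])
```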
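Module Contents
Functions
| Function | Summary |
| --- | --- |
| round_up | Smallest `multiple`-aligned int ≥ `value` (no-op when `multiple <= 1`). |
| read_columns | `get_samples(meta.sample_ids, select_fields=...)` → materialize. |
| write_columns | `put_samples(meta.sample_ids, fields=...)`. |
| kv_first_write | Single flat `put_samples` of every tensor field in `final_batch_cpu`. |
API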
- nemo_rl.data_plane.column_io.round_up(value: int, multiple: int) -> int
Smallest `multiple`-aligned int ≥ `value` (no-op when `multiple <= 1`).
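A minimal sketch of the documented contract, under a hypothetical name (`round_up_sketch`) so it is not mistaken for the library's own body, which may differ. It uses the integer identity `-(-a // b) == ceil(a / b)` and then scales back up to the aligned value.

```python
def round_up_sketch(value: int, multiple: int) -> int:
    """Smallest multiple-aligned int >= value; returns value unchanged if multiple <= 1."""
    if multiple <= 1:
        return value
    # -(-a // b) is ceiling division for ints; multiply back to the aligned value.
    return -(-value // multiple) * multiple


assert round_up_sketch(7, 4) == 8   # rounded up to the next multiple of 4
assert round_up_sketch(8, 4) == 8   # already aligned: unchanged
assert round_up_sketch(5, 1) == 5   # multiple <= 1: no-op
```

Ceiling division on ints avoids going through floats, so the result stays exact for large sequence lengths.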
- nemo_rl.data_plane.column_io.read_columns(
    dp_client: nemo_rl.data_plane.interfaces.DataPlaneClient,
    meta: nemo_rl.data_plane.interfaces.KVBatchMeta,
    select_fields: Sequence[str],
    *,
    layout: nemo_rl.data_plane.schema.Layout = 'padded',
    pad_value_dict: dict[str, Any] | None = None,
)
`get_samples(meta.sample_ids, select_fields=...)` → materialize. Pads to `meta.extra_info[GLOBAL_FORWARD_PAD_SEQLEN]` (minted on the driver by `TQPolicy._stamp_pad_seqlen` and inherited by every per-rank shard via `shard_meta_for_dp`), so driver-fetched and worker-returned columns end up with the same sequence dimension.
- Parameters:
dp_client – Data-plane client used for the underlying fetch.
meta – `KVBatchMeta` describing the keys to fetch.
select_fields – Fields to fetch.
layout – Materialization layout (`"padded"` or `"jagged"`).
pad_value_dict – Per-field pad value used when padding jagged tensors (e.g. `input_ids` → `pad_token_id`); defaults to 0.
- Returns:
`BatchedDataDict` with the requested fields, materialized.
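The padding behavior described above can be sketched with NumPy standing in for torch. `pad_jagged` and `materialize_padded` are hypothetical helpers, the pad token id 9 is an arbitrary placeholder, and the target length 8 stands in for whatever value is stored under `GLOBAL_FORWARD_PAD_SEQLEN`. Two shards whose longest rows differ still come out with the same sequence dimension, because both pad to the shared value rather than to their own longest row.

```python
from typing import Dict, List, Optional

import numpy as np


def pad_jagged(rows: List[np.ndarray], seq_len: int, pad_value: float = 0) -> np.ndarray:
    """Hypothetical helper: right-pad variable-length 1-D rows to shape (len(rows), seq_len)."""
    longest = max(len(r) for r in rows)
    if longest > seq_len:
        raise ValueError(f"row of length {longest} exceeds pad target {seq_len}")
    out = np.full((len(rows), seq_len), pad_value, dtype=rows[0].dtype)
    for i, row in enumerate(rows):
        out[i, : len(row)] = row
    return out


def materialize_padded(
    jagged: Dict[str, List[np.ndarray]],
    seq_len: int,
    pad_value_dict: Optional[Dict[str, float]] = None,
) -> Dict[str, np.ndarray]:
    """Pad every jagged field to the same seq_len; fields not in pad_value_dict pad with 0."""
    pad_value_dict = pad_value_dict or {}
    return {name: pad_jagged(rows, seq_len, pad_value_dict.get(name, 0)) for name, rows in jagged.items()}


PAD_TOKEN_ID = 9        # placeholder pad_token_id
GLOBAL_PAD_SEQLEN = 8   # stands in for meta.extra_info[GLOBAL_FORWARD_PAD_SEQLEN]

# Two DP shards whose longest rows differ (5 vs. 7 tokens).
shard_a = {"input_ids": [np.array([5, 6, 7, 8, 1]), np.array([5, 6])],
           "logprobs": [np.full(5, -0.1), np.full(2, -0.2)]}
shard_b = {"input_ids": [np.arange(1, 8)],
           "logprobs": [np.full(7, -0.3)]}

pads = {"input_ids": PAD_TOKEN_ID}
a = materialize_padded(shard_a, GLOBAL_PAD_SEQLEN, pads)
b = materialize_padded(shard_b, GLOBAL_PAD_SEQLEN, pads)
```

Padding each shard to its own maximum would give widths 5 and 7 here; padding to one driver-minted value is what lets columns from different ranks be concatenated or compared without reshaping.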
- nemo_rl.data_plane.column_io.write_columns(
    dp_client: nemo_rl.data_plane.interfaces.DataPlaneClient,
    meta: nemo_rl.data_plane.interfaces.KVBatchMeta,
    fields: dict[str, torch.Tensor | np.ndarray],
)
`put_samples(meta.sample_ids, fields=...)`. Per-token tensor fields are converted to jagged form via `pack_jagged_fields` so they land in TQ with the same row lengths as the initial put; `np.ndarray(dtype=object)` leaves pass through as-is.
- Parameters:
dp_client – Data-plane client used for the underlying put.
meta – `KVBatchMeta` describing the keys being written.
fields – Map of field name to tensor or object array.
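A sketch of the write-back packing step. `pack_jagged_sketch` is hypothetical and is not `pack_jagged_fields`; it assumes that a numeric array with two or more dimensions is per-token (rows × seq) and should be cut back to each row's valid length, while 1-D per-sample arrays and object arrays pass through unchanged. The per-token rule and the 1-D pass-through are assumptions made for illustration. It shows why the row lengths must match the initial put: the padded tail of each row is dropped, so a delta written with different lengths would no longer line up with the stored tokens.

```python
from typing import Dict

import numpy as np


def pack_jagged_sketch(fields: Dict[str, np.ndarray], lengths: np.ndarray) -> Dict[str, object]:
    """Hypothetical stand-in for jagged packing of write-back deltas.

    Assumption: a numeric array with ndim >= 2 is per-token and is cut back
    to each row's valid length; 1-D per-sample arrays and object arrays
    pass through unchanged.
    """
    packed: Dict[str, object] = {}
    for name, value in fields.items():
        if value.dtype == object or value.ndim < 2:
            packed[name] = value
        else:
            # One variable-length row per sample, trimmed to its valid length.
            packed[name] = [value[i, : int(n)] for i, n in enumerate(lengths)]
    return packed


lengths = np.array([3, 1])  # valid lengths recorded at the initial put
deltas = {
    "advantages": np.array([[0.5, 0.5, 0.5, 0.0], [-1.0, 0.0, 0.0, 0.0]]),  # per-token, padded
    "reward": np.array([1.0, 0.0]),                                          # per-sample scalar
    "note": np.array(["ok", "truncated"], dtype=object),                    # object leaf
}
packed = pack_jagged_sketch(deltas, lengths)
```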
- nemo_rl.data_plane.column_io.kv_first_write(
    final_batch_cpu: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
    *,
    sample_ids: Sequence[str],
    dp_client: nemo_rl.data_plane.interfaces.DataPlaneClient,
    partition_id: str,
    extra_info: dict[str, Any] | None = None,
    task_name: str = 'train',
    pad_to_multiple: int = 1,
    tags: list[dict[str, Any]] | None = None,
)
Single flat `put_samples` of every tensor field in `final_batch_cpu`. This is the rollout actor's first put of a partition. The caller mints `sample_ids` (verl-style); the helper itself is agnostic to the rollout's shape.
- Parameters:
final_batch_cpu – Rollout output already on CPU. Must contain `"sample_mask"` (used as the batch-size oracle: `shape[0] == N`) and `"input_lengths"` (per-row valid lengths for the jagged pack). Tensor fields are packed jagged via `pack_jagged_fields`; `np.ndarray(dtype=object)` leaves pass through.
sample_ids – Pre-minted per-sample ids, one per row of `final_batch_cpu`.
dp_client – Data-plane client used for the put.
partition_id – TQ partition to write into.
extra_info – Optional extra fields to attach to the returned meta.
task_name – Consumer task tag stamped on the returned meta.
pad_to_multiple – Seq-dim alignment recorded in `extra_info` so readers pad to a multiple compatible with downstream backends (mcore SP, PyTorch CP).
tags – Optional per-sample primitive metadata (one dict per row). Stored on the TQ controller alongside the samples; travels with `KVBatchMeta` through `subset`/`concat`/`slice` so consumers can filter on it without fetching tensor data.
- Returns:
`KVBatchMeta` covering the written samples.
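The preconditions and metadata flow of the first write can be sketched without a real data plane. `ToyMeta`, `first_write_meta`, and the `"pad_to_multiple"` key in `extra_info` are all hypothetical, and no put happens here. The sketch only checks the documented invariants (batch size taken from `sample_mask`, `input_lengths` present, one id and at most one tag dict per row) and shows tags staying paired with their ids through a subset, so a consumer can filter on tags alone. Minting ids with `uuid.uuid4` is an assumption, chosen as a common convention in verl-style pipelines.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

import numpy as np


@dataclass
class ToyMeta:
    """Hypothetical stand-in for KVBatchMeta: ids and tags only, no tensor data."""

    sample_ids: List[str]
    partition_id: str
    task_name: str
    extra_info: Dict[str, Any] = field(default_factory=dict)
    tags: Optional[List[Dict[str, Any]]] = None

    def subset(self, keep: List[int]) -> "ToyMeta":
        # Tags are indexed by the same row positions, so they stay paired with their ids.
        tags = None if self.tags is None else [self.tags[i] for i in keep]
        return ToyMeta([self.sample_ids[i] for i in keep], self.partition_id,
                       self.task_name, dict(self.extra_info), tags)


def first_write_meta(batch: Dict[str, np.ndarray], sample_ids: List[str], partition_id: str, *,
                     extra_info: Optional[Dict[str, Any]] = None, task_name: str = "train",
                     pad_to_multiple: int = 1,
                     tags: Optional[List[Dict[str, Any]]] = None) -> ToyMeta:
    """Check the documented first-write invariants and build the meta (no data is written)."""
    if "input_lengths" not in batch:
        raise KeyError("final batch needs 'input_lengths' for the jagged pack")
    n = batch["sample_mask"].shape[0]  # sample_mask is the batch-size oracle
    if len(sample_ids) != n:
        raise ValueError(f"{len(sample_ids)} sample_ids for {n} rows")
    if tags is not None and len(tags) != n:
        raise ValueError(f"{len(tags)} tag dicts for {n} rows")
    info = dict(extra_info or {})
    info["pad_to_multiple"] = pad_to_multiple  # hypothetical key name
    return ToyMeta(list(sample_ids), partition_id, task_name, info, tags)


batch = {"sample_mask": np.ones(3), "input_lengths": np.array([4, 2, 5])}
ids = [str(uuid.uuid4()) for _ in range(3)]  # the caller mints the ids
tags = [{"source": "math"}, {"source": "code"}, {"source": "math"}]
meta = first_write_meta(batch, ids, "step_0", pad_to_multiple=8, tags=tags)
# Filter on tags alone; no tensor data is touched.
math_meta = meta.subset([i for i, t in enumerate(meta.tags) if t["source"] == "math"])
```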