nemo_rl.data_plane.column_io#

Column-level helpers layered on top of DataPlaneClient.

These are thin wrappers around get_samples / put_samples that operate on columns (named fields) of a partition; they are not specific to the driver process. The driver uses them to fetch a slice and materialize it, and to write deltas back; worker-side dispatches use the equivalents on AbstractPolicyWorker (self._fetch(meta) / self._write_back).

  • read_columns: get_samples + materialize (decode jagged and object-array fields into a BatchedDataDict).

  • write_columns: pack-to-wire + put_samples for deltas against an existing KVBatchMeta.

  • kv_first_write: pack-to-wire + put_samples for the rollout actor's first put of a partition. Returns a new KVBatchMeta.
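The first-write, delta-write, column-read flow described above can be sketched with a toy in-memory store. This is only an illustration: ToyClient is a hypothetical stand-in, not the real DataPlaneClient, and the "advantages" field name is made up. The only things taken from this page are the call shapes put_samples(sample_ids, fields=...) and get_samples(sample_ids, select_fields=...).

```python
from typing import Any, Sequence


class ToyClient:
    """Hypothetical in-memory stand-in for a data-plane client (NOT the real DataPlaneClient)."""

    def __init__(self) -> None:
        # sample_id -> {field name -> value}
        self._rows: dict[str, dict[str, Any]] = {}

    def put_samples(self, sample_ids: Sequence[str], fields: dict[str, list[Any]]) -> None:
        # Each field carries one value per sample id, in the same order.
        for name, column in fields.items():
            if len(column) != len(sample_ids):
                raise ValueError(
                    f"field {name!r} has {len(column)} rows, expected {len(sample_ids)}"
                )
            for sid, value in zip(sample_ids, column):
                self._rows.setdefault(sid, {})[name] = value

    def get_samples(
        self, sample_ids: Sequence[str], select_fields: Sequence[str]
    ) -> dict[str, list[Any]]:
        # Return only the requested columns, row-aligned with sample_ids.
        return {
            name: [self._rows[sid][name] for sid in sample_ids]
            for name in select_fields
        }


client = ToyClient()
sample_ids = ["s0", "s1"]

# First write: the rollout output goes in as one flat put.
client.put_samples(sample_ids, fields={"input_ids": [[5, 6, 7], [8, 9]]})

# Delta write: a later stage adds one new column for the same ids.
client.put_samples(sample_ids, fields={"advantages": [[0.1, 0.2, 0.3], [0.4, 0.5]]})

# Column read: fetch only the fields a consumer needs.
columns = client.get_samples(sample_ids, select_fields=["advantages"])
```

The point of the sketch is that a delta write touches only the columns it carries, and a read touches only the columns it selects; the rows are identified by the same sample ids throughout.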

Module Contents#

Functions#

round_up

Smallest integer ≥ value that is a multiple of multiple (no-op when multiple <= 1).

read_columns

get_samples(meta.sample_ids, select_fields=...), then materialize.

write_columns

put_samples(meta.sample_ids, fields=...).

kv_first_write

Single flat put_samples of every tensor field in final_batch_cpu.

API#

nemo_rl.data_plane.column_io.round_up(value: int, multiple: int) → int#

Smallest integer ≥ value that is a multiple of multiple (no-op when multiple <= 1).
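A minimal sketch of the arithmetic this description implies, assuming plain ceiling division. The name round_up_sketch is hypothetical and the body is not taken from the library source.

```python
def round_up_sketch(value: int, multiple: int) -> int:
    """Return the smallest multiple of `multiple` that is >= value."""
    if multiple <= 1:
        # No-op case from the description above.
        return value
    # Ceiling division using only integer arithmetic, then scale back up.
    return -(-value // multiple) * multiple


aligned = round_up_sketch(10, 8)         # 10 is not a multiple of 8
already_aligned = round_up_sketch(16, 8)  # 16 already is
unchanged = round_up_sketch(7, 1)         # multiple <= 1 leaves value as-is
```

Here aligned is 16, already_aligned is 16, and unchanged is 7.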

nemo_rl.data_plane.column_io.read_columns(
dp_client: nemo_rl.data_plane.interfaces.DataPlaneClient,
meta: nemo_rl.data_plane.interfaces.KVBatchMeta,
select_fields: Sequence[str],
*,
layout: nemo_rl.data_plane.schema.Layout = 'padded',
pad_value_dict: dict[str, Any] | None = None,
) → nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any]#

get_samples(meta.sample_ids, select_fields=...), then materialize.

Pads to meta.extra_info[GLOBAL_FORWARD_PAD_SEQLEN], which is minted on the driver by TQPolicy._stamp_pad_seqlen and inherited by every per-rank shard via shard_meta_for_dp, so driver-fetched and worker-returned columns share one identical sequence dimension.

Parameters:
  • dp_client – Data-plane client used for the underlying fetch.

  • meta – KVBatchMeta describing the keys to fetch.

  • select_fields – Fields to fetch.

  • layout – Materialization layout ("padded" or "jagged").

  • pad_value_dict – Per-field pad value for jagged tensors (e.g. pad_token_id for input_ids); defaults to 0.

Returns:

BatchedDataDict with the requested fields, materialized.
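To make the "padded" layout concrete, here is a small pure-Python sketch of padding jagged rows to a fixed sequence length with a per-field pad value. The helper pad_rows is hypothetical, and right-padding is an assumption made for illustration; the real materialization works on tensors and is not shown on this page.

```python
from typing import Sequence


def pad_rows(
    rows: Sequence[Sequence[int]], target_len: int, pad_value: int = 0
) -> list[list[int]]:
    """Right-pad each variable-length row to target_len with pad_value."""
    padded = []
    for row in rows:
        if len(row) > target_len:
            raise ValueError(
                f"row of length {len(row)} exceeds target_len={target_len}"
            )
        padded.append(list(row) + [pad_value] * (target_len - len(row)))
    return padded


# Two rows of different lengths, padded to a shared seq dim of 4.
jagged_input_ids = [[5, 6, 7], [8, 9]]
padded = pad_rows(jagged_input_ids, target_len=4, pad_value=-1)
```

After this call every row has length 4, which is the property the shared pad length above is meant to guarantee across driver-fetched and worker-returned columns.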

nemo_rl.data_plane.column_io.write_columns(
dp_client: nemo_rl.data_plane.interfaces.DataPlaneClient,
meta: nemo_rl.data_plane.interfaces.KVBatchMeta,
fields: dict[str, torch.Tensor | np.ndarray],
) → None#

put_samples(meta.sample_ids, fields=...).

Per-token tensor fields are converted to jagged via pack_jagged_fields so they land in TQ with the same row lengths as the initial put. np.ndarray(dtype=object) leaves pass through as-is.

Parameters:
  • dp_client – Data-plane client used for the underlying put.

  • meta – KVBatchMeta describing the keys being written.

  • fields – Map of field name to tensor or object array.
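The padded-to-jagged conversion mentioned above can be illustrated in pure Python. This is a sketch under assumptions: unpad_rows is a hypothetical helper, not pack_jagged_fields, and it assumes the valid tokens sit at the front of each padded row.

```python
from typing import Sequence


def unpad_rows(
    padded: Sequence[Sequence[int]], lengths: Sequence[int]
) -> list[list[int]]:
    """Trim each padded row back to its valid length (assumes right-padding)."""
    if len(padded) != len(lengths):
        raise ValueError("need exactly one length per row")
    jagged = []
    for row, n in zip(padded, lengths):
        if n > len(row):
            raise ValueError(f"length {n} exceeds row width {len(row)}")
        jagged.append(list(row[:n]))
    return jagged


# A padded per-token field plus the per-row valid lengths.
padded_field = [[5, 6, 7, 0], [8, 9, 0, 0]]
jagged = unpad_rows(padded_field, lengths=[3, 2])
```

Trimming by the same per-row lengths on every write is what keeps a delta column row-aligned with the columns written earlier.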

nemo_rl.data_plane.column_io.kv_first_write(
final_batch_cpu: nemo_rl.distributed.batched_data_dict.BatchedDataDict[Any],
*,
sample_ids: Sequence[str],
dp_client: nemo_rl.data_plane.interfaces.DataPlaneClient,
partition_id: str,
extra_info: dict[str, Any] | None = None,
task_name: str = 'train',
pad_to_multiple: int = 1,
tags: list[dict[str, Any]] | None = None,
) → nemo_rl.data_plane.interfaces.KVBatchMeta#

Single flat put_samples of every tensor field in final_batch_cpu.

This is the rollout actor's first put of a partition. The caller mints sample_ids (verl-style); the helper itself is rollout-shape-agnostic.

Parameters:
  • final_batch_cpu – Rollout output already on CPU. Must contain "sample_mask" (used as batch-size oracle: shape[0] == N) and "input_lengths" (per-row valid lengths for the jagged pack). Tensor fields are packed jagged via pack_jagged_fields; np.ndarray(dtype=object) leaves pass through.

  • sample_ids – Pre-minted per-sample ids, one per row of final_batch_cpu.

  • dp_client – Data-plane client used for the put.

  • partition_id – TQ partition to write into.

  • extra_info – Optional extra fields to attach to the returned meta.

  • task_name – Consumer task tag stamped on the returned meta.

  • pad_to_multiple – Seq-dim alignment recorded in extra_info so readers pad to a multiple compatible with downstream backends (mcore SP, PyTorch CP).

  • tags – Optional per-sample primitive metadata (one dict per row). Stored on the TQ controller alongside the samples; travels with KVBatchMeta through subset / concat / slice so consumers can filter on it without fetching tensor data.

Returns:

KVBatchMeta covering the written samples.