nemo_rl.data_plane.adapters.transfer_queue

Adapter wiring DataPlaneClient onto the transfer_queue package.

Pure plumbing: it owns the TQ controller / client handle and translates KVBatchMeta ↔ TQ’s own BatchMeta / KVBatchMeta. No business logic. Backend init is lifted from rl-arena/arena/backends.py; the call shapes are lifted from rl-arena/arena/dataplane_client.py.

Module Contents

Classes

  • TQDataPlaneClient – Adapter façade: maps NeMo-RL calls onto TransferQueue’s public API.

Functions

  • _get_local_node_ip – Return THIS process’s host IP, not the cluster head’s.

  • _mooncake_transport_config

  • _connect_existing – Worker-process path: connect this process’s client to the Ray cluster.

  • _resolve_tq_pin – Return the TransferQueue requirement string from nemo-rl metadata.

  • _patch_tq_actor_runtime_env – Inject a per-actor runtime_env pin into TQ’s actor .options().

  • _init_tq – Driver-process path: bootstrap the TQ controller for the chosen backend.

  • _assert_no_key_loss – Guard against silent leaf drops through TensorDict constructor rebuild.

  • _promote_1d_leaves – Unsqueeze 1D tensor leaves to (N, 1); mooncake_cpu KV-path workaround.

  • _from_wire – Inverse of _promote_1d_leaves: squeeze trailing 1 back to (N,).

Data

  • _TQ_RUNTIME_ENV_PATCHED

API

nemo_rl.data_plane.adapters.transfer_queue._get_local_node_ip() → str

Return THIS process’s host IP, not the cluster head’s.

Each Ray actor process must use its own node’s IP so Mooncake’s announce address (MC_TCP_BIND_ADDRESS → desc.ip_or_host_name in transfer_engine_impl.cpp) is routable cross-node. Non-routable addresses are rejected:

  • Link-local (169.254/16, fe80::/10): gethostbyname can resolve to an APIPA address on hosts where avahi-autoipd is active.

  • Loopback (127.0.0.0/8, ::1): on hosts whose /etc/hosts maps the hostname to 127.0.0.1, the process would otherwise announce an unroutable address to Mooncake peers, and cross-node connections would be refused.
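The two rejection rules above map directly onto Python's standard ipaddress module. The following is a minimal sketch, not the module's actual implementation: is_announceable and pick_announce_ip are hypothetical helpers, and how the real function gathers candidate addresses is not documented here.

```python
import ipaddress
from typing import Iterable, Optional


def is_announceable(addr: str) -> bool:
    """Return True if addr is safe to announce to cross-node Mooncake peers.

    Applies the two rules described above:
      * link-local (169.254.0.0/16, fe80::/10), e.g. APIPA addresses;
      * loopback (127.0.0.0/8, ::1), e.g. a hostname mapped to 127.0.0.1.
    """
    try:
        ip = ipaddress.ip_address(addr)
    except ValueError:
        # Not a literal IPv4/IPv6 address (e.g. a bare hostname): unusable here.
        return False
    return not (ip.is_link_local or ip.is_loopback)


def pick_announce_ip(candidates: Iterable[str]) -> Optional[str]:
    """Hypothetical helper: first routable candidate, or None if none qualify."""
    for addr in candidates:
        if is_announceable(addr):
            return addr
    return None
```

Returning None instead of falling back to a rejected address keeps the failure visible at startup rather than surfacing later as a refused cross-node connection.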

nemo_rl.data_plane.adapters.transfer_queue._mooncake_transport_config() → dict

nemo_rl.data_plane.adapters.transfer_queue._connect_existing() → None

Worker-process path: connect this process’s client to the Ray cluster.

Connects to the already-running named controller actor. Mirrors rl-arena/arena/dataplane_client.py’s tq.init() (no args) call.

nemo_rl.data_plane.adapters.transfer_queue._TQ_RUNTIME_ENV_PATCHED = False

nemo_rl.data_plane.adapters.transfer_queue._resolve_tq_pin() → str

Return the TransferQueue requirement string from nemo-rl metadata.

The single source of truth is pyproject.toml: the requirement is read back via importlib.metadata.requires, so the runtime_env injection cannot drift from the dependency declaration.

nemo_rl.data_plane.adapters.transfer_queue._patch_tq_actor_runtime_env() → None

Inject a per-actor runtime_env pin into TQ’s actor .options().

TQ spawns SimpleStorageUnit and TransferQueueController via Cls.options(...).remote(...) without a runtime_env, so they inherit the job-level env. In a multi-node container deployment where each node has its own /opt/nemo_rl_venv, the driver’s uv sync only updates ray-head’s venv and a worker-node actor fails with ModuleNotFoundError. This monkey-patch makes Ray pip-install TQ into a per-actor runtime_env on first spawn (cached per-node by Ray afterwards). Idempotent. Couples us to TQ’s internal class layout — if TQ restructures, this becomes a no-op with a logged warning and we fall back to per-node uv sync.

The pin is sourced from nemo-rl’s installed metadata via _resolve_tq_pin so it cannot drift from pyproject.toml.

TODO(zhiyul): remove this patch once the nightly container image is published with TransferQueue baked in via pyproject.toml. When every node starts from that image, the base env already has TQ and Ray actors inherit it — this injection then becomes pure overhead (Ray builds a redundant per-actor pip env on top of the container’s existing TQ install). Drop the call from TQDataPlaneClient.__init__ and delete this function.
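The monkey-patch technique described above (wrap each actor class's .options() so a runtime_env is injected when the caller omits one) can be sketched without Ray. This is a sketch under assumptions: patch_actor_options and the _PATCHED flag are hypothetical stand-ins (the flag plays the role the name _TQ_RUNTIME_ENV_PATCHED suggests, which is an inference), and the real function's handling of a caller-supplied runtime_env is not documented. The {"pip": [...]} shape is Ray's runtime_env pip field.

```python
import logging
from typing import Any, Iterable

logger = logging.getLogger(__name__)

# Hypothetical module-level idempotence flag.
_PATCHED = False


def patch_actor_options(module: Any, class_names: Iterable[str], pip_pin: str) -> None:
    """Wrap <module>.<name>.options so spawned actors get a pip runtime_env."""
    global _PATCHED
    if _PATCHED:
        return  # idempotent: never wrap twice
    for name in class_names:
        handle = getattr(module, name, None)
        original = getattr(handle, "options", None)
        if original is None:
            # Upstream layout changed: degrade to a no-op with a warning.
            logger.warning("%s.options not found; skipping runtime_env patch", name)
            continue

        def wrapped(*args: Any, _orig=original, **kwargs: Any) -> Any:
            # Keep an explicit runtime_env from the caller; otherwise pin the dep.
            kwargs.setdefault("runtime_env", {"pip": [pip_pin]})
            return _orig(*args, **kwargs)

        setattr(handle, "options", wrapped)
    _PATCHED = True
```

Binding original through the _orig default argument avoids the late-binding closure bug where every wrapper would call the last class's .options().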

nemo_rl.data_plane.adapters.transfer_queue._init_tq(cfg: nemo_rl.data_plane.interfaces.DataPlaneConfig) → None

Driver-process path: bootstrap the TQ controller for the chosen backend.

nemo_rl.data_plane.adapters.transfer_queue._assert_no_key_loss(
src_dict: dict,
new_td: tensordict.TensorDict,
fn: str,
) → None

Guard against silent leaf drops through TensorDict constructor rebuild.

tensordict’s constructor has historically dropped NonTensorStack / NonTensorData leaves when built from a plain dict. Compare the source dict’s keys against the rebuilt TD’s top-level keys.
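The key comparison described above amounts to a set difference over top-level keys. A minimal sketch, assuming the check raises AssertionError naming the calling function fn (the real exception type and message are not documented); a plain dict stands in for the rebuilt TensorDict, since TensorDict.keys() also yields top-level keys by default.

```python
from collections.abc import Iterable, Mapping
from typing import Any, Protocol


class _KeyedContainer(Protocol):
    """Anything with .keys(): a plain dict here, a TensorDict in the adapter."""

    def keys(self) -> Iterable[str]: ...


def assert_no_key_loss(src_dict: Mapping[str, Any], new_td: _KeyedContainer, fn: str) -> None:
    """Raise if any top-level key of src_dict is absent from new_td."""
    missing = set(src_dict) - set(new_td.keys())
    if missing:
        raise AssertionError(f"{fn}: TensorDict rebuild dropped keys {sorted(missing)}")
```

Raising explicitly, rather than using an assert statement, keeps the guard active when Python runs with -O.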

nemo_rl.data_plane.adapters.transfer_queue._promote_1d_leaves(td: tensordict.TensorDict) → tensordict.TensorDict

Unsqueeze 1D tensor leaves to (N, 1) — mooncake_cpu KV-path workaround.

Works around TQ’s KVStorageManager 1D schema/data mismatch; _from_wire squeezes the trailing 1 back on read. Symmetric with _from_wire: callers gate on self._promote_1d. NonTensorStack / NonTensorData leaves pass through.

Parameters:

td – TensorDict whose 1D tensor leaves should be promoted.

Returns:

TensorDict with 1D tensor leaves unsqueezed to (N, 1); all other leaves pass through unchanged.

nemo_rl.data_plane.adapters.transfer_queue._from_wire(td: tensordict.TensorDict) → tensordict.TensorDict

Inverse of _promote_1d_leaves: squeeze trailing 1 back to (N,).
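The promote/squeeze round trip can be sketched with plain torch tensors. Assumptions: a plain dict replaces the TensorDict, promote_1d and from_wire are hypothetical stand-ins for _promote_1d_leaves and _from_wire, and any non-tensor value (here a Python list) stands in for NonTensorStack / NonTensorData leaves.

```python
from typing import Any

import torch


def promote_1d(fields: dict[str, Any]) -> dict[str, Any]:
    """Unsqueeze every 1D tensor (N,) to (N, 1); non-tensor values pass through."""
    return {
        k: v.unsqueeze(-1) if isinstance(v, torch.Tensor) and v.dim() == 1 else v
        for k, v in fields.items()
    }


def from_wire(fields: dict[str, Any]) -> dict[str, Any]:
    """Squeeze a trailing size-1 dim back: (N, 1) -> (N,); all else unchanged."""
    return {
        k: v.squeeze(-1)
        if isinstance(v, torch.Tensor) and v.dim() == 2 and v.shape[-1] == 1
        else v
        for k, v in fields.items()
    }
```

One caveat of this sketch: a field that was genuinely (N, 1) before promotion comes back as (N,), because the two cases are indistinguishable on read.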

class nemo_rl.data_plane.adapters.transfer_queue.TQDataPlaneClient(
cfg: nemo_rl.data_plane.interfaces.DataPlaneConfig,
*,
bootstrap: bool = True,
)

Bases: nemo_rl.data_plane.interfaces.DataPlaneClient

Adapter façade — maps NeMo-RL calls onto TransferQueue’s public API.

Initialization

Construct a TQ-backed client.

Parameters:
  • cfg – data-plane config (backend selection, poll cadence, …).

  • bootstrap – True (driver process): bootstraps the TQ controller using cfg. False (worker process): connects this process to an already-running named controller actor in the Ray cluster; cfg is then consulted only for client-side knobs (e.g., the poll interval).
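The driver/worker split selected by bootstrap can be illustrated with an in-process stand-in. Everything here is hypothetical: FakeCluster replaces Ray's named-actor registry (with Ray, a lookup such as ray.get_actor(name) would play that role; whether TQ does exactly this is not documented here), and CONTROLLER_NAME, SketchClient and the cfg keys "backend" / "poll_interval_s" are illustrative names.

```python
from dataclasses import dataclass, field
from typing import Any

CONTROLLER_NAME = "tq_controller"  # hypothetical actor name


@dataclass
class Controller:
    """Hypothetical stand-in for the TQ controller actor."""
    backend: str


@dataclass
class FakeCluster:
    """Hypothetical stand-in for Ray's registry of named actors."""
    actors: dict[str, Controller] = field(default_factory=dict)


class SketchClient:
    def __init__(self, cfg: dict[str, Any], *, bootstrap: bool = True,
                 cluster: FakeCluster) -> None:
        if bootstrap:
            # Driver path: cfg selects the backend the controller is built for.
            cluster.actors[CONTROLLER_NAME] = Controller(backend=cfg["backend"])
        elif CONTROLLER_NAME not in cluster.actors:
            # Worker path: only look the controller up, never create it.
            raise RuntimeError("no controller registered; bootstrap the driver first")
        self.controller = cluster.actors[CONTROLLER_NAME]
        # Client-side knob, read on both paths.
        self.poll_interval_s = float(cfg.get("poll_interval_s", 1.0))
```

Note that the worker's cfg never needs a backend entry, matching the description that workers consult cfg only for client-side knobs.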

register_partition(
partition_id: str,
fields: list[str],
num_samples: int,
consumer_tasks: list[str],
grpo_group_size: int | None = None,
enums: dict[str, list[str]] | None = None,
) → None

claim_meta(
partition_id: str,
task_name: str,
required_fields: list[str],
batch_size: int,
dp_rank: int | None = None,
blocking: bool = True,
timeout_s: float = 60.0,
) → nemo_rl.data_plane.interfaces.KVBatchMeta

get_data(
meta: nemo_rl.data_plane.interfaces.KVBatchMeta,
select_fields: list[str] | None = None,
) → tensordict.TensorDict

check_consumption_status(
partition_id: str,
task_names: list[str],
) → bool

put_samples(
sample_ids: list[str],
partition_id: str,
fields: tensordict.TensorDict | None = None,
tags: list[dict[str, Any]] | None = None,
) → nemo_rl.data_plane.interfaces.KVBatchMeta

get_samples(
sample_ids: list[str],
partition_id: str,
select_fields: list[str],
) → tensordict.TensorDict

clear_samples(sample_ids: list[str] | None, partition_id: str) → None

close() → None