nemo_rl.data_plane.adapters.transfer_queue
Adapter wiring DataPlaneClient onto the transfer_queue package.
Pure plumbing: it owns the TQ controller/client handle and translates KVBatchMeta ↔ TQ’s own BatchMeta/KVBatchMeta. No business logic. Backend init is lifted from rl-arena/arena/backends.py; the call shapes are lifted from rl-arena/arena/dataplane_client.py.
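Module Contents
Classes
- TQDataPlaneClient – Adapter façade: maps NeMo-RL calls onto TransferQueue’s public API.
Functions
- _get_local_node_ip – Return THIS process’s host IP, not the cluster head’s.
- _mooncake_transport_config
- _connect_existing – Worker-process path: connect this process’s client to the Ray cluster.
- _resolve_tq_pin – Return the TransferQueue requirement string from nemo-rl metadata.
- _patch_tq_actor_runtime_env – Inject a per-actor runtime_env pin into TQ’s actor .options() calls.
- _init_tq – Driver-process path: bootstrap the TQ controller for the chosen backend.
- _assert_no_key_loss – Guard against silent leaf drops through TensorDict constructor rebuild.
- _promote_1d_leaves – Unsqueeze 1D tensor leaves to (N, 1): mooncake_cpu KV-path workaround.
- _from_wire – Inverse of _promote_1d_leaves: squeeze the trailing 1 back to (N,).
Data
- _TQ_RUNTIME_ENV_PATCHED
API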
- nemo_rl.data_plane.adapters.transfer_queue._get_local_node_ip() -> str
Return THIS process’s host IP, not the cluster head’s.
Each Ray actor process must use its own node’s IP so that Mooncake’s announce address (MC_TCP_BIND_ADDRESS → desc.ip_or_host_name in transfer_engine_impl.cpp) is routable cross-node. Non-routable addresses are rejected:
  - Link-local (169.254/16, fe80::/10): gethostbyname can resolve to an APIPA address on hosts where avahi-autoipd is active.
  - Loopback (127.0.0.0/8, ::1): hosts whose /etc/hosts maps the hostname to 127.0.0.1 would otherwise announce an unroutable address to Mooncake peers, causing cross-node "connection refused" errors.
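The rejection rules above can be sketched with the standard-library ipaddress module. This is a minimal illustration rather than the module’s code: is_routable, pick_local_ip, and local_ip_candidates are hypothetical helpers, and collecting candidates with socket.gethostbyname_ex is an assumption about where addresses come from.

```python
import ipaddress
import socket


def is_routable(ip: str) -> bool:
    """Hypothetical helper: reject loopback and link-local addresses."""
    addr = ipaddress.ip_address(ip)
    # is_loopback covers 127.0.0.0/8 and ::1;
    # is_link_local covers 169.254.0.0/16 (APIPA) and fe80::/10.
    return not (addr.is_loopback or addr.is_link_local)


def pick_local_ip(candidates: list[str]) -> str:
    """Hypothetical helper: return the first candidate a remote peer could reach."""
    for ip in candidates:
        if is_routable(ip):
            return ip
    raise RuntimeError(f"no routable local IP among {candidates!r}")


def local_ip_candidates() -> list[str]:
    """Assumed candidate source: every IPv4 address the hostname resolves to."""
    _, _, addrs = socket.gethostbyname_ex(socket.gethostname())
    return addrs
```

For example, on a host whose /etc/hosts maps the hostname to 127.0.1.1, pick_local_ip(["127.0.1.1", "10.0.3.7"]) skips the loopback entry and returns "10.0.3.7".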
- nemo_rl.data_plane.adapters.transfer_queue._mooncake_transport_config() -> dict
- nemo_rl.data_plane.adapters.transfer_queue._connect_existing() -> None
Worker-process path: connect this process’s client to the Ray cluster.
Connects to the already-running named controller actor. Mirrors the no-argument tq.init() call in rl-arena/arena/dataplane_client.py.
- nemo_rl.data_plane.adapters.transfer_queue._TQ_RUNTIME_ENV_PATCHED = False
- nemo_rl.data_plane.adapters.transfer_queue._resolve_tq_pin() -> str
Return the TransferQueue requirement string from nemo-rl metadata.
The single source of truth is pyproject.toml: we read it back via importlib.metadata.requires so the runtime_env injection cannot drift from the dependency declaration.
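A minimal sketch of reading the pin back from installed metadata with importlib.metadata.requires, assuming the distribution is named "nemo-rl" and the dependency "TransferQueue". The helper names are hypothetical, and the name comparison uses PEP 503 normalization so that case and -/_/. differences (for example Transfer_Queue vs transfer-queue) do not matter.

```python
import re
from importlib import metadata


def _normalize(name: str) -> str:
    # PEP 503: runs of "-", "_", "." collapse to "-", compared case-insensitively.
    return re.sub(r"[-_.]+", "-", name).lower()


def find_requirement(requirements: list[str], project: str) -> str:
    """Hypothetical helper: first requirement string whose project name matches."""
    wanted = _normalize(project)
    for req in requirements:
        # The project name is the leading run of name characters; extras,
        # version specifiers, "@ url" and "; marker" parts all follow it.
        m = re.match(r"\s*([A-Za-z0-9][A-Za-z0-9._-]*)", req)
        if m and _normalize(m.group(1)) == wanted:
            return req.strip()
    raise LookupError(f"{project!r} not found in requirements")


def resolve_pin(dist: str = "nemo-rl", project: str = "TransferQueue") -> str:
    # requires() raises PackageNotFoundError if dist is not installed and
    # returns None when it declares no dependencies.
    return find_requirement(metadata.requires(dist) or [], project)
```

The string is returned verbatim, including any version specifier, direct URL, or environment marker.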
- nemo_rl.data_plane.adapters.transfer_queue._patch_tq_actor_runtime_env() -> None
Inject a per-actor runtime_env pin into TQ’s actor .options() calls.
TQ spawns SimpleStorageUnit and TransferQueueController via Cls.options(...).remote(...) without a runtime_env, so they inherit the job-level env. In a multi-node container deployment where each node has its own /opt/nemo_rl_venv, the driver’s uv sync only updates ray-head’s venv, and a worker-node actor fails with ModuleNotFoundError. This monkey-patch makes Ray pip-install TQ into a per-actor runtime_env on first spawn (cached per node by Ray afterwards). Idempotent. It couples us to TQ’s internal class layout: if TQ restructures, this becomes a no-op with a logged warning and we fall back to per-node uv sync.
The pin is sourced from nemo-rl’s installed metadata via _resolve_tq_pin so it cannot drift from pyproject.toml.
TODO(zhiyul): remove this patch once the nightly container image is published with TransferQueue baked in via pyproject.toml. When every node starts from that image, the base env already has TQ and Ray actors inherit it; this injection then becomes pure overhead (Ray builds a redundant per-actor pip env on top of the container’s existing TQ install). Drop the call from TQDataPlaneClient.__init__ and delete this function.
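The patch pattern can be sketched independently of Ray and TQ. This is a hypothetical illustration: patch_options_with_pin and _PATCHED stand in for the module’s function and its _TQ_RUNTIME_ENV_PATCHED flag, and the targets are assumed to be any objects exposing an options(**kwargs) method, as Ray actor classes do. The {"pip": [pin]} value follows the shape of Ray’s runtime_env pip field.

```python
import functools
import logging
from typing import Any

logger = logging.getLogger(__name__)

# Module-level guard, analogous in role to _TQ_RUNTIME_ENV_PATCHED.
_PATCHED = False


def patch_options_with_pin(targets: list[Any], pin: str) -> None:
    """Wrap each target's .options() so spawns default to a per-actor pip env."""
    global _PATCHED
    if _PATCHED:  # idempotent: a second call is a no-op
        return
    for target in targets:
        original = getattr(target, "options", None)
        if original is None:
            # Layout changed upstream: degrade to a no-op with a warning.
            logger.warning("%r has no .options(); runtime_env pin not injected", target)
            continue

        # _orig is bound as a default to avoid late binding of the loop variable.
        @functools.wraps(original)
        def options(*args: Any, _orig: Any = original, **kwargs: Any) -> Any:
            # Inject only when the caller did not pass its own runtime_env.
            kwargs.setdefault("runtime_env", {"pip": [pin]})
            return _orig(*args, **kwargs)

        target.options = options
    _PATCHED = True
```

A caller-supplied runtime_env is left untouched in this sketch; merging a pip entry into an existing runtime_env would be a separate design choice.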
- nemo_rl.data_plane.adapters.transfer_queue._init_tq(cfg: nemo_rl.data_plane.interfaces.DataPlaneConfig) -> None
Driver-process path: bootstrap the TQ controller for the chosen backend.
- nemo_rl.data_plane.adapters.transfer_queue._assert_no_key_loss(src_dict: dict, new_td: tensordict.TensorDict, fn: str)
Guard against silent leaf drops when a TensorDict is rebuilt through its constructor.
tensordict’s constructor has historically dropped NonTensorStack / NonTensorData leaves when built from a plain dict, so this compares the source dict’s keys against the rebuilt TD’s top-level keys.
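A minimal sketch of the key comparison described above, written against a duck-typed new_td (anything with .keys()) so it runs without tensordict installed. Raising AssertionError and the message format are assumptions, not the module’s confirmed behavior.

```python
from collections.abc import Mapping
from typing import Any


def assert_no_key_loss(src_dict: Mapping[str, Any], new_td: Any, fn: str) -> None:
    """Raise if any top-level key of src_dict is missing from new_td.

    TensorDict.keys() yields top-level keys by default (include_nested=False),
    so a real TensorDict can be passed as new_td.
    """
    missing = set(src_dict) - set(new_td.keys())
    if missing:
        # Raise explicitly rather than using `assert`, so the guard survives `python -O`.
        raise AssertionError(f"{fn}: TensorDict rebuild dropped keys {sorted(missing)}")
```

A caller would build the TensorDict from the dict and immediately pass both, plus its own name as fn, so a failure points at the call site that lost the leaf.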
- nemo_rl.data_plane.adapters.transfer_queue._promote_1d_leaves(td: tensordict.TensorDict) -> tensordict.TensorDict
Unsqueeze 1D tensor leaves to (N, 1): a mooncake_cpu KV-path workaround.
Works around the 1D schema/data mismatch in TQ’s KVStorageManager. The symmetric _from_wire squeezes the trailing 1 back on read; callers gate on self._promote_1d. NonTensorStack/NonTensorData leaves pass through.
- Parameters:
td – TensorDict whose 1D tensor leaves should be promoted.
- Returns:
TensorDict with 1D tensor leaves unsqueezed to (N, 1); all other leaves pass through unchanged.
- nemo_rl.data_plane.adapters.transfer_queue._from_wire(td: tensordict.TensorDict) -> tensordict.TensorDict
Inverse of _promote_1d_leaves: squeeze the trailing 1 back to (N,).
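The promote/squeeze round trip can be sketched over a plain dict of arrays standing in for a TensorDict (an assumption so the example runs without tensordict). The indexing used works for both torch tensors and NumPy arrays; promote_1d_leaves and from_wire are hypothetical names.

```python
from typing import Any


def promote_1d_leaves(leaves: dict[str, Any]) -> dict[str, Any]:
    """Unsqueeze every 1D array/tensor leaf from (N,) to (N, 1)."""
    out = {}
    for key, value in leaves.items():
        if getattr(value, "ndim", None) == 1:
            out[key] = value[:, None]  # (N,) -> (N, 1), equivalent to unsqueeze(-1)
        else:
            out[key] = value  # non-tensor and >=2D leaves pass through
    return out


def from_wire(leaves: dict[str, Any]) -> dict[str, Any]:
    """Inverse of promote_1d_leaves: squeeze a trailing 1 from (N, 1) back to (N,)."""
    out = {}
    for key, value in leaves.items():
        shape = getattr(value, "shape", None)
        if shape is not None and len(shape) == 2 and shape[-1] == 1:
            out[key] = value[:, 0]  # (N, 1) -> (N,)
        else:
            out[key] = value
    return out
```

This sketch cannot tell a promoted leaf from a field that was (N, 1) to begin with, which is one reason the two steps must be applied as a pair; how the real helpers handle that case is not stated here.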
- class nemo_rl.data_plane.adapters.transfer_queue.TQDataPlaneClient(cfg: nemo_rl.data_plane.interfaces.DataPlaneConfig, *, bootstrap: bool = True)
Bases: nemo_rl.data_plane.interfaces.DataPlaneClient
Adapter façade: maps NeMo-RL calls onto TransferQueue’s public API.
Initialization
Construct a TQ-backed client.
- Parameters:
cfg – data-plane config (backend selection, poll cadence, …).
bootstrap – True (driver) bootstraps the TQ controller using cfg. False (worker) connects this process to an already-running named controller actor in the Ray cluster; cfg is then only consulted for client-side knobs (poll interval).
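The driver/worker split described by bootstrap can be sketched as a small dispatch function. It is hypothetical: start_data_plane is not part of this module, and init_tq / connect_existing are injected stand-ins for _init_tq and _connect_existing so the example runs without Ray. Anything else the real constructor does on either path (such as the runtime_env patch mentioned in the TODO above) is not shown.

```python
from typing import Any, Callable


def start_data_plane(
    cfg: Any,
    *,
    bootstrap: bool,
    init_tq: Callable[[Any], None],
    connect_existing: Callable[[], None],
) -> str:
    """Hypothetical sketch of the bootstrap split; returns which path ran."""
    if bootstrap:
        # Driver process: bootstrap the TQ controller for cfg's backend.
        init_tq(cfg)
        return "driver"
    # Worker process: attach to the already-running named controller actor.
    # cfg is not forwarded; only client-side knobs (poll interval) read it.
    connect_existing()
    return "worker"
```

Keeping the worker path argument-free mirrors the point above that a worker never re-derives backend setup from its own cfg.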
- register_partition(partition_id: str, fields: list[str], num_samples: int, consumer_tasks: list[str], grpo_group_size: int | None = None, enums: dict[str, list[str]] | None = None)
- claim_meta(partition_id: str, task_name: str, required_fields: list[str], batch_size: int, dp_rank: int | None = None, blocking: bool = True, timeout_s: float = 60.0)
- get_data(meta: nemo_rl.data_plane.interfaces.KVBatchMeta, select_fields: list[str] | None = None)
- check_consumption_status(partition_id: str, task_names: list[str])
- put_samples(sample_ids: list[str], partition_id: str, fields: tensordict.TensorDict | None = None, tags: list[dict[str, Any]] | None = None)
- get_samples(sample_ids: list[str], partition_id: str, select_fields: list[str])
- clear_samples(sample_ids: list[str] | None, partition_id: str) -> None
- close() -> None