nemo_rl.data_plane.adapters.noop#

In-memory DataPlaneClient test fixture.

Behaves like a real adapter end-to-end (put → get → clear, consumption counters, field-presence as the stage-done signal) but stores everything in process memory. The ABC contract tests run against this implementation, so they do not require TQ to be installed.
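The lifecycle described above can be illustrated with a toy store. This is a minimal sketch, not the actual NoOpDataPlaneClient: the class TinyInMemoryStore, its method names, and the use of plain Python lists in place of tensors are all hypothetical, and the claim semantics (a sample becomes claimable by a task once every required field is present and the task has not consumed it yet) are an assumption inferred from the summary.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class TinyPartition:
    # Hypothetical, trimmed-down partition state for this sketch only.
    fields: list
    consumer_tasks: list
    rows: dict = field(default_factory=dict)      # sample_id -> {field: value}
    consumed: dict = field(default_factory=dict)  # task name -> set of sample ids


class TinyInMemoryStore:
    """Toy put -> get -> clear store kept entirely in process memory."""

    def __init__(self) -> None:
        self._partitions = {}

    def register(self, partition_id: str, fields: list, consumer_tasks: list) -> None:
        self._partitions[partition_id] = TinyPartition(
            fields=list(fields),
            consumer_tasks=list(consumer_tasks),
            consumed={task: set() for task in consumer_tasks},
        )

    def put(self, partition_id: str, sample_id: str, values: dict) -> None:
        part = self._partitions[partition_id]
        unknown = set(values) - set(part.fields)
        if unknown:
            raise KeyError(f"unregistered fields: {sorted(unknown)}")
        # Later stages add fields to the same row as they finish.
        part.rows.setdefault(sample_id, {}).update(values)

    def claim(self, partition_id: str, task: str, required_fields: list, batch_size: int) -> list:
        part = self._partitions[partition_id]
        seen = part.consumed[task]
        # Field presence is the stage-done signal: a row is ready once it
        # holds every field the consumer asks for.
        ready = [
            sid for sid, row in part.rows.items()
            if all(name in row for name in required_fields) and sid not in seen
        ]
        batch = ready[:batch_size]
        seen.update(batch)
        return batch

    def get(self, partition_id: str, sample_ids: list, select_fields: list) -> dict:
        part = self._partitions[partition_id]
        return {sid: {f: part.rows[sid][f] for f in select_fields} for sid in sample_ids}

    def all_consumed(self, partition_id: str, tasks: list) -> bool:
        part = self._partitions[partition_id]
        return all(part.consumed[task] >= set(part.rows) for task in tasks)

    def clear(self, partition_id: str, sample_ids: Optional[list] = None) -> None:
        part = self._partitions[partition_id]
        targets = list(part.rows) if sample_ids is None else list(sample_ids)
        for sid in targets:
            part.rows.pop(sid, None)
            for seen in part.consumed.values():
                seen.discard(sid)


store = TinyInMemoryStore()
store.register("step0", fields=["prompt", "logprobs"], consumer_tasks=["train"])
store.put("step0", "s0", {"prompt": [1, 2]})                          # stage 1 only
store.put("step0", "s1", {"prompt": [3, 4], "logprobs": [0.1, 0.2]})  # both stages

first = store.claim("step0", "train", ["prompt", "logprobs"], batch_size=8)
store.put("step0", "s0", {"logprobs": [0.3, 0.4]})                    # s0 catches up
second = store.claim("step0", "train", ["prompt", "logprobs"], batch_size=8)

fetched = store.get("step0", second, ["logprobs"])
done = store.all_consumed("step0", ["train"])
store.clear("step0")
remaining = store.get("step0", [], ["prompt"])
```

In this sketch the first claim returns only the sample whose fields are all present, and the second claim returns the sample that became ready afterwards. Each task tracks consumption independently, which is why the consumed state is keyed by task name.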

Production callers must NOT use this: build_data_plane_client intentionally raises when enabled=False rather than returning a NoOp fallback (see factory.py).

Module Contents#

Classes#

_Partition

NoOpDataPlaneClient

Reference in-memory implementation.

Functions#

_reject_non_tensor_leaves

No pickle on the bus. Mirror of the TQ adapter check.

API#

nemo_rl.data_plane.adapters.noop._reject_non_tensor_leaves(td: tensordict.TensorDict) → None#

No pickle on the bus. Mirror of the TQ adapter check.

Walk the leaves via keys() + indexed lookup rather than items(), because some tensordict versions omit NonTensorData entries from items(leaves_only=True). Such entries are “leaves” by structure but not tensor-typed, so they would silently slip past a naive items() iteration.
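The keys() + indexed lookup pattern can be sketched without tensordict installed. In the sketch below, a nested dict stands in for a TensorDict and array.array stands in for torch.Tensor. Both stand-ins, and the helper names iter_leaves and reject_non_array_leaves, are assumptions made for illustration and are not the module's actual code.

```python
from array import array
from collections.abc import Mapping


def iter_leaves(tree, prefix=()):
    """Yield (path, value) for every leaf using keys() plus indexed lookup.

    Each value is fetched by key, so no leaf can be dropped by an
    iterator that filters on leaf type.
    """
    for key in tree.keys():
        value = tree[key]  # indexed lookup, never items()
        path = prefix + (key,)
        if isinstance(value, Mapping):
            yield from iter_leaves(value, path)
        else:
            yield path, value


def reject_non_array_leaves(tree) -> None:
    """Raise TypeError if any leaf is not the stand-in tensor type."""
    bad = [
        ".".join(path)
        for path, value in iter_leaves(tree)
        if not isinstance(value, array)
    ]
    if bad:
        raise TypeError(f"non-array leaves are not allowed: {bad}")


good = {"obs": array("f", [1.0, 2.0]), "extra": {"mask": array("b", [1, 0])}}
reject_non_array_leaves(good)  # passes silently

bad = {"obs": array("f", [1.0]), "extra": {"note": "free-form text"}}
try:
    reject_non_array_leaves(bad)
    rejected = None
except TypeError as exc:
    rejected = str(exc)
```

Collecting every offending path before raising reports all bad leaves in one error, which is a design choice of this sketch rather than something the documentation states about the real check.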

class nemo_rl.data_plane.adapters.noop._Partition#
fields: list[str]#

None

num_samples: int#

None

consumer_tasks: list[str]#

None

grpo_group_size: int | None#

None

enums: dict[str, list[str]]#

None

rows: dict[str, dict[str, torch.Tensor]]#

‘field(…)’

tags: dict[str, dict[str, Any]]#

‘field(…)’

consumed: dict[str, set[str]]#

‘field(…)’
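The attribute list above suggests a dataclass whose three mutable containers get their defaults through field(…). The documentation elides the arguments to field(…). The sketch below assumes default_factory=dict, the usual way to give each instance its own container, and uses a hypothetical class name, PartitionSketch, with Any in place of torch.Tensor. Keying rows by sample id and consumed by task name is also an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class PartitionSketch:
    # Registration-time metadata, in the documented order.
    fields: list
    num_samples: int
    consumer_tasks: list
    grpo_group_size: Optional[int]
    enums: dict
    # Mutable state. default_factory is an assumption; the docs show
    # only "field(...)" for these three attributes.
    rows: dict = field(default_factory=dict)      # assumed: sample_id -> {field: value}
    tags: dict = field(default_factory=dict)      # assumed: sample_id -> tag dict
    consumed: dict = field(default_factory=dict)  # assumed: task name -> set of sample ids


a = PartitionSketch(["prompt"], 4, ["train"], None, {})
b = PartitionSketch(["prompt"], 4, ["train"], None, {})

# Writing into one partition must not leak into another.
a.rows["s0"] = {"prompt": [1, 2, 3]}
a.consumed.setdefault("train", set()).add("s0")
```

With a factory, every instance starts with fresh dictionaries, so state written to one partition never appears in another. The dataclasses module enforces this by raising ValueError at class definition for a bare mutable default such as rows: dict = {}.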

class nemo_rl.data_plane.adapters.noop.NoOpDataPlaneClient#

Bases: nemo_rl.data_plane.interfaces.DataPlaneClient

Reference in-memory implementation.

Initialization

register_partition(
    partition_id: str,
    fields: list[str],
    num_samples: int,
    consumer_tasks: list[str],
    grpo_group_size: int | None = None,
    enums: dict[str, list[str]] | None = None,
) → None#

claim_meta(
    partition_id: str,
    task_name: str,
    required_fields: list[str],
    batch_size: int,
    dp_rank: int | None = None,
    blocking: bool = True,
    timeout_s: float = 60.0,
) → nemo_rl.data_plane.interfaces.KVBatchMeta#

get_data(
    meta: nemo_rl.data_plane.interfaces.KVBatchMeta,
    select_fields: list[str] | None = None,
) → tensordict.TensorDict#

check_consumption_status(
    partition_id: str,
    task_names: list[str],
) → bool#

put_samples(
    sample_ids: list[str],
    partition_id: str,
    fields: tensordict.TensorDict | None = None,
    tags: list[dict[str, Any]] | None = None,
) → nemo_rl.data_plane.interfaces.KVBatchMeta#

get_samples(
    sample_ids: list[str],
    partition_id: str,
    select_fields: list[str],
) → tensordict.TensorDict#

clear_samples(sample_ids: list[str] | None, partition_id: str) → None#

close() → None#