nemo_rl.data_plane.adapters.noop
In-memory DataPlaneClient test fixture.
Behaves like a real adapter end-to-end (put → get → clear, consumption counters, field-presence as the stage-done signal) but stores everything in process memory. The ABC contract tests run against this implementation so they don’t require TQ installed.
Production callers must NOT use this: build_data_plane_client intentionally raises when enabled=False rather than returning a NoOp fallback (see factory.py).
Module Contents
Classes
NoOpDataPlaneClient | Reference in-memory implementation.
Functions
_reject_non_tensor_leaves | No pickle on the bus. Mirror of the TQ adapter check.
API
- nemo_rl.data_plane.adapters.noop._reject_non_tensor_leaves(td: tensordict.TensorDict) → None
No pickle on the bus. Mirror of the TQ adapter check.
Walk the leaves via keys() + indexed lookup rather than items(), because some tensordict versions skip NonTensorData entries from items(leaves_only=True). Such entries are “leaves” by structure but not tensor-typed, so they would silently slip past a naive items() iteration.
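The key-based walk described above can be sketched without tensordict installed. This is a minimal illustration, not the adapter's code: FakeTensor is a hypothetical stand-in for torch.Tensor, plain nested dicts stand in for a TensorDict, and raising TypeError is an assumed failure mode.

```python
from collections.abc import Mapping


class FakeTensor:
    """Hypothetical stand-in for torch.Tensor so the sketch runs without torch."""


def reject_non_tensor_leaves(container: Mapping, prefix: tuple = ()) -> None:
    """Raise TypeError on the first leaf that is not tensor-typed."""
    # Iterate keys and look each value up explicitly, so no entry can be
    # filtered out by an items()-style helper before it is inspected.
    for key in container.keys():
        value = container[key]
        path = prefix + (key,)
        if isinstance(value, Mapping):
            # Nested container: recurse into it.
            reject_non_tensor_leaves(value, path)
        elif not isinstance(value, FakeTensor):
            raise TypeError(
                f"non-tensor leaf at {'.'.join(path)}: {type(value).__name__}"
            )


good = {"tokens": FakeTensor(), "extra": {"logprobs": FakeTensor()}}
reject_non_tensor_leaves(good)  # passes silently

bad = {"tokens": FakeTensor(), "extra": {"note": "free-form string"}}
try:
    reject_non_tensor_leaves(bad)
except TypeError as err:
    message = str(err)
```

Because every key is looked up individually, a value that is a leaf by structure but not tensor-typed is always inspected rather than skipped.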
- class nemo_rl.data_plane.adapters.noop._Partition
- fields: list[str]
None
- num_samples: int
None
- consumer_tasks: list[str]
None
- grpo_group_size: int | None
None
- enums: dict[str, list[str]]
None
- rows: dict[str, dict[str, torch.Tensor]]
‘field(…)’
- tags: dict[str, dict[str, Any]]
‘field(…)’
- consumed: dict[str, set[str]]
‘field(…)’
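The ‘field(…)’ values on rows, tags, and consumed suggest a dataclass whose mutable containers are created per instance. A hedged sketch of that shape follows. PartitionSketch is a hypothetical name, the default_factory=dict arguments and the None defaults on grpo_group_size and enums are assumptions, and Any replaces torch.Tensor so the example runs without torch.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class PartitionSketch:
    """Hypothetical mirror of the attribute list above; defaults are assumed."""

    fields: list[str]
    num_samples: int
    consumer_tasks: list[str]
    grpo_group_size: int | None = None          # assumed default
    enums: dict[str, list[str]] | None = None   # assumed default
    # default_factory gives every instance its own dict instead of a shared one.
    rows: dict[str, dict[str, Any]] = field(default_factory=dict)
    tags: dict[str, dict[str, Any]] = field(default_factory=dict)
    consumed: dict[str, set[str]] = field(default_factory=dict)


a = PartitionSketch(fields=["tokens"], num_samples=2, consumer_tasks=["train"])
b = PartitionSketch(fields=["tokens"], num_samples=2, consumer_tasks=["train"])

# Recording a consumed sample on one partition leaves the other untouched.
a.consumed.setdefault("train", set()).add("s0")
```

Per-instance containers matter for a test fixture: state written to one partition must not leak into another partition created in the same process.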
- class nemo_rl.data_plane.adapters.noop.NoOpDataPlaneClient
Bases: nemo_rl.data_plane.interfaces.DataPlaneClient
Reference in-memory implementation.
Initialization
- register_partition(
- partition_id: str,
- fields: list[str],
- num_samples: int,
- consumer_tasks: list[str],
- grpo_group_size: int | None = None,
- enums: dict[str, list[str]] | None = None,
- claim_meta(
- partition_id: str,
- task_name: str,
- required_fields: list[str],
- batch_size: int,
- dp_rank: int | None = None,
- blocking: bool = True,
- timeout_s: float = 60.0,
- get_data(
- meta: nemo_rl.data_plane.interfaces.KVBatchMeta,
- select_fields: list[str] | None = None,
- check_consumption_status(
- partition_id: str,
- task_names: list[str],
- put_samples(
- sample_ids: list[str],
- partition_id: str,
- fields: tensordict.TensorDict | None = None,
- tags: list[dict[str, Any]] | None = None,
- get_samples(
- sample_ids: list[str],
- partition_id: str,
- select_fields: list[str],
- clear_samples(sample_ids: list[str] | None, partition_id: str) → None
- close() → None
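The put → get → clear lifecycle, the consumption counters, and field presence as the stage-done signal can be illustrated with a toy store. This is a sketch under stated assumptions, not the NoOpDataPlaneClient implementation: ToyStore and its simplified method signatures are hypothetical, plain Python values replace tensors, and treating sample_ids=None in clear as "clear everything" is an assumption.

```python
from __future__ import annotations


class ToyStore:
    """Hypothetical stand-in; not the NoOpDataPlaneClient implementation."""

    def __init__(self) -> None:
        self.rows: dict[str, dict[str, object]] = {}  # sample_id -> field -> value
        self.consumed: dict[str, set[str]] = {}       # task -> sample_ids handed out

    def put(self, sample_id: str, **fields: object) -> None:
        # Writing a field is what marks the producing stage as done.
        self.rows.setdefault(sample_id, {}).update(fields)

    def claim(self, task: str, required_fields: list[str], batch_size: int) -> list[str]:
        seen = self.consumed.setdefault(task, set())
        # A sample is ready once every required field is present,
        # and eligible only if this task has not consumed it yet.
        ready = [
            sid for sid, row in self.rows.items()
            if sid not in seen and all(f in row for f in required_fields)
        ]
        batch = ready[:batch_size]
        seen.update(batch)
        return batch

    def get(self, sample_ids: list[str], select_fields: list[str]) -> dict:
        return {sid: {f: self.rows[sid][f] for f in select_fields} for sid in sample_ids}

    def clear(self, sample_ids: list[str] | None = None) -> None:
        # Assumption: None means "clear every sample".
        ids = list(self.rows) if sample_ids is None else sample_ids
        for sid in ids:
            self.rows.pop(sid, None)
            for seen in self.consumed.values():
                seen.discard(sid)


store = ToyStore()
store.put("s0", prompt="p0")
store.put("s1", prompt="p1", response="r1")

first = store.claim("score", ["prompt", "response"], batch_size=8)   # only s1 has both fields
store.put("s0", response="r0")                                       # s0's stage completes
second = store.claim("score", ["prompt", "response"], batch_size=8)  # s1 is not handed out twice
data = store.get(second, ["response"])
store.clear()
```

No explicit "done" flag is stored anywhere: readiness is derived from which fields exist, and the per-task consumed sets prevent a sample from being claimed twice by the same task.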