nemo_retriever.graph package#

Submodules#

nemo_retriever.graph.abstract_operator module#

class nemo_retriever.graph.abstract_operator.AbstractOperator(**kwargs: Any)[source]#

Bases: ABC

Base class for all pipeline operators.

get_constructor_kwargs() dict[str, Any][source]#

Best-effort constructor kwargs for executor-side reconstruction.

abstractmethod postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
abstractmethod preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
abstractmethod process(
data: Any,
**kwargs: Any,
) Any[source]#
run(
data: Any,
**kwargs: Any,
) Any[source]#
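The base class leaves preprocess, process, and postprocess abstract and provides a concrete run(). The sketch below is a minimal stand-in for that lifecycle. It assumes two things: that run() chains the three hooks in that order, which matches the preprocess → process → postprocess sequence described under RayDataExecutor, and that __call__ delegates to run(). MiniOperator and Doubler are hypothetical names, not the library's classes. Storing kwargs for get_constructor_kwargs() is also only one plausible way to support executor-side reconstruction.

```python
from abc import ABC, abstractmethod
from typing import Any


class MiniOperator(ABC):
    """Hypothetical stand-in for AbstractOperator (not the library class)."""

    def __init__(self, **kwargs: Any) -> None:
        # Remember constructor kwargs so an executor could rebuild the operator elsewhere.
        self._init_kwargs = dict(kwargs)

    def get_constructor_kwargs(self) -> dict[str, Any]:
        return dict(self._init_kwargs)

    @abstractmethod
    def preprocess(self, data: Any, **kwargs: Any) -> Any: ...

    @abstractmethod
    def process(self, data: Any, **kwargs: Any) -> Any: ...

    @abstractmethod
    def postprocess(self, data: Any, **kwargs: Any) -> Any: ...

    def run(self, data: Any, **kwargs: Any) -> Any:
        # Assumed ordering: preprocess -> process -> postprocess.
        data = self.preprocess(data, **kwargs)
        data = self.process(data, **kwargs)
        return self.postprocess(data, **kwargs)

    def __call__(self, data: Any, **kwargs: Any) -> Any:
        return self.run(data, **kwargs)


class Doubler(MiniOperator):
    """Toy operator: coerces input to float, scales it, then rounds the result."""

    def __init__(self, factor: float = 2.0) -> None:
        super().__init__(factor=factor)
        self.factor = factor

    def preprocess(self, data: Any, **kwargs: Any) -> float:
        return float(data)

    def process(self, data: float, **kwargs: Any) -> float:
        return data * self.factor

    def postprocess(self, data: float, **kwargs: Any) -> float:
        return round(data, 2)
```

Rebuilding an operator from `type(op)(**op.get_constructor_kwargs())` is the kind of reconstruction an executor can perform on a remote worker.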

nemo_retriever.graph.beir_eval module#

BEIR Evaluator — Designer component for running BEIR evaluation against LanceDB.

Reuses the existing evaluation logic from nemo_retriever.recall.beir and prints the standard run summary via nemo_retriever.utils.detection_summary.print_run_summary.

class nemo_retriever.graph.beir_eval.BEIREvaluatorActor(
lancedb_uri: Annotated[str, Param(label='LanceDB URI', placeholder='/path/to/lancedb')] = 'lancedb',
lancedb_table: Annotated[str, Param(label='Table Name')] = 'nv-ingest',
embedding_model: Annotated[str, Param(label='Embedding Model')] = 'nvidia/llama-nemotron-embed-1b-v2',
beir_loader: Annotated[str, Param(label='BEIR Loader', choices=['vidore_hf'])] = 'vidore_hf',
beir_dataset_name: Annotated[str, Param(label='BEIR Dataset Name', placeholder='e.g. vidore_v3_computer_science')] = '',
beir_split: Annotated[str, Param(label='BEIR Split')] = 'test',
beir_query_language: Annotated[str, Param(label='Query Language', placeholder='Optional (e.g. en, fr)')] = '',
beir_doc_id_field: Annotated[str, Param(label='Doc ID Field', choices=['pdf_basename', 'pdf_page', 'source_id', 'path'])] = 'pdf_basename',
beir_ks: Annotated[str, Param(label='K Values', placeholder='1,3,5,10')] = '1,3,5,10',
hybrid: Annotated[bool, Param(label='Hybrid Search')] = False,
)[source]#

Bases: object

Designer BEIR evaluation node against an existing LanceDB table.

Assumes vectors were already written (for example via IngestVdbOperator or the retriever pipeline upload path). After evaluation, it calls print_run_summary, just as the batch pipeline does.

evaluate() dict[str, Any][source]#

Run the configured evaluation and print the standard run summary.

Returns the summary_dict produced by print_run_summary.
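The K Values parameter is a comma-separated string, and recall@k is one of the metrics a BEIR-style evaluation commonly reports. The sketch below covers both pieces using the conventional per-query recall definition. parse_ks and recall_at_k are hypothetical helpers. This page does not list the exact metrics or summary_dict keys that nemo_retriever.recall.beir produces.

```python
from typing import Dict, List, Sequence, Set


def parse_ks(spec: str) -> List[int]:
    """Turn a K Values string such as "1,3,5,10" into sorted, de-duplicated ints."""
    return sorted({int(part) for part in spec.split(",") if part.strip()})


def recall_at_k(
    run: Dict[str, Sequence[str]],
    qrels: Dict[str, Set[str]],
    ks: List[int],
) -> Dict[str, float]:
    """Mean fraction of each query's relevant doc ids found in its top-k results."""
    scores: Dict[str, float] = {}
    for k in ks:
        per_query = [
            len(set(run.get(qid, [])[:k]) & relevant) / len(relevant)
            for qid, relevant in qrels.items()
            if relevant  # queries with no judged-relevant docs are skipped
        ]
        scores[f"recall@{k}"] = sum(per_query) / len(per_query) if per_query else 0.0
    return scores
```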

nemo_retriever.graph.content_operators module#

nemo_retriever.graph.content_transforms module#

nemo_retriever.graph.cpu_operator module#

Mixin flag for operators that run on CPU only.

class nemo_retriever.graph.cpu_operator.CPUOperator[source]#

Bases: object

Mixin flag indicating an operator runs on CPU only.

Operators that perform no GPU work (file I/O, text splitting, DataFrame transforms) should inherit from both AbstractOperator and this class:

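from nemo_retriever.graph.abstract_operator import AbstractOperator
from nemo_retriever.graph.cpu_operator import CPUOperator

class MyCPUActor(AbstractOperator, CPUOperator):
    ...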

Executors can inspect isinstance(op, CPUOperator) to skip GPU resource allocation for these stages.

nemo_retriever.graph.custom_operator module#

class nemo_retriever.graph.custom_operator.UDFOperator(
fn: Callable[[Any], Any],
name: str | None = None,
**kwargs: Any,
)[source]#

Bases: AbstractOperator

A small operator wrapper for user-defined Python functions.

postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
process(
data: Any,
**kwargs: Any,
) Any[source]#

nemo_retriever.graph.designer module#

Decorator-driven registration of Designer components.

Usage:

from typing import Annotated
from nemo_retriever.graph import AbstractOperator
from nemo_retriever.graph.designer import designer_component, Param

@designer_component(
    name="PDF Splitter",
    category="Document Processing",
    compute="cpu",
    description="Splits multi-page PDFs into individual pages",
)
class PDFSplitActor(AbstractOperator):
    def __init__(
        self,
        mode: Annotated[str, Param(label="Split Mode", choices=["page", "chapter"])] = "page",
    ):
        ...
class nemo_retriever.graph.designer.Param(
label: str | None = None,
description: str | None = None,
choices: list[Any] | None = None,
min_val: float | None = None,
max_val: float | None = None,
hidden: bool = False,
placeholder: str | None = None,
)[source]#

Bases: object

Parameter metadata for Designer UI, used with typing.Annotated.

choices: list[Any] | None = None#
description: str | None = None#
hidden: bool = False#
label: str | None = None#
max_val: float | None = None#
min_val: float | None = None#
placeholder: str | None = None#
nemo_retriever.graph.designer.designer_component(
*,
name: str,
category: str = 'General',
compute: str = 'undefined',
description: str = '',
category_color: str | None = None,
component_type: str | None = None,
)[source]#

Register a class or function as a Designer-visible component.

Parameters:
  • name (str) – Friendly display name shown in the palette and on the canvas.

  • category (str) – Section heading in the component palette (e.g. “Document Processing”).

  • compute (str) – Resource hint: "gpu", "cpu", or "undefined".

  • description (str) – Short description shown as a tooltip.

  • category_color (str | None) – Optional hex colour for the category section (e.g. "#64b4ff").

  • component_type (str | None) – Special type tag for code-generation routing (e.g. "pipeline_sink", "pipeline_evaluator"). Leave None for standard map_batches operators.

nemo_retriever.graph.designer.get_registry() dict[str, dict[str, Any]][source]#

Return a snapshot of all registered designer components.
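The decorator depends on typing.Annotated metadata, which can be recovered at runtime with typing.get_type_hints(..., include_extras=True). The sketch below assumes that registration records the class along with the Param attached to each __init__ argument. MiniParam, mini_component, PDFSplitSketch, and get_mini_registry are hypothetical names. This page does not document which fields the real registry stores.

```python
import typing
from dataclasses import dataclass
from typing import Annotated, Any, Callable, Dict, List, Optional


@dataclass
class MiniParam:
    """Hypothetical stand-in for designer.Param: UI metadata only."""
    label: Optional[str] = None
    choices: Optional[List[Any]] = None


_REGISTRY: Dict[str, Dict[str, Any]] = {}


def mini_component(*, name: str, category: str = "General", compute: str = "undefined") -> Callable[[type], type]:
    """Register a class under a display name and record each __init__ arg's MiniParam."""

    def decorator(cls: type) -> type:
        hints = typing.get_type_hints(cls.__init__, include_extras=True)
        params: Dict[str, MiniParam] = {}
        for arg, hint in hints.items():
            if arg == "return":
                continue
            # Annotated[T, x, ...] exposes its extras through __metadata__.
            found = [m for m in getattr(hint, "__metadata__", ()) if isinstance(m, MiniParam)]
            params[arg] = found[0] if found else MiniParam()
        _REGISTRY[name] = {"cls": cls, "category": category, "compute": compute, "params": params}
        return cls  # the class itself is returned unchanged

    return decorator


def get_mini_registry() -> Dict[str, Dict[str, Any]]:
    # Shallow-copy each entry so adding or removing keys in the snapshot leaves the registry alone.
    return {key: dict(entry) for key, entry in _REGISTRY.items()}


@mini_component(name="PDF Splitter", category="Document Processing", compute="cpu")
class PDFSplitSketch:
    def __init__(self, mode: Annotated[str, MiniParam(label="Split Mode", choices=["page", "chapter"])] = "page") -> None:
        self.mode = mode
```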

nemo_retriever.graph.executor module#

Pipeline executors that run a Graph against input data.

class nemo_retriever.graph.executor.AbstractExecutor(graph: Graph)[source]#

Bases: ABC

Base class for pipeline executors.

An executor takes a Graph at init time and provides an ingest() method that feeds data through the graph.

abstractmethod ingest(
data: Any,
**kwargs: Any,
) Any[source]#

Execute the graph against data and return results.

class nemo_retriever.graph.executor.InprocessExecutor(
graph: Graph,
*,
show_progress: bool = True,
)[source]#

Bases: AbstractExecutor

Executor that runs a Graph in-process on pandas DataFrames.

No Ray dependency — each node’s operator is constructed once from operator_class(**operator_kwargs) and called sequentially on the accumulated DataFrame.

Only linear (single-root, no fan-out) graphs are currently supported.

ingest(
data: Any,
**kwargs: Any,
) Any[source]#

Run the graph in-process on pandas DataFrames.

Parameters:

data – A pandas.DataFrame, a file path (str), or a list of file paths. When paths are provided, each file is read as raw bytes and combined into a single DataFrame with bytes and path columns before being passed through the graph.

Returns:

The result after all operators have been applied.

Return type:

pandas.DataFrame
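The input rule described for ingest() can be illustrated with a short pandas sketch: file paths become a frame with bytes and path columns, and each stage then runs in sequence on the accumulated frame. paths_to_frame and run_linear are hypothetical helpers. The stages here are plain callables rather than operators constructed from operator_class(**operator_kwargs).

```python
from pathlib import Path
from typing import Callable, Iterable, List, Union

import pandas as pd

Stage = Callable[[pd.DataFrame], pd.DataFrame]


def paths_to_frame(paths: Union[str, Iterable[str]]) -> pd.DataFrame:
    """Read each file as raw bytes into one DataFrame with bytes and path columns."""
    if isinstance(paths, str):
        paths = [paths]
    rows = [{"bytes": Path(p).read_bytes(), "path": str(p)} for p in paths]
    return pd.DataFrame(rows, columns=["bytes", "path"])


def run_linear(stages: List[Stage], data: Union[pd.DataFrame, str, Iterable[str]]) -> pd.DataFrame:
    """Apply each stage in order to the accumulated DataFrame (single chain, no fan-out)."""
    df = data if isinstance(data, pd.DataFrame) else paths_to_frame(data)
    for stage in stages:
        df = stage(df)
    return df
```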

class nemo_retriever.graph.executor.RayDataExecutor(
graph: Graph,
*,
ray_address: str | None = None,
batch_size: int = 1,
batch_format: str = 'pandas',
num_cpus: float = 1,
num_gpus: float = 0,
node_overrides: Dict[str, Dict[str, Any]] | None = None,
)[source]#

Bases: AbstractExecutor

Executor that builds a Ray Data pipeline from a Graph.

For each Node in the graph the executor appends a map_batches stage that uses the node’s operator_class with fn_constructor_kwargs for deferred construction on Ray workers. This ensures heavy GPU models are loaded on workers, not serialised from the driver.

The operator’s __call__ (defined on AbstractOperator) delegates to run(), so each map_batches stage executes the full preprocess → process → postprocess pipeline.

Only linear (single-root, no fan-out) graphs are currently supported.

build_dataset(
data: Any,
**kwargs: Any,
) Any[source]#

Build a lazy Ray Data pipeline from the graph.

Parameters:

data – Input to ray.data.read_binary_files (a path or list of glob patterns) or an already-constructed ray.data.Dataset.

Returns:

The lazy Ray dataset with all graph stages appended.

Return type:

ray.data.Dataset

ingest(
data: Any,
**kwargs: Any,
) Any[source]#

Build, execute, and materialize a Ray Data pipeline from the graph.

nemo_retriever.graph.file_loader_operator module#

Operators for loading file paths into DataFrames.

class nemo_retriever.graph.file_loader_operator.FileListLoaderOperator(**kwargs: Any)[source]#

Bases: AbstractOperator, CPUOperator

Load a list of files into a DataFrame with path and bytes columns.

postprocess(
data: DataFrame,
**kwargs: Any,
) DataFrame[source]#
preprocess(
data: Any,
**kwargs: Any,
) list[str][source]#
process(
data: list[str],
**kwargs: Any,
) DataFrame[source]#

nemo_retriever.graph.gpu_operator module#

Mixin flag for operators that require GPU resources.

class nemo_retriever.graph.gpu_operator.GPUOperator[source]#

Bases: object

Mixin flag indicating an operator requires GPU resources.

Operators that load torch models or perform CUDA-based inference should inherit from both AbstractOperator and this class:

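from nemo_retriever.graph.abstract_operator import AbstractOperator
from nemo_retriever.graph.gpu_operator import GPUOperator

class MyGPUActor(AbstractOperator, GPUOperator):
    ...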

Executors can inspect isinstance(op, GPUOperator) to allocate GPU resources or route work to GPU-capable workers.
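Neither marker mixin has any behavior of its own; an executor reads them with isinstance. The sketch below shows that check. CPUFlag, GPUFlag, and stage_resources are hypothetical stand-ins, and the returned num_cpus/num_gpus keys only echo RayDataExecutor's constructor parameters. Giving unmarked operators no GPU is also an assumption.

```python
from typing import Any, Dict


class CPUFlag:
    """Hypothetical stand-in for the CPUOperator marker mixin."""


class GPUFlag:
    """Hypothetical stand-in for the GPUOperator marker mixin."""


def stage_resources(op: Any, gpus_per_gpu_stage: float = 1.0) -> Dict[str, float]:
    """Pick a per-stage resource request from the operator's marker mixins."""
    if isinstance(op, GPUFlag):
        return {"num_cpus": 1.0, "num_gpus": gpus_per_gpu_stage}
    # CPU-only and unmarked operators get no GPU allocation in this sketch.
    return {"num_cpus": 1.0, "num_gpus": 0.0}


class TextSplitter(CPUFlag):
    pass


class Embedder(GPUFlag):
    pass
```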

nemo_retriever.graph.graph_pipeline_registry module#

Graph Pipeline Registry — manage, inspect, compare, and serialize golden pipeline graphs.

Provides a central GraphPipelineRegistry that stores named graph blueprints (factory functions + metadata). Graphs built from the registry can be inspected, diffed against each other, serialized to / loaded from JSON, and configured with kwarg overrides — all without touching the code that originally defined them.

A module-level default_registry is provided for convenience so that graph definitions scattered across the codebase can all register to a single shared instance.

Quick-start:

from nemo_retriever.graph.graph_pipeline_registry import default_registry

@default_registry.register("my-pipeline", description="Demo pipeline")
def _build():
    from nemo_retriever.graph import Graph
    # SomeOperator / AnotherOperator stand in for your own AbstractOperator subclasses.
    return Graph() >> SomeOperator() >> AnotherOperator()

graph = default_registry.build("my-pipeline")
default_registry.print_graph("my-pipeline")
class nemo_retriever.graph.graph_pipeline_registry.GraphBlueprint(name: str, graph_factory: ~typing.Callable[[], ~nemo_retriever.graph.pipeline_graph.Graph], description: str = '', version: str = '1.0.0', tags: ~typing.List[str] = <factory>, created_at: str = <factory>, updated_at: str = <factory>)[source]#

Bases: object

A named, versioned graph definition held in the registry.

build() Graph[source]#

Construct a fresh Graph from the stored factory.

created_at: str#
description: str = ''#
graph_factory: Callable[[], Graph]#
info() str[source]#

Return a concise multi-line info string (builds the graph once to inspect it).

name: str#
tags: List[str]#
updated_at: str#
version: str = '1.0.0'#
class nemo_retriever.graph.graph_pipeline_registry.GraphDiff(identical: bool, structural_match: bool, node_count_a: int, node_count_b: int, roots_a: ~typing.List[str], roots_b: ~typing.List[str], node_diffs: ~typing.List[~nemo_retriever.graph.graph_pipeline_registry.NodeDiff] = <factory>, nodes_only_in_a: ~typing.List[str] = <factory>, nodes_only_in_b: ~typing.List[str] = <factory>)[source]#

Bases: object

Full diff result between two graphs.

format() str[source]#

Return a human-readable diff report.

identical: bool#
node_count_a: int#
node_count_b: int#
node_diffs: List[NodeDiff]#
nodes_only_in_a: List[str]#
nodes_only_in_b: List[str]#
roots_a: List[str]#
roots_b: List[str]#
structural_match: bool#
class nemo_retriever.graph.graph_pipeline_registry.GraphPipelineRegistry[source]#

Bases: object

Central registry for golden pipeline graph definitions.

Stores GraphBlueprint objects keyed by name. Supports decorator and imperative registration, building fresh graph instances, inspection / pretty-printing, diffing between graphs, kwarg overrides, and JSON serialization / deserialization of the entire registry.

Usage:

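from nemo_retriever.graph import Graph
from nemo_retriever.graph.graph_pipeline_registry import GraphPipelineRegistry

registry = GraphPipelineRegistry()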

@registry.register("my-pipeline", description="Demo", version="1.0")
def _build():
    return Graph() >> SomeOperator() >> AnotherOperator()

graph = registry.build("my-pipeline")
registry.print_graph("my-pipeline")
build(
name: str,
) Graph[source]#

Build and return a fresh Graph from the named blueprint.

build_with_overrides(
name: str,
overrides: Dict[str, Dict[str, Any]],
) Graph[source]#

Build a graph and apply kwarg overrides to named nodes.

Parameters:
  • name – Registered graph name.

  • overrides – {node_name: {kwarg_key: new_value, ...}}; each matching node’s operator_kwargs are updated with the given values.

diff(
name_a: str,
name_b: str,
) GraphDiff[source]#

Build both named graphs and return a GraphDiff.

get_blueprint(
name: str,
) GraphBlueprint[source]#

Return the GraphBlueprint for name.

Raises KeyError if not found.

get_graph_info(name: str) str[source]#

Return the full inspection report for a named graph as a string.

list_blueprints(
*,
tag: str | None = None,
) List[GraphBlueprint][source]#

Return all blueprints, optionally filtered by tag.

list_names() List[str][source]#

Return all registered graph names in insertion order.

load_all(
path: str | Path,
*,
overwrite: bool = False,
) List[str][source]#

Load graphs from a JSON file produced by save_all().

Each loaded graph is registered as a factory that deserializes the stored structure. Returns the list of graph names loaded.

load_graph(
path: str | Path,
*,
name: str | None = None,
overwrite: bool = False,
) str[source]#

Load a single graph from a JSON file and register it.

If name is not provided, the blueprint name stored in the file is used (falls back to the file stem). Returns the registered name.

print_diff(name_a: str, name_b: str) None[source]#

Print a human-readable diff between two registered graphs.

print_graph(
name: str,
*,
show_kwargs: bool = True,
) None[source]#

Build and pretty-print the named graph with full details.

print_summary() None[source]#

Print a compact table of every registered graph.

register(
name: str,
*,
description: str = '',
version: str = '1.0.0',
tags: List[str] | None = None,
overwrite: bool = False,
) Callable[[Callable[[], Graph]], Callable[[], Graph]][source]#

Decorator that registers a graph factory function.

Example:

@registry.register("pdf-extract", description="PDF extraction pipeline")
def _build():
    return Graph() >> PDFSplitActor() >> PDFExtractionActor()
register_graph(
name: str,
factory: Callable[[], Graph],
*,
description: str = '',
version: str = '1.0.0',
tags: List[str] | None = None,
overwrite: bool = False,
) None[source]#

Programmatically register a graph factory (non-decorator form).

save_all(
path: str | Path,
*,
indent: int = 2,
) Path[source]#

Serialize every registered graph to a single JSON file.

The file contains {name: {roots, metadata, blueprint}} for each registered graph. Returns the resolved path.

save_graph(
name: str,
path: str | Path,
*,
indent: int = 2,
) Path[source]#

Serialize a single named graph to a JSON file.

unregister(
name: str,
) GraphBlueprint[source]#

Remove and return the blueprint for name.

Raises KeyError if name is not registered.
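At its core, the registry maps names to zero-argument factories, so every build() call returns a fresh object. The sketch below covers that core: decorator and imperative registration, build, insertion-ordered names, and unregister. MiniRegistry is a hypothetical class. It raises ValueError when a duplicate name is registered without overwrite=True, which is an assumption because this page does not document the real registry's behavior in that case.

```python
from typing import Callable, Dict, List

Factory = Callable[[], object]


class MiniRegistry:
    """Hypothetical name -> factory registry mirroring a few methods listed above."""

    def __init__(self) -> None:
        self._factories: Dict[str, Factory] = {}  # dicts preserve insertion order

    def register_graph(self, name: str, factory: Factory, *, overwrite: bool = False) -> None:
        if name in self._factories and not overwrite:
            # Assumption: the duplicate-name error type is not documented for the real registry.
            raise ValueError(f"graph {name!r} is already registered")
        self._factories[name] = factory

    def register(self, name: str, *, overwrite: bool = False) -> Callable[[Factory], Factory]:
        def decorator(factory: Factory) -> Factory:
            self.register_graph(name, factory, overwrite=overwrite)
            return factory  # the decorated function stays usable as-is
        return decorator

    def build(self, name: str) -> object:
        return self._factories[name]()  # KeyError for unknown names; fresh object per call

    def list_names(self) -> List[str]:
        return list(self._factories)

    def unregister(self, name: str) -> Factory:
        return self._factories.pop(name)  # KeyError if not registered
```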

class nemo_retriever.graph.graph_pipeline_registry.NodeDiff(
position: str,
node_a_name: str,
node_b_name: str,
name_changed: bool = False,
class_changed: bool = False,
class_a: str = '',
class_b: str = '',
kwargs_added: ~typing.Dict[str, ~typing.Any] = <factory>,
kwargs_removed: ~typing.Dict[str, ~typing.Any] = <factory>,
kwargs_changed: ~typing.Dict[str, ~typing.Tuple[~typing.Any, ~typing.Any]] = <factory>,
children_a_only: ~typing.List[str] = <factory>,
children_b_only: ~typing.List[str] = <factory>,
)[source]#

Bases: object

Differences between two nodes at corresponding positions.

children_a_only: List[str]#
children_b_only: List[str]#
class_a: str = ''#
class_b: str = ''#
class_changed: bool = False#
kwargs_added: Dict[str, Any]#
kwargs_changed: Dict[str, Tuple[Any, Any]]#
kwargs_removed: Dict[str, Any]#
name_changed: bool = False#
node_a_name: str#
node_b_name: str#
position: str#
nemo_retriever.graph.graph_pipeline_registry.clone_graph(
graph: Graph,
) Graph[source]#

Create a structural deep-copy of graph by round-tripping through serialization.

This produces new Node / operator instances so modifications to the clone do not affect the original.

nemo_retriever.graph.graph_pipeline_registry.collect_nodes(
graph: Graph,
) List[Node][source]#

Return an ordered list of all unique nodes in the graph.

nemo_retriever.graph.graph_pipeline_registry.deserialize_graph(
data: dict,
) Graph[source]#

Reconstruct a Graph from a dict produced by serialize_graph().

nemo_retriever.graph.graph_pipeline_registry.diff_graphs(
graph_a: Graph,
graph_b: Graph,
) GraphDiff[source]#

Compute a structural + configuration diff between two graphs.

Performs a parallel DFS walk and compares node names, operator classes, operator kwargs, and child topology at each corresponding position.
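At each paired position, comparing kwargs comes down to set operations on the two dicts. The sketch below assumes that "added" means a key present only in graph B, "removed" means a key present only in graph A, and "changed" holds (value_a, value_b) pairs. This matches the a/b naming of NodeDiff's fields, but it is not confirmed by this page. diff_kwargs is a hypothetical helper.

```python
from typing import Any, Dict, Tuple


def diff_kwargs(
    a: Dict[str, Any], b: Dict[str, Any]
) -> Tuple[Dict[str, Any], Dict[str, Any], Dict[str, Tuple[Any, Any]]]:
    """Split two kwarg dicts into (added, removed, changed), like NodeDiff's fields."""
    added = {key: b[key] for key in b.keys() - a.keys()}
    removed = {key: a[key] for key in a.keys() - b.keys()}
    changed = {key: (a[key], b[key]) for key in a.keys() & b.keys() if a[key] != b[key]}
    return added, removed, changed
```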

nemo_retriever.graph.graph_pipeline_registry.find_node(
graph: Graph,
name: str,
) Node | None[source]#

Return the first node whose name matches name, or None.

nemo_retriever.graph.graph_pipeline_registry.find_nodes(
graph: Graph,
name: str,
) List[Node][source]#

Return every node whose name matches name.

nemo_retriever.graph.graph_pipeline_registry.format_full_report(
graph: Graph,
*,
show_kwargs: bool = True,
) str[source]#

Return a complete inspection report: summary + tree + per-node details.

nemo_retriever.graph.graph_pipeline_registry.format_graph_summary(
graph: Graph,
) str[source]#

Return a concise summary: node count, depth, root/leaf names.

nemo_retriever.graph.graph_pipeline_registry.format_graph_tree(
graph: Graph,
*,
show_kwargs: bool = False,
show_class: bool = True,
max_value_width: int = 120,
) str[source]#

Return a human-readable tree representation of the graph.

Parameters:
  • graph – The graph to format.

  • show_kwargs – Display each node’s operator_kwargs beneath it.

  • show_class – Show the fully qualified operator class next to the node name.

  • max_value_width – Truncate kwarg value reprs longer than this.

nemo_retriever.graph.graph_pipeline_registry.format_node_details(
node: Node,
) str[source]#

Return a detailed multi-line description of a single node.

nemo_retriever.graph.graph_pipeline_registry.get_node_kwargs(
graph: Graph,
name: str,
) Dict[str, Any][source]#

Return the operator_kwargs for the first node named name.

Raises KeyError if no node matches.

nemo_retriever.graph.graph_pipeline_registry.leaf_nodes(
graph: Graph,
) List[Node][source]#

Return all leaf nodes (nodes with no children).

nemo_retriever.graph.graph_pipeline_registry.list_all_kwargs(
graph: Graph,
) Dict[str, Dict[str, Any]][source]#

Return {node_name: operator_kwargs} for every node in the graph.

nemo_retriever.graph.graph_pipeline_registry.load_graph(
path: str | Path,
) Graph[source]#

Load a graph from a JSON file produced by save_graph().

nemo_retriever.graph.graph_pipeline_registry.max_depth(graph: Graph) int[source]#

Return the maximum depth (longest root-to-leaf path) of the graph.

nemo_retriever.graph.graph_pipeline_registry.node_count(graph: Graph) int[source]#

Return the total number of unique nodes in the graph.

nemo_retriever.graph.graph_pipeline_registry.print_diff(
graph_a: Graph,
graph_b: Graph,
) None[source]#

Print a human-readable diff between two graphs to stdout.

nemo_retriever.graph.graph_pipeline_registry.print_graph(
graph: Graph,
*,
show_kwargs: bool = True,
) None[source]#

Print a full graph inspection to stdout.

nemo_retriever.graph.graph_pipeline_registry.remove_node_kwargs(
graph: Graph,
node_name: str,
keys: Sequence[str],
*,
all_matches: bool = False,
) int[source]#

Remove specific kwarg keys from node(s) matching node_name.

Returns the number of nodes modified. Missing keys are silently ignored.

nemo_retriever.graph.graph_pipeline_registry.replace_node_kwargs(
graph: Graph,
node_name: str,
new_kwargs: Dict[str, Any],
*,
all_matches: bool = False,
) int[source]#

Replace the entire operator_kwargs dict for matching node(s).

Returns the number of nodes modified.

nemo_retriever.graph.graph_pipeline_registry.save_graph(
graph: Graph,
path: str | Path,
*,
indent: int = 2,
) Path[source]#

Serialize graph and write it to a JSON file at path.

Returns the resolved Path that was written.

nemo_retriever.graph.graph_pipeline_registry.serialize_graph(
graph: Graph,
) dict[source]#

Serialize a graph to a JSON-compatible dictionary.

The result can be passed to json.dumps() (with the _RegistryJSONEncoder) and later restored via deserialize_graph().
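The round trip described for serialize_graph, deserialize_graph, and clone_graph can be sketched with plain JSON. Everything below is hypothetical. This page does not document the real dict layout (save_all only mentions roots, metadata, and blueprint keys). Also, this sketch resolves operator classes through an explicit lookup table instead of whatever mechanism the library and _RegistryJSONEncoder use.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class KNode:
    """Hypothetical stand-in for Node: operator class, its kwargs, and children."""
    name: str
    operator_class: type
    operator_kwargs: Dict[str, Any]
    children: List["KNode"] = field(default_factory=list)


def to_dict(node: KNode) -> dict:
    return {
        "name": node.name,
        "class": node.operator_class.__qualname__,
        "kwargs": node.operator_kwargs,
        "children": [to_dict(child) for child in node.children],
    }


def from_dict(data: dict, classes: Dict[str, type]) -> KNode:
    # Classes come from an explicit table rather than importing arbitrary paths.
    return KNode(
        name=data["name"],
        operator_class=classes[data["class"]],
        operator_kwargs=dict(data["kwargs"]),
        children=[from_dict(child, classes) for child in data["children"]],
    )


def clone(node: KNode, classes: Dict[str, type]) -> KNode:
    """Deep-copy by round-tripping through JSON text, so no objects are shared."""
    return from_dict(json.loads(json.dumps(to_dict(node))), classes)
```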

nemo_retriever.graph.graph_pipeline_registry.update_node_kwargs(
graph: Graph,
node_name: str,
updates: Dict[str, Any],
*,
all_matches: bool = False,
) int[source]#

Update operator_kwargs for node(s) matching node_name in-place.

Parameters:
  • graph – The graph to modify.

  • node_name – Name of the target node(s).

  • updates – {kwarg_key: new_value} pairs to merge in.

  • all_matches – If True, update every matching node. Otherwise update only the first match and raise KeyError if none is found.

Returns:

Number of nodes updated.

Return type:

int
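The matching rules above can be sketched on a flat list of nodes, such as the output of a depth-first walk. KwNode and update_kwargs are hypothetical. Returning 0 when all_matches=True finds no node is an assumption; the docstring only specifies KeyError for the first-match mode.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class KwNode:
    """Hypothetical stand-in for Node with just a name and operator_kwargs."""
    name: str
    operator_kwargs: Dict[str, Any] = field(default_factory=dict)


def update_kwargs(
    nodes: List[KwNode],
    node_name: str,
    updates: Dict[str, Any],
    *,
    all_matches: bool = False,
) -> int:
    """Merge updates into matching nodes' kwargs in place; return how many changed."""
    matches = [node for node in nodes if node.name == node_name]
    if not all_matches:
        if not matches:
            raise KeyError(node_name)
        matches = matches[:1]  # only the first match is touched
    for node in matches:
        node.operator_kwargs.update(updates)
    return len(matches)
```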

nemo_retriever.graph.graph_pipeline_registry.walk_nodes(
graph: Graph,
) Iterator[Tuple[Node, int]][source]#

Yield (node, depth) for every unique node via depth-first traversal.
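Helpers such as node_count and leaf_nodes can be built on one depth-first walk. The sketch below assumes that "unique" means each node object is yielded once even when several parents share it, and that a shared node keeps the depth of its first visit. WNode, walk, and leaves are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Set, Tuple


@dataclass(eq=False)  # identity-based hashing so nodes can live in a visited set
class WNode:
    name: str
    children: List["WNode"] = field(default_factory=list)


def walk(roots: List[WNode]) -> Iterator[Tuple[WNode, int]]:
    """Yield (node, depth) once per unique node, depth-first from each root."""
    seen: Set[WNode] = set()

    def visit(node: WNode, depth: int) -> Iterator[Tuple[WNode, int]]:
        if node in seen:
            return  # a shared child is reported only at its first visit
        seen.add(node)
        yield node, depth
        for child in node.children:
            yield from visit(child, depth + 1)

    for root in roots:
        yield from visit(root, 0)


def leaves(roots: List[WNode]) -> List[WNode]:
    """Nodes with no children, in walk order."""
    return [node for node, _ in walk(roots) if not node.children]
```

With this walk, a node count is `len(list(walk(roots)))`.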

nemo_retriever.graph.ingestor_runtime module#

nemo_retriever.graph.lancedb_write_operator module#

Graph operator that appends embedding rows to a LanceDB table.

Designed for the service-mode ingest pipeline, where pages arrive in incremental batches. Unlike LanceDBWriterActor (which overwrites on init), this operator lazily creates or opens the target table and appends rows, so concurrent worker processes can all write to the same LanceDB directory (LanceDB uses file-level locking internally).

class nemo_retriever.graph.lancedb_write_operator.LanceDBWriteOperator(
*,
uri: str = '/var/lib/nemo-retriever/lancedb',
table_name: str = 'nv-ingest',
hybrid: bool = False,
embedding_column: str = 'text_embeddings_1b_v2',
embedding_key: str = 'embedding',
text_column: str = 'text',
include_text: bool = True,
)[source]#

Bases: AbstractOperator, CPUOperator

Append embedding rows from a DataFrame batch to LanceDB.

The table is created on first write and appended to thereafter. The operator is side-effect-only: it returns the input DataFrame unmodified.

postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
process(
data: Any,
**kwargs: Any,
) Any[source]#
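The create-on-first-write, append-afterwards pattern can be sketched against any connection object that exposes table_names(), open_table(), and create_table(). The lancedb Python client's connection has these methods, and its tables provide add() for appending. The row layout is an assumption: the sketch reads each vector from embedding_column[embedding_key] and writes it under a hypothetical "vector" key. It also does not handle two workers racing to create the table. rows_from_batch and write_batch are hypothetical helpers.

```python
from typing import Any, Dict, List

import pandas as pd


def rows_from_batch(
    batch: pd.DataFrame,
    *,
    embedding_column: str = "text_embeddings_1b_v2",
    embedding_key: str = "embedding",
    text_column: str = "text",
    include_text: bool = True,
) -> List[Dict[str, Any]]:
    rows = []
    for _, rec in batch.iterrows():
        # Assumption: each cell holds a dict with the vector under embedding_key.
        row: Dict[str, Any] = {"vector": list(rec[embedding_column][embedding_key])}
        if include_text:
            row["text"] = rec[text_column]
        rows.append(row)
    return rows


def write_batch(db: Any, table_name: str, batch: pd.DataFrame, **row_opts: Any) -> pd.DataFrame:
    """Create the table on first write, append afterwards, return the batch untouched."""
    rows = rows_from_batch(batch, **row_opts)
    if rows:
        if table_name in db.table_names():
            db.open_table(table_name).add(rows)
        else:
            db.create_table(table_name, data=rows)
    return batch  # side-effect-only stage
```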

nemo_retriever.graph.multi_type_extract_operator module#

nemo_retriever.graph.operator_archetype module#

class nemo_retriever.graph.operator_archetype.ArchetypeOperator(**kwargs: Any)[source]#

Bases: AbstractOperator

Lightweight graph-facing operator that resolves to a hardware-specific variant.

classmethod cpu_variant_class() type[AbstractOperator] | None[source]#
classmethod gpu_variant_class() type[AbstractOperator] | None[source]#
postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
classmethod prefers_cpu_variant(
operator_kwargs: dict[str, Any] | None = None,
) bool[source]#
preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
process(
data: Any,
**kwargs: Any,
) Any[source]#
classmethod resolve_operator_class(
resources: ClusterResources | Resources | None = None,
operator_kwargs: dict[str, Any] | None = None,
) type[AbstractOperator][source]#
run(
data: Any,
**kwargs: Any,
) Any[source]#
classmethod variant_operator_kwargs(
operator_class: type[AbstractOperator],
operator_kwargs: dict[str, Any] | None = None,
) dict[str, Any][source]#

Return constructor kwargs for the resolved concrete operator.

nemo_retriever.graph.operator_resolution module#

nemo_retriever.graph.operator_resolution.resolve_graph(
graph: Graph,
resources: ClusterResources | Resources,
) Graph[source]#
nemo_retriever.graph.operator_resolution.resolve_graph_for_local_execution(
graph: Graph,
) Graph[source]#
nemo_retriever.graph.operator_resolution.resolve_operator_class(
operator_class: type[AbstractOperator],
resources: ClusterResources | Resources,
operator_kwargs: dict | None = None,
) type[AbstractOperator][source]#
nemo_retriever.graph.operator_resolution.resolve_operator_kwargs(
operator_class: type[AbstractOperator],
resolved_class: type[AbstractOperator],
operator_kwargs: dict | None = None,
) dict[source]#
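This page lists the archetype resolution hooks without describing their policy. The sketch below shows one plausible policy: pick the GPU variant when GPUs are available and the kwargs do not ask for CPU, otherwise fall back to the CPU variant. Every name here is hypothetical, including ArchetypeSketch, its cpu_variant/gpu_variant attributes, and the device="cpu" convention. It illustrates the idea only and does not reproduce resolve_operator_class.

```python
from typing import Any, Dict, Optional


class ArchetypeSketch:
    """Hypothetical archetype that names a CPU and a GPU implementation."""

    cpu_variant: Optional[type] = None
    gpu_variant: Optional[type] = None

    @classmethod
    def prefers_cpu_variant(cls, operator_kwargs: Optional[Dict[str, Any]] = None) -> bool:
        # Assumed convention: an explicit device="cpu" kwarg forces the CPU path.
        return (operator_kwargs or {}).get("device") == "cpu"

    @classmethod
    def resolve(cls, num_gpus: float, operator_kwargs: Optional[Dict[str, Any]] = None) -> type:
        """Return the concrete class to instantiate for the given resources."""
        if num_gpus > 0 and cls.gpu_variant is not None and not cls.prefers_cpu_variant(operator_kwargs):
            return cls.gpu_variant
        if cls.cpu_variant is not None:
            return cls.cpu_variant
        raise LookupError(f"{cls.__name__}: no variant fits the available resources")
```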

nemo_retriever.graph.pipeline_graph module#

Directed pipeline graph composed of Nodes that wrap AbstractOperators.

class nemo_retriever.graph.pipeline_graph.Graph[source]#

Bases: object

A directed acyclic pipeline graph.

A graph owns one or more root Node instances and can execute them in topological (depth-first) order.

The >> operator appends a node to the current leaf nodes and returns the graph itself, enabling fluent chaining:

graph = a >> b >> c >> d
# or
graph = Graph()
graph.add_root(a)
graph >> b >> c >> d

Bare AbstractOperator instances passed to add_root(), add_chain(), or >> are auto-wrapped in Node.

add_chain(
*nodes: Node | AbstractOperator,
) None[source]#

Chain nodes in order and register the first as a root.

Each element may be a Node or a bare AbstractOperator.

add_root(
node: Node | Graph | AbstractOperator,
) Node[source]#

Register node as a root (entry point) of the graph.

execute(
data: Any,
**kwargs: Any,
) List[Any][source]#

Execute every root and its descendants depth-first.

Each root receives data; children receive their parent’s output. Returns a list of leaf outputs (one per leaf node reached).

resolve(
resources: Any,
) Graph[source]#

Return a cloned graph with archetype operators mapped to concrete variants.

resolve_for_local_execution() Graph[source]#

Return a cloned graph resolved against resources detected on the current machine.

class nemo_retriever.graph.pipeline_graph.Node(
operator: AbstractOperator,
name: str | None = None,
*,
operator_class: type | None = None,
operator_kwargs: dict | None = None,
)[source]#

Bases: object

A single node in a pipeline graph.

Each node wraps an AbstractOperator and maintains an ordered list of child nodes that should execute after it.

The >> operator chains two nodes and returns a Graph:

graph = a >> b >> c   # Graph with root=a, a->b->c
graph.add_root(...)   # add more roots if needed
add_child(
child: Node | AbstractOperator,
) Node[source]#

Append child to this node’s children and return the child node.
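The chaining rules for Graph and Node can be sketched with two small classes: Node >> Node creates a graph rooted at the left node, Graph >> x attaches x under every current leaf, and execute() feeds each child its parent's output and collects one output per leaf. ChainNode and ChainGraph are hypothetical. They wrap plain callables instead of AbstractOperator instances, and they skip auto-wrapping.

```python
from typing import Any, Callable, List


class ChainNode:
    """Hypothetical stand-in for Node: wraps a callable and keeps ordered children."""

    def __init__(self, fn: Callable[[Any], Any], name: str = "") -> None:
        self.fn, self.name = fn, name
        self.children: List["ChainNode"] = []

    def add_child(self, child: "ChainNode") -> "ChainNode":
        self.children.append(child)
        return child

    def __rshift__(self, other: "ChainNode") -> "ChainGraph":
        graph = ChainGraph()
        graph.add_root(self)
        return graph >> other


class ChainGraph:
    """Hypothetical stand-in for Graph: roots plus depth-first execution."""

    def __init__(self) -> None:
        self.roots: List[ChainNode] = []

    def add_root(self, node: ChainNode) -> ChainNode:
        self.roots.append(node)
        return node

    def leaves(self) -> List[ChainNode]:
        found: List[ChainNode] = []
        stack = list(reversed(self.roots))
        while stack:
            node = stack.pop()
            if node.children:
                stack.extend(reversed(node.children))
            elif node not in found:
                found.append(node)
        return found

    def __rshift__(self, other: ChainNode) -> "ChainGraph":
        # Append `other` under every current leaf, or make it a root if the graph is empty.
        current = self.leaves()
        if not current:
            self.add_root(other)
        for leaf in current:
            leaf.add_child(other)
        return self

    def execute(self, data: Any) -> List[Any]:
        outputs: List[Any] = []

        def run(node: ChainNode, value: Any) -> None:
            result = node.fn(value)
            if not node.children:
                outputs.append(result)  # one output per leaf reached
            for child in node.children:
                run(child, result)

        for root in self.roots:
            run(root, data)
        return outputs
```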

nemo_retriever.graph.react_agent_operator module#

nemo_retriever.graph.recall_eval module#

nemo_retriever.graph.rrf_aggregator_operator module#

Operator that fuses per-step retrieval results using Reciprocal Rank Fusion.

class nemo_retriever.graph.rrf_aggregator_operator.RRFAggregatorOperator(*, k: int = 60)[source]#

Bases: AbstractOperator, CPUOperator

Fuse multiple per-step ranked lists into a single ranking per query using RRF.

Implements the Reciprocal Rank Fusion formula score(d) = sum(1 / (rank_i + k)), where rank_i is d's 1-indexed rank in retrieval step i and the sum runs over every step in which d appears. This is the same formula used in retrieval_bench/nemo_agentic/utils.py:rrf_from_subquery_results.

Designed to consume the output of ReActAgentOperator (or any operator that emits one row per (query_id, step_idx, doc_id) triple) and produce a single fused ranking per query_id suitable as input to SelectionAgentOperator.

Input DataFrame schema#

  • query_id (str): unique query identifier
  • query_text (str): original query text (carried through)
  • step_idx (int): which retrieval step produced this row (0, 1, 2, …)
  • doc_id (str): retrieved document identifier
  • text (str): document text content
  • rank (int): 1-indexed rank within its step (1 = most relevant)

Additional columns are ignored.

Output DataFrame schema#

  • query_id (str): same query_id as the input
  • query_text (str): original query text (first occurrence per query)
  • doc_id (str): document identifier
  • rrf_score (float): fused RRF score (higher = more relevant)
  • text (str): document text (first occurrence per doc_id)

Rows are sorted by rrf_score descending within each query_id.

Parameters:

k (int) – RRF damping factor. The standard value is 60 (the default). Larger values reduce the influence of top-ranked documents.

Examples

import pandas as pd
from nemo_retriever.graph.rrf_aggregator_operator import RRFAggregatorOperator

op = RRFAggregatorOperator(k=60)
df = pd.DataFrame({
    "query_id":   ["q1", "q1", "q1", "q1"],
    "query_text": ["inflation causes"] * 4,
    "step_idx":   [0,    0,    1,    1   ],
    "doc_id":     ["d1", "d2", "d1", "d3"],
    "text":       ["t1", "t2", "t1", "t3"],
    "rank":       [1,    2,    1,    2   ],
})
result = op.run(df)
# d1 appears in both steps at rank 1 → highest RRF score
postprocess(
data: DataFrame,
**kwargs: Any,
) DataFrame[source]#
preprocess(
data: Any,
**kwargs: Any,
) DataFrame[source]#
process(
data: DataFrame,
**kwargs: Any,
) DataFrame[source]#

Compute RRF scores, group by query_id, sort by score descending.
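The fusion step can be written in a few lines of pandas that follow the input and output schemas above. rrf_fuse is a hypothetical helper, not the operator's code. One small simplification: it takes the "first" query_text and text per (query_id, doc_id) pair. On the example data, d1 scores 2/61 and both d2 and d3 score 1/62.

```python
import pandas as pd


def rrf_fuse(df: pd.DataFrame, k: int = 60) -> pd.DataFrame:
    """Fuse per-step rankings into one ranking per query_id: score = sum of 1 / (rank + k)."""
    scored = df.assign(rrf_score=1.0 / (df["rank"] + k))
    fused = scored.groupby(["query_id", "doc_id"], as_index=False, sort=False).agg(
        query_text=("query_text", "first"),
        text=("text", "first"),
        rrf_score=("rrf_score", "sum"),  # sum contributions across steps
    )
    fused = fused.sort_values(["query_id", "rrf_score"], ascending=[True, False])
    return fused[["query_id", "query_text", "doc_id", "rrf_score", "text"]].reset_index(drop=True)
```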

nemo_retriever.graph.selection_agent_operator module#

nemo_retriever.graph.store_operator module#

Graph operator for persisting post-embedding row images to storage.

class nemo_retriever.graph.store_operator.StoreOperator(*, params: Any = None)[source]#

Bases: AbstractOperator, CPUOperator

Persist row-level image payloads to local or object storage.

The operator consumes _image_b64 produced by content transforms and writes _stored_image_uri for downstream vector DB upload. By default it clears inline base64 after successful writes to avoid carrying page-sized payloads into VDB upload.

postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
process(
data: Any,
**kwargs: Any,
) Any[source]#
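The store step reads _image_b64, writes the decoded bytes, records _stored_image_uri, and clears the inline payload once the write succeeds. The sketch below does this for local files only. store_images, the image_{i}.png naming scheme, and the file:// URIs are all hypothetical; this page does not document the real params object, its object-storage targets, or its path layout.

```python
import base64
from pathlib import Path

import pandas as pd


def store_images(df: pd.DataFrame, out_dir: str, *, clear_inline: bool = True) -> pd.DataFrame:
    """Write _image_b64 payloads to files and record _stored_image_uri per row."""
    out = df.copy()
    root = Path(out_dir)
    root.mkdir(parents=True, exist_ok=True)
    uris = []
    for i, payload in enumerate(out["_image_b64"]):
        if not isinstance(payload, str) or not payload:
            uris.append(None)  # nothing to persist for this row
            continue
        # Hypothetical naming scheme: row position plus .png.
        path = root / f"image_{i}.png"
        path.write_bytes(base64.b64decode(payload))
        uris.append(path.resolve().as_uri())
    out["_stored_image_uri"] = uris
    if clear_inline:
        # Drop inline base64 only for rows that were written.
        written = out["_stored_image_uri"].notna()
        out.loc[written, "_image_b64"] = None
    return out
```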

nemo_retriever.graph.subquery_operator module#

nemo_retriever.graph.tabular_fetch_embeddings_operator module#

Graph operator: fetch tabular entity descriptions from Neo4j into an embedding-ready DataFrame.

class nemo_retriever.graph.tabular_fetch_embeddings_operator.TabularFetchEmbeddingsOp(*, database_name: str, **kwargs: Any)[source]#

Bases: AbstractOperator, CPUOperator

Fetch all tabular entity descriptions from Neo4j into an embedding-ready DataFrame.

This operator ignores its input — it always queries Neo4j directly and returns a fresh DataFrame with columns: text, _embed_modality, path, page_number, metadata.

The output schema matches the format produced by the unstructured pipeline, so the standard _BatchEmbedActor can be chained directly after this operator.

postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
process(
data: Any,
**kwargs: Any,
) DataFrame[source]#

nemo_retriever.graph.tabular_schema_extract_operator module#

nemo_retriever.graph.webhook_operator module#

Graph operator for posting processed results to a webhook endpoint.

class nemo_retriever.graph.webhook_operator.WebhookNotifyOperator(*, params: Any = None)[source]#

Bases: AbstractOperator, CPUOperator

Post batch results to an external HTTP endpoint.

This is a side-effect-only operator: it sends a JSON payload to a remote URL but passes the incoming data through unmodified. If endpoint_url is None (the default), the operator is a no-op.

Parameters:

params – A WebhookParams instance. If params is None or params.endpoint_url is falsy, the stage does nothing.

postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
process(
data: Any,
**kwargs: Any,
) Any[source]#
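The pass-through contract can be sketched with the standard library's urllib.request. The function returns its input unchanged and sends nothing when the endpoint is falsy. The payload_fn hook and the default {"num_rows": ...} payload are assumptions, because the operator's real payload shape is not documented here. This sketch also lets HTTP errors propagate, and whether the real operator does the same is not stated.

```python
import json
import urllib.request


def notify_webhook(data, endpoint_url=None, payload_fn=None, timeout=10.0):
    """Hypothetical sketch: POST a JSON summary of ``data``, return ``data`` unchanged."""
    if not endpoint_url:
        return data  # no-op stage: nothing is sent
    payload = payload_fn(data) if payload_fn else {"num_rows": len(data)}
    request = urllib.request.Request(
        endpoint_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout):
        pass  # response body is ignored; errors propagate to the caller
    return data


batch = [{"doc": "a"}, {"doc": "b"}]
```

Returning the same object rather than a copy keeps the stage free for the downstream operators.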

Module contents#

Canonical graph-execution package for operators, graphs, and executors.

class nemo_retriever.graph.AbstractExecutor(graph: Graph)[source]#

Bases: ABC

Base class for pipeline executors.

An executor takes a Graph at init time and provides an ingest() method that feeds data through the graph.

abstractmethod ingest(
data: Any,
**kwargs: Any,
) Any[source]#

Execute the graph against data and return results.

class nemo_retriever.graph.AbstractOperator(**kwargs: Any)[source]#

Bases: ABC

Base class for all pipeline operators.

get_constructor_kwargs() dict[str, Any][source]#

Best-effort constructor kwargs for executor-side reconstruction.

abstractmethod postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
abstractmethod preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
abstractmethod process(
data: Any,
**kwargs: Any,
) Any[source]#
run(
data: Any,
**kwargs: Any,
) Any[source]#
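The following sketch illustrates the operator contract with a stand-in base class, SketchOperator (hypothetical, not the library class). Here run() chains preprocess, process, and postprocess, matching the description of RayDataExecutor stages below. get_constructor_kwargs simply replays the keyword arguments captured in __init__. That is one plausible reading of "best-effort", not the documented implementation. Real operators would subclass nemo_retriever.graph.AbstractOperator instead.

```python
from abc import ABC, abstractmethod
from typing import Any


class SketchOperator(ABC):
    """Stand-in mirroring the documented contract (not the library class)."""

    def __init__(self, **kwargs: Any) -> None:
        # Remember constructor kwargs so an executor could rebuild the operator.
        self._init_kwargs = dict(kwargs)

    def get_constructor_kwargs(self) -> dict[str, Any]:
        return dict(self._init_kwargs)

    @abstractmethod
    def preprocess(self, data: Any, **kwargs: Any) -> Any: ...

    @abstractmethod
    def process(self, data: Any, **kwargs: Any) -> Any: ...

    @abstractmethod
    def postprocess(self, data: Any, **kwargs: Any) -> Any: ...

    def run(self, data: Any, **kwargs: Any) -> Any:
        # preprocess -> process -> postprocess
        data = self.preprocess(data, **kwargs)
        data = self.process(data, **kwargs)
        return self.postprocess(data, **kwargs)

    def __call__(self, data: Any, **kwargs: Any) -> Any:
        return self.run(data, **kwargs)


class Uppercase(SketchOperator):
    def preprocess(self, data, **kwargs):
        return [str(x) for x in data]

    def process(self, data, **kwargs):
        return [x.upper() for x in data]

    def postprocess(self, data, **kwargs):
        return [x + self.get_constructor_kwargs().get("suffix", "") for x in data]


op = Uppercase(suffix="!")
clone = type(op)(**op.get_constructor_kwargs())  # executor-style reconstruction
```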
class nemo_retriever.graph.ArchetypeOperator(**kwargs: Any)[source]#

Bases: AbstractOperator

Lightweight graph-facing operator that resolves to a hardware-specific variant.

classmethod cpu_variant_class() type[AbstractOperator] | None[source]#
classmethod gpu_variant_class() type[AbstractOperator] | None[source]#
postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
classmethod prefers_cpu_variant(
operator_kwargs: dict[str, Any] | None = None,
) bool[source]#
preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
process(
data: Any,
**kwargs: Any,
) Any[source]#
classmethod resolve_operator_class(
resources: ClusterResources | Resources | None = None,
operator_kwargs: dict[str, Any] | None = None,
) type[AbstractOperator][source]#
run(
data: Any,
**kwargs: Any,
) Any[source]#
classmethod variant_operator_kwargs(
operator_class: type[AbstractOperator],
operator_kwargs: dict[str, Any] | None = None,
) dict[str, Any][source]#

Return constructor kwargs for the resolved concrete operator.
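The following sketch shows how the ArchetypeOperator classmethods could fit together. It prefers the GPU variant when GPUs are available and the kwargs do not ask for CPU, and otherwise falls back to the CPU variant. The plain {"GPU": n} resource dict (standing in for ClusterResources/Resources), the force_cpu flag, and the selection rule itself are assumptions. The real criteria are not documented here.

```python
from typing import Optional


class CPUVariant:
    pass


class GPUVariant:
    pass


class SketchArchetype:
    """Stand-in showing one plausible resolution rule (hypothetical)."""

    @classmethod
    def cpu_variant_class(cls) -> Optional[type]:
        return CPUVariant

    @classmethod
    def gpu_variant_class(cls) -> Optional[type]:
        return GPUVariant

    @classmethod
    def prefers_cpu_variant(cls, operator_kwargs: Optional[dict] = None) -> bool:
        # Assumed flag name; the real preference logic is not documented.
        return bool((operator_kwargs or {}).get("force_cpu", False))

    @classmethod
    def resolve_operator_class(
        cls, resources: Optional[dict] = None, operator_kwargs: Optional[dict] = None
    ) -> type:
        gpus = (resources or {}).get("GPU", 0)
        gpu_cls, cpu_cls = cls.gpu_variant_class(), cls.cpu_variant_class()
        if gpu_cls is not None and gpus > 0 and not cls.prefers_cpu_variant(operator_kwargs):
            return gpu_cls
        if cpu_cls is not None:
            return cpu_cls
        raise ValueError("no operator variant fits the available resources")
```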

class nemo_retriever.graph.CPUOperator[source]#

Bases: object

Mixin flag indicating an operator runs on CPU only.

Operators that perform no GPU work (file I/O, text splitting, DataFrame transforms) should inherit from both AbstractOperator and this class:

class MyCPUActor(AbstractOperator, CPUOperator):
    ...

Executors can inspect isinstance(op, CPUOperator) to skip GPU resource allocation for these stages.

class nemo_retriever.graph.FileListLoaderOperator(**kwargs: Any)[source]#

Bases: AbstractOperator, CPUOperator

Load a list of files into a DataFrame with path and bytes columns.

postprocess(
data: DataFrame,
**kwargs: Any,
) DataFrame[source]#
preprocess(
data: Any,
**kwargs: Any,
) list[str][source]#
process(
data: list[str],
**kwargs: Any,
) DataFrame[source]#

class nemo_retriever.graph.GPUOperator[source]#

Bases: object

Mixin flag indicating an operator requires GPU resources.

Operators that load torch models or perform CUDA-based inference should inherit from both AbstractOperator and this class:

class MyGPUActor(AbstractOperator, GPUOperator):
    ...

Executors can inspect isinstance(op, GPUOperator) to allocate GPU resources or route work to GPU-capable workers.
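The following sketch shows an executor policy that reads the two marker mixins through isinstance. The mixins are redeclared here as stand-ins so the block runs without the library, and AbstractOperator is omitted from the bases for brevity. The gpus_for_stage helper is hypothetical. In particular, giving unflagged operators zero GPUs is an assumption, not documented behavior.

```python
class CPUOperator:
    """Stand-in for the nemo_retriever.graph.CPUOperator marker mixin."""


class GPUOperator:
    """Stand-in for the nemo_retriever.graph.GPUOperator marker mixin."""


def gpus_for_stage(op, gpus_per_gpu_stage=1.0):
    """Hypothetical policy: request GPUs only for GPU-flagged stages."""
    if isinstance(op, GPUOperator):
        return gpus_per_gpu_stage
    # CPU-flagged (and, in this sketch, unflagged) stages get no GPU.
    return 0.0


class TextSplitter(CPUOperator):
    pass


class Embedder(GPUOperator):
    pass
```

Because the mixins carry no state, the check costs nothing at runtime and works on classes defined anywhere in the codebase.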

class nemo_retriever.graph.Graph[source]#

Bases: object

A directed acyclic pipeline graph.

A graph owns one or more root Node instances and can execute them in topological (depth-first) order.

The >> operator appends a node to the current leaf nodes and returns the graph itself, enabling fluent chaining:

graph = a >> b >> c >> d
# or
graph = Graph()
graph.add_root(a)
graph >> b >> c >> d

Bare AbstractOperator instances passed to add_root(), add_chain(), or >> are auto-wrapped in Node.

add_chain(
*nodes: Node | AbstractOperator,
) None[source]#

Chain nodes in order and register the first as a root.

Each element may be a Node or a bare AbstractOperator.

add_root(
node: Node | Graph | AbstractOperator,
) Node[source]#

Register node as a root (entry point) of the graph.

execute(
data: Any,
**kwargs: Any,
) List[Any][source]#

Execute every root and its descendants depth-first.

Each root receives data; children receive their parent’s output. Returns a list of leaf outputs (one per leaf node reached).
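The execution order described for execute() can be sketched with a tiny stand-in node type, SketchNode (hypothetical). Each root receives the input, each child receives its parent's output, and one output is collected per leaf in depth-first order.

```python
class SketchNode:
    """Minimal stand-in for Node: a callable plus an ordered list of children."""

    def __init__(self, op, name=None):
        self.op = op
        self.name = name or getattr(op, "__name__", "node")
        self.children = []

    def add_child(self, child):
        self.children.append(child)
        return child


def execute(roots, data):
    """Run every root on ``data`` and feed outputs to children depth-first."""
    leaves = []

    def visit(node, value):
        out = node.op(value)
        if not node.children:
            leaves.append(out)  # one entry per leaf reached
        for child in node.children:
            visit(child, out)

    for root in roots:
        visit(root, data)
    return leaves


# Fan-out: inc feeds both double and negate.
root = SketchNode(lambda x: x + 1, "inc")
root.add_child(SketchNode(lambda x: x * 2, "double"))
root.add_child(SketchNode(lambda x: -x, "negate"))
```

For input 3, inc produces 4, so the two leaves yield 8 and -4 in child order.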

resolve(
resources: Any,
) Graph[source]#

Return a cloned graph with archetype operators mapped to concrete variants.

resolve_for_local_execution() Graph[source]#

Return a cloned graph resolved against resources detected on the current machine.

class nemo_retriever.graph.GraphPipelineRegistry[source]#

Bases: object

Central registry for golden pipeline graph definitions.

Stores GraphBlueprint objects keyed by name. Supports decorator and imperative registration, building fresh graph instances, inspection / pretty-printing, diffing between graphs, kwarg overrides, and JSON serialization / deserialization of the entire registry.

Usage:

registry = GraphPipelineRegistry()

@registry.register("my-pipeline", description="Demo", version="1.0")
def _build():
    return Graph() >> SomeOperator() >> AnotherOperator()

graph = registry.build("my-pipeline")
registry.print_graph("my-pipeline")

build(
name: str,
) Graph[source]#

Build and return a fresh Graph from the named blueprint.

build_with_overrides(
name: str,
overrides: Dict[str, Dict[str, Any]],
) Graph[source]#

Build a graph and apply kwarg overrides to named nodes.

Parameters:
  • name – Registered graph name.

  • overrides – Mapping of {node_name: {kwarg_key: new_value, ...}}; each matching node’s operator_kwargs are updated with the given values.
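The override semantics can be sketched on plain data. In the sketch, nodes are hypothetical dicts with name and operator_kwargs keys, and a deep copy stands in for "build a fresh graph". Override entries for names that match no node are skipped, following the word "matching" above. The real method's handling of unknown names is not documented here.

```python
import copy


def apply_overrides(nodes, overrides):
    """Hypothetical sketch: return copies of ``nodes`` with kwarg overrides merged in."""
    updated = copy.deepcopy(nodes)  # leave the source definitions untouched
    for node in updated:
        node["operator_kwargs"].update(overrides.get(node["name"], {}))
    return updated


nodes = [
    {"name": "split", "operator_kwargs": {"chunk_size": 512}},
    {"name": "embed", "operator_kwargs": {"batch_size": 32}},
]
out = apply_overrides(nodes, {"embed": {"batch_size": 64}, "missing": {"x": 1}})
```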

diff(
name_a: str,
name_b: str,
) GraphDiff[source]#

Build both named graphs and return a GraphDiff.

get_blueprint(
name: str,
) GraphBlueprint[source]#

Return the GraphBlueprint for name.

Raises KeyError if not found.

get_graph_info(name: str) str[source]#

Return the full inspection report for a named graph as a string.

list_blueprints(
*,
tag: str | None = None,
) List[GraphBlueprint][source]#

Return all blueprints, optionally filtered by tag.

list_names() List[str][source]#

Return all registered graph names in insertion order.

load_all(
path: str | Path,
*,
overwrite: bool = False,
) List[str][source]#

Load graphs from a JSON file produced by save_all().

Each loaded graph is registered as a factory that deserializes the stored structure. Returns the list of graph names loaded.

load_graph(
path: str | Path,
*,
name: str | None = None,
overwrite: bool = False,
) str[source]#

Load a single graph from a JSON file and register it.

If name is not provided, the blueprint name stored in the file is used (falls back to the file stem). Returns the registered name.

print_diff(name_a: str, name_b: str) None[source]#

Print a human-readable diff between two registered graphs.

print_graph(
name: str,
*,
show_kwargs: bool = True,
) None[source]#

Build and pretty-print the named graph with full details.

print_summary() None[source]#

Print a compact table of every registered graph.

register(
name: str,
*,
description: str = '',
version: str = '1.0.0',
tags: List[str] | None = None,
overwrite: bool = False,
) Callable[[Callable[[], Graph]], Callable[[], Graph]][source]#

Decorator that registers a graph factory function.

Example:

@registry.register("pdf-extract", description="PDF extraction pipeline")
def _build():
    return Graph() >> PDFSplitActor() >> PDFExtractionActor()

register_graph(
name: str,
factory: Callable[[], Graph],
*,
description: str = '',
version: str = '1.0.0',
tags: List[str] | None = None,
overwrite: bool = False,
) None[source]#

Programmatically register a graph factory (non-decorator form).

save_all(
path: str | Path,
*,
indent: int = 2,
) Path[source]#

Serialize every registered graph to a single JSON file.

The file contains {name: {roots, metadata, blueprint}} for each registered graph. Returns the resolved path.

save_graph(
name: str,
path: str | Path,
*,
indent: int = 2,
) Path[source]#

Serialize a single named graph to a JSON file.

unregister(
name: str,
) GraphBlueprint[source]#

Remove and return the blueprint for name.

Raises KeyError if name is not registered.

class nemo_retriever.graph.InprocessExecutor(
graph: Graph,
*,
show_progress: bool = True,
)[source]#

Bases: AbstractExecutor

Executor that runs a Graph in-process on pandas DataFrames.

No Ray dependency — each node’s operator is constructed once from operator_class(**operator_kwargs) and called sequentially on the accumulated DataFrame.

Only linear (single-root, no fan-out) graphs are currently supported.

ingest(
data: Any,
**kwargs: Any,
) Any[source]#

Run the graph in-process on pandas DataFrames.

Parameters:

data – A pandas.DataFrame, a file path (str), or a list of file paths. When paths are provided, each file is read as raw bytes and combined into a single DataFrame with bytes and path columns before being passed through the graph.

Returns:

The result after all operators have been applied.

Return type:

pandas.DataFrame
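The in-process flow can be sketched without pandas. Path inputs are normalized into path/bytes records, each operator is constructed once as operator_class(**operator_kwargs), and the operators are applied in order. In the sketch, records stand in for the DataFrame that InprocessExecutor actually builds. The helper names normalize_input and run_linear and the AddLength operator are hypothetical.

```python
import os
import tempfile
from pathlib import Path


def normalize_input(data):
    """Turn a path or list of paths into path/bytes records; pass anything else through."""
    if isinstance(data, (str, Path)):
        data = [data]
    if isinstance(data, list) and all(isinstance(p, (str, Path)) for p in data):
        return [{"path": str(p), "bytes": Path(p).read_bytes()} for p in data]
    return data


def run_linear(operator_specs, data):
    """Construct each operator once, then apply them sequentially (linear graph only)."""
    operators = [cls(**kwargs) for cls, kwargs in operator_specs]
    result = normalize_input(data)
    for op in operators:
        result = op(result)
    return result


class AddLength:
    """Hypothetical operator: record the byte length of each file."""

    def __init__(self, field="size"):
        self.field = field

    def __call__(self, rows):
        return [{**row, self.field: len(row["bytes"])} for row in rows]


with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as f:
    f.write(b"hello")
    path = f.name
result = run_linear([(AddLength, {"field": "n_bytes"})], path)
os.remove(path)
```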

class nemo_retriever.graph.Node(
operator: AbstractOperator,
name: str | None = None,
*,
operator_class: type | None = None,
operator_kwargs: dict | None = None,
)[source]#

Bases: object

A single node in a pipeline graph.

Each node wraps an AbstractOperator and maintains an ordered list of child nodes that should execute after it.

The >> operator chains two nodes and returns a Graph:

graph = a >> b >> c   # Graph with root=a, a->b->c
graph.add_root(...)   # add more roots if needed

add_child(
child: Node | AbstractOperator,
) Node[source]#

Append child to this node’s children and return the child node.

class nemo_retriever.graph.RayDataExecutor(
graph: Graph,
*,
ray_address: str | None = None,
batch_size: int = 1,
batch_format: str = 'pandas',
num_cpus: float = 1,
num_gpus: float = 0,
node_overrides: Dict[str, Dict[str, Any]] | None = None,
)[source]#

Bases: AbstractExecutor

Executor that builds a Ray Data pipeline from a Graph.

For each Node in the graph the executor appends a map_batches stage that uses the node’s operator_class with fn_constructor_kwargs for deferred construction on Ray workers. This ensures heavy GPU models are loaded on workers rather than serialized from the driver.

The operator’s __call__ (defined on AbstractOperator) delegates to run(), so each map_batches stage executes the full preprocess → process → postprocess pipeline.

Only linear (single-root, no fan-out) graphs are currently supported.
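The deferred-construction pattern can be sketched as one Dataset.map_batches call per node. The sketch passes the operator class itself together with fn_constructor_kwargs, so each Ray worker builds its own instance. batch_size, batch_format, num_cpus, num_gpus, fn_constructor_kwargs, and concurrency are real map_batches parameters. Recent Ray Data releases expect a concurrency setting for callable-class UDFs, and the default value used here is an assumption. The helper names and the (name, class, kwargs) node triples are hypothetical, and node_overrides is not modeled. A recording stand-in dataset lets the block run without Ray.

```python
def build_stages(nodes, batch_size=1, batch_format="pandas", num_cpus=1, num_gpus=0, concurrency=1):
    """Hypothetical sketch: one (operator_class, map_batches kwargs) pair per node."""
    stages = []
    for _name, operator_class, operator_kwargs in nodes:
        stages.append(
            (
                operator_class,  # a class, so Ray constructs it on each worker
                {
                    "fn_constructor_kwargs": dict(operator_kwargs),
                    "batch_size": batch_size,
                    "batch_format": batch_format,
                    "num_cpus": num_cpus,
                    "num_gpus": num_gpus,
                    "concurrency": concurrency,  # assumed default actor-pool size
                },
            )
        )
    return stages


def apply_stages(dataset, stages):
    """Append stages lazily to any object exposing map_batches."""
    for operator_class, kwargs in stages:
        dataset = dataset.map_batches(operator_class, **kwargs)
    return dataset


class FakeDataset:
    """Stand-in that records map_batches calls instead of running them."""

    def __init__(self, calls=()):
        self.calls = list(calls)

    def map_batches(self, fn, **kwargs):
        return FakeDataset(self.calls + [(fn, kwargs)])


class Split:
    pass


class Embed:
    pass


ds = apply_stages(FakeDataset(), build_stages([("split", Split, {"chunk": 512}), ("embed", Embed, {})]))
```

With Ray installed, apply_stages(ray.data.read_binary_files(paths, include_paths=True), stages) would build the equivalent lazy pipeline. Whether RayDataExecutor passes include_paths is not stated in this reference.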

build_dataset(
data: Any,
**kwargs: Any,
) Any[source]#

Build a lazy Ray Data pipeline from the graph.

Parameters:

data – Input to ray.data.read_binary_files (a path or list of glob patterns) or an already-constructed ray.data.Dataset.

Returns:

The lazy Ray dataset with all graph stages appended.

Return type:

ray.data.Dataset

ingest(
data: Any,
**kwargs: Any,
) Any[source]#

Build, execute, and materialize a Ray Data pipeline from the graph.

class nemo_retriever.graph.StoreOperator(*, params: Any = None)[source]#

Bases: AbstractOperator, CPUOperator

Persist row-level image payloads to local or object storage.

The operator consumes _image_b64 produced by content transforms and writes _stored_image_uri for downstream vector DB upload. By default it clears inline base64 after successful writes to avoid carrying page-sized payloads into VDB upload.

postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
process(
data: Any,
**kwargs: Any,
) Any[source]#

class nemo_retriever.graph.UDFOperator(
fn: Callable[[Any], Any],
name: str | None = None,
**kwargs: Any,
)[source]#

Bases: AbstractOperator

A small operator wrapper for user-defined Python functions.

postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
process(
data: Any,
**kwargs: Any,
) Any[source]#

class nemo_retriever.graph.WebhookNotifyOperator(*, params: Any = None)[source]#

Bases: AbstractOperator, CPUOperator

Post batch results to an external HTTP endpoint.

This is a side-effect-only operator: it sends a JSON payload to a remote URL but passes the incoming data through unmodified. If endpoint_url is None (the default), the operator is a no-op.

Parameters:

params – A WebhookParams instance. If params is None or params.endpoint_url is falsy, the stage does nothing.

postprocess(
data: Any,
**kwargs: Any,
) Any[source]#
preprocess(
data: Any,
**kwargs: Any,
) Any[source]#
process(
data: Any,
**kwargs: Any,
) Any[source]#