nemo_retriever.graph package#
Submodules#
nemo_retriever.graph.abstract_operator module#
nemo_retriever.graph.beir_eval module#
BEIR Evaluator — Designer component for running BEIR evaluation against LanceDB.
Reuses the existing evaluation logic from nemo_retriever.recall.beir and
prints the standard run summary via
nemo_retriever.utils.detection_summary.print_run_summary.
- class nemo_retriever.graph.beir_eval.BEIREvaluatorActor(
- lancedb_uri: Annotated[str, Param(label='LanceDB URI', placeholder='/path/to/lancedb')] = 'lancedb',
- lancedb_table: Annotated[str, Param(label='Table Name')] = 'nv-ingest',
- embedding_model: Annotated[str, Param(label='Embedding Model')] = 'nvidia/llama-nemotron-embed-1b-v2',
- beir_loader: Annotated[str, Param(label='BEIR Loader', choices=['vidore_hf'])] = 'vidore_hf',
- beir_dataset_name: Annotated[str, Param(label='BEIR Dataset Name', placeholder='e.g. vidore_v3_computer_science')] = '',
- beir_split: Annotated[str, Param(label='BEIR Split')] = 'test',
- beir_query_language: Annotated[str, Param(label='Query Language', placeholder='Optional (e.g. en, fr)')] = '',
- beir_doc_id_field: Annotated[str, Param(label='Doc ID Field', choices=['pdf_basename', 'pdf_page', 'source_id', 'path'])] = 'pdf_basename',
- beir_ks: Annotated[str, Param(label='K Values', placeholder='1,3,5,10')] = '1,3,5,10',
- hybrid: Annotated[bool, Param(label='Hybrid Search')] = False,
)[source]#
Here, Annotated is typing.Annotated and Param is nemo_retriever.graph.designer.Param. Param fields that are not shown keep their defaults.
Bases: object
Designer BEIR evaluation node against an existing LanceDB table.
Assumes vectors were already written (for example, via IngestVdbOperator or the retriever pipeline upload path). After evaluation, it calls print_run_summary, as the batch pipeline does.
nemo_retriever.graph.content_operators module#
nemo_retriever.graph.content_transforms module#
nemo_retriever.graph.cpu_operator module#
Mixin flag for operators that run on CPU only.
- class nemo_retriever.graph.cpu_operator.CPUOperator[source]#
Bases: object
Mixin flag indicating an operator runs on CPU only.
Operators that perform no GPU work (file I/O, text splitting, DataFrame transforms) should inherit from both AbstractOperator and this class:
class MyCPUActor(AbstractOperator, CPUOperator): ...
Executors can inspect isinstance(op, CPUOperator) to skip GPU resource allocation for these stages.
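The isinstance check above can be sketched as follows. This is only a minimal illustration. The empty classes stand in for the real AbstractOperator and CPUOperator, and TextSplitActor, EmbedActor, and stage_resources are hypothetical. The num_cpus/num_gpus keys are borrowed from the RayDataExecutor parameters, not taken from any executor's actual code.

```python
class AbstractOperator:  # stand-in for nemo_retriever.graph.AbstractOperator
    pass


class CPUOperator:  # stand-in for the CPU-only mixin flag
    pass


class TextSplitActor(AbstractOperator, CPUOperator):
    """Hypothetical CPU-only stage: text splitting needs no GPU."""


class EmbedActor(AbstractOperator):
    """Hypothetical stage that does not declare itself CPU-only."""


def stage_resources(op: AbstractOperator, default_gpus: float = 1.0) -> dict:
    """Hypothetical helper: request no GPU for operators flagged as CPU-only."""
    if isinstance(op, CPUOperator):
        return {"num_cpus": 1, "num_gpus": 0}
    return {"num_cpus": 1, "num_gpus": default_gpus}
```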
nemo_retriever.graph.custom_operator module#
- class nemo_retriever.graph.custom_operator.UDFOperator(
- fn: Callable[[Any], Any],
- name: str | None = None,
- **kwargs: Any,
Bases: AbstractOperator
A small operator wrapper for user-defined Python functions.
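Executors call an operator through __call__, which delegates to run() and the preprocess → process → postprocess sequence described for RayDataExecutor. The sketch below illustrates that contract with stand-in classes. SketchOperator and SketchUDFOperator are hypothetical. Applying fn inside process(), with name defaulting to the function's __name__, is an assumption about how UDFOperator wraps a function, not its confirmed implementation.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Optional


class SketchOperator(ABC):
    """Hypothetical stand-in for the AbstractOperator run contract."""

    def __init__(self, **kwargs: Any) -> None:
        self.operator_kwargs = kwargs

    def preprocess(self, data: Any) -> Any:
        return data  # default: pass the input through

    @abstractmethod
    def process(self, data: Any) -> Any: ...

    def postprocess(self, data: Any) -> Any:
        return data  # default: pass the output through

    def run(self, data: Any) -> Any:
        return self.postprocess(self.process(self.preprocess(data)))

    def __call__(self, data: Any) -> Any:
        return self.run(data)  # calling the operator runs the full pipeline


class SketchUDFOperator(SketchOperator):
    """Wraps a plain Python function as an operator (assumed behaviour)."""

    def __init__(self, fn: Callable[[Any], Any], name: Optional[str] = None, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.fn = fn
        self.name = name or getattr(fn, "__name__", "udf")

    def process(self, data: Any) -> Any:
        return self.fn(data)


double = SketchUDFOperator(lambda xs: [2 * x for x in xs], name="double")
print(double([1, 2, 3]))  # [2, 4, 6]
```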
nemo_retriever.graph.designer module#
Decorator-driven registration of Designer components.
Usage:
from typing import Annotated
from nemo_retriever.graph import AbstractOperator
from nemo_retriever.graph.designer import designer_component, Param
@designer_component(
    name="PDF Splitter",
    category="Document Processing",
    compute="cpu",
    description="Splits multi-page PDFs into individual pages",
)
class PDFSplitActor(AbstractOperator):
    def __init__(
        self,
        mode: Annotated[str, Param(label="Split Mode", choices=["page", "chapter"])] = "page",
    ):
        ...
- class nemo_retriever.graph.designer.Param(
- label: str | None = None,
- description: str | None = None,
- choices: list[Any] | None = None,
- min_val: float | None = None,
- max_val: float | None = None,
- hidden: bool = False,
- placeholder: str | None = None,
Bases: object
Parameter metadata for the Designer UI, used with typing.Annotated.
- choices: list[Any] | None = None#
- description: str | None = None#
- label: str | None = None#
- max_val: float | None = None#
- min_val: float | None = None#
- placeholder: str | None = None#
- nemo_retriever.graph.designer.designer_component(
- *,
- name: str,
- category: str = 'General',
- compute: str = 'undefined',
- description: str = '',
- category_color: str | None = None,
- component_type: str | None = None,
Register a class or function as a Designer-visible component.
- Parameters:
name (str) – Friendly display name shown in the palette and on the canvas.
category (str) – Section heading in the component palette (e.g. “Document Processing”).
compute (str) – Resource hint: "gpu", "cpu", or "undefined".
description (str) – Short description shown as a tooltip.
category_color (str | None) – Optional hex colour for the category section (e.g. "#64b4ff").
component_type (str | None) – Special type tag for code-generation routing (e.g. "pipeline_sink", "pipeline_evaluator"). Leave None for standard map_batches operators.
nemo_retriever.graph.executor module#
Pipeline executors that run a Graph against input data.
- class nemo_retriever.graph.executor.AbstractExecutor(graph: Graph)[source]#
Bases: ABC
Base class for pipeline executors.
An executor takes a Graph at init time and provides an ingest() method that feeds data through the graph.
- class nemo_retriever.graph.executor.InprocessExecutor(
- graph: Graph,
- *,
- show_progress: bool = True,
Bases: AbstractExecutor
Executor that runs a Graph in-process on pandas DataFrames.
No Ray dependency: each node’s operator is constructed once from operator_class(**operator_kwargs) and called sequentially on the accumulated DataFrame.
Only linear (single-root, no fan-out) graphs are currently supported.
- ingest(
- data: Any,
- **kwargs: Any,
Run the graph in-process on pandas DataFrames.
- Parameters:
data – A pandas.DataFrame, a file path (str), or a list of file paths. When paths are provided, each file is read as raw bytes and combined into a single DataFrame with bytes and path columns before being passed through the graph.
- Returns:
The result after all operators have been applied.
- Return type:
pandas.DataFrame
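The in-process model can be sketched as below: construct each operator once from operator_class(**operator_kwargs), then call the operators in order on the accumulated DataFrame. AddColumn, FilterRows, and run_linear are hypothetical. Stages are given as (class, kwargs) pairs instead of Graph nodes, so this illustrates only the linear case that InprocessExecutor supports.

```python
from typing import Any, Dict, List, Tuple, Type

import pandas as pd


class AddColumn:
    """Hypothetical operator: adds a constant column."""

    def __init__(self, column: str, value: Any) -> None:
        self.column, self.value = column, value

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(**{self.column: self.value})


class FilterRows:
    """Hypothetical operator: keeps rows where `column` >= `threshold`."""

    def __init__(self, column: str, threshold: float) -> None:
        self.column, self.threshold = column, threshold

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[df[self.column] >= self.threshold]


def run_linear(stages: List[Tuple[Type, Dict[str, Any]]], df: pd.DataFrame) -> pd.DataFrame:
    """Construct every operator once, then apply them in order."""
    operators = [cls(**kwargs) for cls, kwargs in stages]
    for op in operators:
        df = op(df)
    return df


out = run_linear(
    [(AddColumn, {"column": "source", "value": "demo"}), (FilterRows, {"column": "score", "threshold": 0.5})],
    pd.DataFrame({"score": [0.2, 0.7, 0.9]}),
)
```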
- class nemo_retriever.graph.executor.RayDataExecutor(
- graph: Graph,
- *,
- ray_address: str | None = None,
- batch_size: int = 1,
- batch_format: str = 'pandas',
- num_cpus: float = 1,
- num_gpus: float = 0,
- node_overrides: Dict[str, Dict[str, Any]] | None = None,
Bases: AbstractExecutor
Executor that builds a Ray Data pipeline from a Graph.
For each Node in the graph, the executor appends a map_batches stage that uses the node’s operator_class with fn_constructor_kwargs for deferred construction on Ray workers. This ensures heavy GPU models are loaded on workers rather than serialised from the driver.
The operator’s __call__ (defined on AbstractOperator) delegates to run(), so each map_batches stage executes the full preprocess → process → postprocess pipeline.
Only linear (single-root, no fan-out) graphs are currently supported.
- build_dataset(
- data: Any,
- **kwargs: Any,
Build a lazy Ray Data pipeline from the graph.
- Parameters:
data – Input to ray.data.read_binary_files (a path or list of glob patterns) or an already-constructed ray.data.Dataset.
- Returns:
The lazy Ray dataset with all graph stages appended.
- Return type:
ray.data.Dataset
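Passing the class and its kwargs, rather than a constructed instance, is what defers model loading to the workers. The sketch below builds one keyword dictionary per node for Ray Data's Dataset.map_batches. fn, fn_constructor_kwargs, batch_size, batch_format, num_cpus, and num_gpus are real map_batches parameters. EmbedActor and map_batches_specs are hypothetical, and Ray itself is not imported, so the plan can be inspected without a cluster. Depending on the Ray version, class-based UDFs may also need a concurrency setting.

```python
from typing import Any, Dict, List, Tuple, Type


class EmbedActor:
    """Hypothetical heavy operator: the model would load in __init__, on the worker."""

    def __init__(self, model_name: str) -> None:
        self.model_name = model_name

    def __call__(self, batch):
        return batch


def map_batches_specs(
    stages: List[Tuple[Type, Dict[str, Any]]],
    *,
    batch_size: int = 1,
    batch_format: str = "pandas",
    num_cpus: float = 1,
    num_gpus: float = 0,
) -> List[Dict[str, Any]]:
    """One map_batches kwargs dict per node; the class (not an instance) is passed."""
    return [
        {
            "fn": operator_class,  # constructed later, on each worker
            "fn_constructor_kwargs": dict(operator_kwargs),
            "batch_size": batch_size,
            "batch_format": batch_format,
            "num_cpus": num_cpus,
            "num_gpus": num_gpus,
        }
        for operator_class, operator_kwargs in stages
    ]


specs = map_batches_specs([(EmbedActor, {"model_name": "nvidia/llama-nemotron-embed-1b-v2"})], num_gpus=1)
# With Ray installed, the specs could be applied as:
#   ds = ray.data.read_binary_files(paths)
#   for spec in specs:
#       ds = ds.map_batches(**spec)  # plus concurrency=... where the Ray version requires it
```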
nemo_retriever.graph.file_loader_operator module#
Operators for loading file paths into DataFrames.
- class nemo_retriever.graph.file_loader_operator.FileListLoaderOperator(**kwargs: Any)[source]#
Bases: AbstractOperator, CPUOperator
Load a list of files into a DataFrame with path and bytes columns.
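The path-and-bytes layout can be sketched with pandas, assuming files are read eagerly in a single pass. load_files is a hypothetical helper, not the implementation of FileListLoaderOperator.

```python
from pathlib import Path
from typing import Iterable, Union

import pandas as pd


def load_files(paths: Union[str, Path, Iterable[Union[str, Path]]]) -> pd.DataFrame:
    """Read each file as raw bytes into a DataFrame with `path` and `bytes` columns."""
    if isinstance(paths, (str, Path)):
        paths = [paths]  # accept a single path as well as a list
    rows = [{"path": str(p), "bytes": Path(p).read_bytes()} for p in paths]
    return pd.DataFrame(rows, columns=["path", "bytes"])
```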
nemo_retriever.graph.gpu_operator module#
Mixin flag for operators that require GPU resources.
- class nemo_retriever.graph.gpu_operator.GPUOperator[source]#
Bases: object
Mixin flag indicating an operator requires GPU resources.
Operators that load torch models or perform CUDA-based inference should inherit from both AbstractOperator and this class:
class MyGPUActor(AbstractOperator, GPUOperator): ...
Executors can inspect isinstance(op, GPUOperator) to allocate GPU resources or route work to GPU-capable workers.
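Routing by mixin flag can be sketched as a simple partition. EmbedActor, ChunkActor, and route are hypothetical, and the two empty classes only stand in for the real AbstractOperator and GPUOperator. How an executor actually schedules each group is not specified here.

```python
from typing import Iterable, List, Tuple


class AbstractOperator:  # stand-in for nemo_retriever.graph.AbstractOperator
    pass


class GPUOperator:  # stand-in for the GPU mixin flag
    pass


class EmbedActor(AbstractOperator, GPUOperator):
    """Hypothetical stage that runs CUDA inference."""


class ChunkActor(AbstractOperator):
    """Hypothetical stage with no GPU requirement."""


def route(ops: Iterable[AbstractOperator]) -> Tuple[List[AbstractOperator], List[AbstractOperator]]:
    """Split operators into (GPU-capable workers, general workers) by mixin flag."""
    gpu, other = [], []
    for op in ops:
        (gpu if isinstance(op, GPUOperator) else other).append(op)
    return gpu, other
```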
nemo_retriever.graph.graph_pipeline_registry module#
Graph Pipeline Registry — manage, inspect, compare, and serialize golden pipeline graphs.
Provides a central GraphPipelineRegistry that stores named graph
blueprints (factory functions + metadata). Graphs built from the registry
can be inspected, diffed against each other, serialized to / loaded from JSON,
and configured with kwarg overrides — all without touching the code that
originally defined them.
A module-level default_registry is provided for convenience so that
graph definitions scattered across the codebase can all register to a single
shared instance.
Quick-start:
from nemo_retriever.graph.graph_pipeline_registry import default_registry
@default_registry.register("my-pipeline", description="Demo pipeline")
def _build():
    from nemo_retriever.graph import Graph
    return Graph() >> SomeOperator() >> AnotherOperator()
graph = default_registry.build("my-pipeline")
default_registry.print_graph("my-pipeline")
- class nemo_retriever.graph.graph_pipeline_registry.GraphBlueprint(name: str, graph_factory: ~typing.Callable[[], ~nemo_retriever.graph.pipeline_graph.Graph], description: str = '', version: str = '1.0.0', tags: ~typing.List[str] = <factory>, created_at: str = <factory>, updated_at: str = <factory>)[source]#
Bases: object
A named, versioned graph definition held in the registry.
- created_at: str#
- description: str = ''#
- name: str#
- tags: List[str]#
- updated_at: str#
- version: str = '1.0.0'#
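The <factory> defaults in the GraphBlueprint signature are how Sphinx renders dataclass fields declared with default_factory. That mechanism gives every blueprint its own tags list and timestamps. The sketch below assumes GraphBlueprint is such a dataclass. Blueprint is a stand-in, and the ISO-8601 UTC timestamp format is an assumption.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Callable, List


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()  # assumed timestamp format


@dataclass
class Blueprint:  # stand-in for GraphBlueprint
    name: str
    graph_factory: Callable[[], object]
    description: str = ""
    version: str = "1.0.0"
    tags: List[str] = field(default_factory=list)  # a fresh list per blueprint
    created_at: str = field(default_factory=_now)
    updated_at: str = field(default_factory=_now)


a = Blueprint("a", list)
b = Blueprint("b", list)
a.tags.append("pdf")  # does not leak into b.tags
```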
- class nemo_retriever.graph.graph_pipeline_registry.GraphDiff(identical: bool, structural_match: bool, node_count_a: int, node_count_b: int, roots_a: ~typing.List[str], roots_b: ~typing.List[str], node_diffs: ~typing.List[~nemo_retriever.graph.graph_pipeline_registry.NodeDiff] = <factory>, nodes_only_in_a: ~typing.List[str] = <factory>, nodes_only_in_b: ~typing.List[str] = <factory>)[source]#
Bases: object
Full diff result between two graphs.
- identical: bool#
- node_count_a: int#
- node_count_b: int#
- nodes_only_in_a: List[str]#
- nodes_only_in_b: List[str]#
- roots_a: List[str]#
- roots_b: List[str]#
- structural_match: bool#
- class nemo_retriever.graph.graph_pipeline_registry.GraphPipelineRegistry[source]#
Bases: object
Central registry for golden pipeline graph definitions.
Stores GraphBlueprint objects keyed by name. Supports decorator and imperative registration, building fresh graph instances, inspection / pretty-printing, diffing between graphs, kwarg overrides, and JSON serialization / deserialization of the entire registry.
Usage:
registry = GraphPipelineRegistry()
@registry.register("my-pipeline", description="Demo", version="1.0")
def _build():
    return Graph() >> SomeOperator() >> AnotherOperator()
graph = registry.build("my-pipeline")
registry.print_graph("my-pipeline")
- build_with_overrides(
- name: str,
- overrides: Dict[str, Dict[str, Any]],
Build a graph and apply kwarg overrides to named nodes.
- Parameters:
name – Registered graph name.
overrides – {node_name: {kwarg_key: new_value, ...}}. The operator_kwargs of each matching node are updated with the given values.
- diff(
- name_a: str,
- name_b: str,
Build both named graphs and return a GraphDiff.
- get_blueprint(
- name: str,
Return the GraphBlueprint for name.
Raises KeyError if not found.
- get_graph_info(name: str) str[source]#
Return the full inspection report for a named graph as a string.
- list_blueprints(
- *,
- tag: str | None = None,
Return all blueprints, optionally filtered by tag.
- load_all(
- path: str | Path,
- *,
- overwrite: bool = False,
Load graphs from a JSON file produced by save_all().
Each loaded graph is registered as a factory that deserializes the stored structure. Returns the list of graph names loaded.
- load_graph(
- path: str | Path,
- *,
- name: str | None = None,
- overwrite: bool = False,
Load a single graph from a JSON file and register it.
If name is not provided, the blueprint name stored in the file is used (falls back to the file stem). Returns the registered name.
- print_diff(name_a: str, name_b: str) None[source]#
Print a human-readable diff between two registered graphs.
- print_graph(
- name: str,
- *,
- show_kwargs: bool = True,
Build and pretty-print the named graph with full details.
- register(
- name: str,
- *,
- description: str = '',
- version: str = '1.0.0',
- tags: List[str] | None = None,
- overwrite: bool = False,
Decorator that registers a graph factory function.
Example:
@registry.register("pdf-extract", description="PDF extraction pipeline")
def _build():
    return Graph() >> PDFSplitActor() >> PDFExtractionActor()
- register_graph(
- name: str,
- factory: Callable[[], Graph],
- *,
- description: str = '',
- version: str = '1.0.0',
- tags: List[str] | None = None,
- overwrite: bool = False,
Programmatically register a graph factory (non-decorator form).
- save_all(
- path: str | Path,
- *,
- indent: int = 2,
Serialize every registered graph to a single JSON file.
The file contains {name: {roots, metadata, blueprint}} for each registered graph. Returns the resolved path.
- save_graph(
- name: str,
- path: str | Path,
- *,
- indent: int = 2,
Serialize a single named graph to a JSON file.
- unregister(
- name: str,
Remove and return the blueprint for name.
Raises KeyError if name is not registered.
- class nemo_retriever.graph.graph_pipeline_registry.NodeDiff(
- position: str,
- node_a_name: str,
- node_b_name: str,
- name_changed: bool = False,
- class_changed: bool = False,
- class_a: str = '',
- class_b: str = '',
- kwargs_added: ~typing.Dict[str,
- ~typing.Any] = <factory>,
- kwargs_removed: ~typing.Dict[str,
- ~typing.Any] = <factory>,
- kwargs_changed: ~typing.Dict[str,
- ~typing.Tuple[~typing.Any,
- ~typing.Any]] = <factory>,
- children_a_only: ~typing.List[str] = <factory>,
- children_b_only: ~typing.List[str] = <factory>,
Bases: object
Differences between two nodes at corresponding positions.
- children_a_only: List[str]#
- children_b_only: List[str]#
- class_a: str = ''#
- class_b: str = ''#
- class_changed: bool = False#
- kwargs_added: Dict[str, Any]#
- kwargs_changed: Dict[str, Tuple[Any, Any]]#
- kwargs_removed: Dict[str, Any]#
- name_changed: bool = False#
- node_a_name: str#
- node_b_name: str#
- position: str#
- nemo_retriever.graph.graph_pipeline_registry.clone_graph(
- graph: Graph,
Create a structural deep-copy of graph by round-tripping through serialization.
This produces new Node / operator instances, so modifications to the clone do not affect the original.
- nemo_retriever.graph.graph_pipeline_registry.collect_nodes(
- graph: Graph,
Return an ordered list of all unique nodes in the graph.
- nemo_retriever.graph.graph_pipeline_registry.deserialize_graph(
- data: dict,
Reconstruct a Graph from a dict produced by serialize_graph().
- nemo_retriever.graph.graph_pipeline_registry.diff_graphs( ) GraphDiff[source]#
Compute a structural + configuration diff between two graphs.
Performs a parallel DFS walk and compares node names, operator classes, operator kwargs, and child topology at each corresponding position.
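A lockstep walk like the one diff_graphs describes can be sketched with a small stand-in node type. N, kwargs_delta, and parallel_dfs are hypothetical, and the dotted position strings are an assumed convention. NodeDiff records children present in only one graph in children_a_only / children_b_only, but zip() simply skips them here.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class N:
    """Hypothetical stand-in for Node."""
    name: str
    operator_class: str
    operator_kwargs: Dict[str, Any] = field(default_factory=dict)
    children: List["N"] = field(default_factory=list)


def kwargs_delta(a: Dict[str, Any], b: Dict[str, Any]):
    added = {k: b[k] for k in b.keys() - a.keys()}
    removed = {k: a[k] for k in a.keys() - b.keys()}
    changed = {k: (a[k], b[k]) for k in a.keys() & b.keys() if a[k] != b[k]}
    return added, removed, changed


def parallel_dfs(a: N, b: N, position: str = "0") -> List[Dict[str, Any]]:
    """Walk both trees in lockstep and record differences at each shared position."""
    added, removed, changed = kwargs_delta(a.operator_kwargs, b.operator_kwargs)
    diffs = []
    if a.name != b.name or a.operator_class != b.operator_class or added or removed or changed:
        diffs.append({
            "position": position,
            "name_changed": a.name != b.name,
            "class_changed": a.operator_class != b.operator_class,
            "kwargs_added": added,
            "kwargs_removed": removed,
            "kwargs_changed": changed,
        })
    for i, (ca, cb) in enumerate(zip(a.children, b.children)):
        diffs.extend(parallel_dfs(ca, cb, f"{position}.{i}"))
    return diffs
```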
- nemo_retriever.graph.graph_pipeline_registry.find_node(
- graph: Graph,
- name: str,
Return the first node whose name matches name, or None.
- nemo_retriever.graph.graph_pipeline_registry.find_nodes(
- graph: Graph,
- name: str,
Return every node whose name matches name.
- nemo_retriever.graph.graph_pipeline_registry.format_full_report(
- graph: Graph,
- *,
- show_kwargs: bool = True,
Return a complete inspection report: summary + tree + per-node details.
- nemo_retriever.graph.graph_pipeline_registry.format_graph_summary(
- graph: Graph,
Return a concise summary: node count, depth, root/leaf names.
- nemo_retriever.graph.graph_pipeline_registry.format_graph_tree(
- graph: Graph,
- *,
- show_kwargs: bool = False,
- show_class: bool = True,
- max_value_width: int = 120,
Return a human-readable tree representation of the graph.
- Parameters:
graph – The graph to format.
show_kwargs – Display each node’s operator_kwargs beneath it.
show_class – Show the fully qualified operator class next to the node name.
max_value_width – Truncate kwarg value reprs longer than this.
- nemo_retriever.graph.graph_pipeline_registry.format_node_details(
- node: Node,
Return a detailed multi-line description of a single node.
- nemo_retriever.graph.graph_pipeline_registry.get_node_kwargs(
- graph: Graph,
- name: str,
Return the operator_kwargs for the first node named name.
Raises KeyError if no node matches.
- nemo_retriever.graph.graph_pipeline_registry.leaf_nodes(
- graph: Graph,
Return all leaf nodes (nodes with no children).
- nemo_retriever.graph.graph_pipeline_registry.list_all_kwargs(
- graph: Graph,
Return {node_name: operator_kwargs} for every node in the graph.
- nemo_retriever.graph.graph_pipeline_registry.load_graph(
- path: str | Path,
Load a graph from a JSON file produced by save_graph().
- nemo_retriever.graph.graph_pipeline_registry.max_depth(graph: Graph) int[source]#
Return the maximum depth (longest root-to-leaf path) of the graph.
- nemo_retriever.graph.graph_pipeline_registry.node_count(graph: Graph) int[source]#
Return the total number of unique nodes in the graph.
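The traversal helpers above can be sketched over a stand-in node type. collect, depth, and leaves are hypothetical analogues of collect_nodes(), max_depth(), and leaf_nodes(). Nodes reachable from several roots are de-duplicated by identity. Counting depth in nodes rather than edges is an assumption.

```python
from typing import List


class N:
    """Hypothetical stand-in for Node: a name plus ordered children."""

    def __init__(self, name: str, *children: "N") -> None:
        self.name, self.children = name, list(children)


def collect(roots: List[N]) -> List[N]:
    """Depth-first, first-visit order; shared nodes are listed once."""
    seen, order = set(), []

    def visit(node: N) -> None:
        if id(node) in seen:
            return
        seen.add(id(node))
        order.append(node)
        for child in node.children:
            visit(child)

    for root in roots:
        visit(root)
    return order


def depth(roots: List[N]) -> int:
    """Longest root-to-leaf path, counted in nodes (an assumed convention)."""
    def d(node: N) -> int:
        return 1 + max((d(c) for c in node.children), default=0)

    return max((d(r) for r in roots), default=0)


def leaves(roots: List[N]) -> List[N]:
    return [n for n in collect(roots) if not n.children]


shared = N("embed")
roots = [N("a", N("b", shared)), N("c", shared)]
```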
- nemo_retriever.graph.graph_pipeline_registry.print_diff( ) None[source]#
Print a human-readable diff between two graphs to stdout.
- nemo_retriever.graph.graph_pipeline_registry.print_graph(
- graph: Graph,
- *,
- show_kwargs: bool = True,
Print a full graph inspection to stdout.
- nemo_retriever.graph.graph_pipeline_registry.remove_node_kwargs(
- graph: Graph,
- node_name: str,
- keys: Sequence[str],
- *,
- all_matches: bool = False,
Remove specific kwarg keys from node(s) matching node_name.
Returns the number of nodes modified. Missing keys are silently ignored.
- nemo_retriever.graph.graph_pipeline_registry.replace_node_kwargs(
- graph: Graph,
- node_name: str,
- new_kwargs: Dict[str, Any],
- *,
- all_matches: bool = False,
Replace the entire operator_kwargs dict for matching node(s).
Returns the number of nodes modified.
- nemo_retriever.graph.graph_pipeline_registry.save_graph(
- graph: Graph,
- path: str | Path,
- *,
- indent: int = 2,
Serialize graph and write it to a JSON file at path.
Returns the resolved Path that was written.
- nemo_retriever.graph.graph_pipeline_registry.serialize_graph(
- graph: Graph,
Serialize a graph to a JSON-compatible dictionary.
The result can be passed to json.dumps() (with the _RegistryJSONEncoder) and later restored via deserialize_graph().
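Operator kwargs often hold values the standard json module rejects, which is why a custom encoder is needed. The sketch below shows the json.JSONEncoder.default() hook. SketchEncoder, PDFSplitActor, and the node dictionary layout are hypothetical, and _RegistryJSONEncoder may handle different types. Round-tripping through json.dumps/json.loads, as clone_graph() does, yields fully independent objects.

```python
import json
from pathlib import Path
from typing import Any


class SketchEncoder(json.JSONEncoder):
    """Hypothetical encoder for kwarg values the stdlib encoder rejects."""

    def default(self, o: Any) -> Any:
        if isinstance(o, Path):
            return str(o)
        if isinstance(o, (set, frozenset)):
            return sorted(o)
        if isinstance(o, type):
            return f"{o.__module__}.{o.__qualname__}"  # classes become dotted names
        return super().default(o)  # anything else still raises TypeError


class PDFSplitActor:  # hypothetical operator class referenced by a node
    pass


graph_dict = {
    "roots": ["split"],
    "nodes": {
        "split": {
            "operator_class": PDFSplitActor,
            "operator_kwargs": {"out_dir": Path("/tmp/out"), "modes": {"page"}},
        }
    },
}
text = json.dumps(graph_dict, cls=SketchEncoder, indent=2)
clone = json.loads(text)  # an independent, JSON-native copy
```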
- nemo_retriever.graph.graph_pipeline_registry.update_node_kwargs(
- graph: Graph,
- node_name: str,
- updates: Dict[str, Any],
- *,
- all_matches: bool = False,
Update operator_kwargs for node(s) matching node_name in-place.
- Parameters:
graph – The graph to modify.
node_name – Name of the target node(s).
updates – {kwarg_key: new_value} pairs to merge in.
all_matches – If True, update every matching node. Otherwise, update only the first match and raise KeyError if none is found.
- Returns:
Number of nodes updated.
- Return type:
int
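The first-match versus all-matches rule can be sketched over a flat list of stand-in nodes. N and update_kwargs are hypothetical. Only the first-match error is documented, so returning 0 when all_matches=True finds nothing is an assumption.

```python
from typing import Any, Dict, List


class N:
    """Hypothetical stand-in for Node."""

    def __init__(self, name: str, **operator_kwargs: Any) -> None:
        self.name, self.operator_kwargs = name, dict(operator_kwargs)


def update_kwargs(nodes: List[N], node_name: str, updates: Dict[str, Any], *, all_matches: bool = False) -> int:
    """Merge `updates` into matching nodes' operator_kwargs in place; return the count."""
    matches = [n for n in nodes if n.name == node_name]
    if not all_matches:
        if not matches:
            raise KeyError(node_name)  # first-match mode requires at least one match
        matches = matches[:1]
    for node in matches:
        node.operator_kwargs.update(updates)
    return len(matches)


nodes = [N("embed", batch_size=8), N("embed", batch_size=8), N("store")]
```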
nemo_retriever.graph.ingestor_runtime module#
nemo_retriever.graph.lancedb_write_operator module#
Graph operator that appends embedding rows to a LanceDB table.
Designed for the service-mode ingest pipeline where pages arrive in
incremental batches. Unlike LanceDBWriterActor (which
overwrites on init), this operator lazily creates-or-opens the target
table and appends rows so that concurrent worker processes can all
write to the same LanceDB directory (LanceDB uses file-level locking
internally).
- class nemo_retriever.graph.lancedb_write_operator.LanceDBWriteOperator(
- *,
- uri: str = '/var/lib/nemo-retriever/lancedb',
- table_name: str = 'nv-ingest',
- hybrid: bool = False,
- embedding_column: str = 'text_embeddings_1b_v2',
- embedding_key: str = 'embedding',
- text_column: str = 'text',
- include_text: bool = True,
Bases: AbstractOperator, CPUOperator
Append embedding rows from a DataFrame batch to LanceDB.
The table is created on first write and appended to thereafter. The operator is side-effect-only: it returns the input DataFrame unmodified.
nemo_retriever.graph.multi_type_extract_operator module#
nemo_retriever.graph.operator_archetype module#
- class nemo_retriever.graph.operator_archetype.ArchetypeOperator(**kwargs: Any)[source]#
Bases: AbstractOperator
Lightweight graph-facing operator that resolves to a hardware-specific variant.
- classmethod cpu_variant_class() type[AbstractOperator] | None[source]#
- classmethod gpu_variant_class() type[AbstractOperator] | None[source]#
- classmethod resolve_operator_class(
- resources: ClusterResources | Resources | None = None,
- operator_kwargs: dict[str, Any] | None = None,
- classmethod variant_operator_kwargs(
- operator_class: type[AbstractOperator],
- operator_kwargs: dict[str, Any] | None = None,
Return constructor kwargs for the resolved concrete operator.
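Archetype resolution can be sketched as a choice between the two variant classmethods. OCRArchetype, OCRCPU, and OCRGPU are hypothetical, and the plain num_gpus argument replaces the real ClusterResources | Resources parameter. Preferring the GPU variant whenever GPUs are available is an assumed policy.

```python
from typing import Any, Optional, Type


class AbstractOperator:  # stand-in for nemo_retriever.graph.AbstractOperator
    def __init__(self, **kwargs: Any) -> None:
        self.kwargs = kwargs


class OCRCPU(AbstractOperator): ...


class OCRGPU(AbstractOperator): ...


class OCRArchetype(AbstractOperator):
    """Hypothetical archetype: picks a concrete variant from available GPUs."""

    @classmethod
    def cpu_variant_class(cls) -> Optional[Type[AbstractOperator]]:
        return OCRCPU

    @classmethod
    def gpu_variant_class(cls) -> Optional[Type[AbstractOperator]]:
        return OCRGPU

    @classmethod
    def resolve_operator_class(cls, num_gpus: float = 0) -> Type[AbstractOperator]:
        # `num_gpus` replaces the real ClusterResources | Resources argument (assumption)
        gpu_cls = cls.gpu_variant_class()
        if num_gpus > 0 and gpu_cls is not None:
            return gpu_cls
        cpu_cls = cls.cpu_variant_class()
        if cpu_cls is None:
            raise RuntimeError(f"{cls.__name__} has no CPU variant")
        return cpu_cls
```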
nemo_retriever.graph.operator_resolution module#
- nemo_retriever.graph.operator_resolution.resolve_graph(
- graph: Graph,
- resources: ClusterResources | Resources,
- nemo_retriever.graph.operator_resolution.resolve_graph_for_local_execution(
- graph: Graph,
- nemo_retriever.graph.operator_resolution.resolve_operator_class(
- operator_class: type[AbstractOperator],
- resources: ClusterResources | Resources,
- operator_kwargs: dict | None = None,
- nemo_retriever.graph.operator_resolution.resolve_operator_kwargs(
- operator_class: type[AbstractOperator],
- resolved_class: type[AbstractOperator],
- operator_kwargs: dict | None = None,
nemo_retriever.graph.pipeline_graph module#
Directed pipeline graph composed of Nodes that wrap AbstractOperators.
- class nemo_retriever.graph.pipeline_graph.Graph[source]#
Bases: object
A directed acyclic pipeline graph.
A graph owns one or more root Node instances and can execute them in topological (depth-first) order.
The >> operator appends a node to the current leaf nodes and returns the graph itself, enabling fluent chaining:
graph = a >> b >> c >> d
# or
graph = Graph()
graph.add_root(a)
graph >> b >> c >> d
Bare AbstractOperator instances passed to add_root(), add_chain(), or >> are auto-wrapped in Node.
- add_chain(
- *nodes: Node | AbstractOperator,
Chain nodes in order and register the first as a root.
Each element may be a Node or a bare AbstractOperator.
- add_root(
- node: Node | Graph | AbstractOperator,
Register node as a root (entry point) of the graph.
- execute(
- data: Any,
- **kwargs: Any,
Execute every root and its descendants depth-first.
Each root receives data; children receive their parent’s output. Returns a list of leaf outputs (one per leaf node reached).
- class nemo_retriever.graph.pipeline_graph.Node(
- operator: AbstractOperator,
- name: str | None = None,
- *,
- operator_class: type | None = None,
- operator_kwargs: dict | None = None,
Bases: object
A single node in a pipeline graph.
Each node wraps an AbstractOperator and maintains an ordered list of child nodes that should execute after it.
The >> operator chains two nodes and returns a Graph:
graph = a >> b >> c  # Graph with root=a, a->b->c
graph.add_root(...)  # add more roots if needed
- add_child(
- child: Node | AbstractOperator,
Append child to this node’s children and return the child node.
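The chaining and execution rules above can be sketched with minimal stand-ins: >> hangs the new node under every current leaf, and execute() runs each root depth-first and returns one output per leaf. These Node and Graph classes are hypothetical simplifications. They wrap plain callables instead of AbstractOperator instances and skip auto-wrapping.

```python
from typing import Any, Callable, List


class Node:
    """Hypothetical stand-in: wraps a callable and keeps ordered children."""

    def __init__(self, fn: Callable[[Any], Any], name: str = "") -> None:
        self.fn, self.name, self.children = fn, name, []

    def add_child(self, child: "Node") -> "Node":
        self.children.append(child)
        return child

    def __rshift__(self, other: "Node") -> "Graph":
        graph = Graph()
        graph.add_root(self)
        return graph >> other


class Graph:
    def __init__(self) -> None:
        self.roots: List[Node] = []

    def add_root(self, node: Node) -> "Graph":
        self.roots.append(node)
        return self

    def _leaves(self) -> List[Node]:
        out, stack = [], list(self.roots)
        while stack:
            node = stack.pop()
            if node.children:
                stack.extend(node.children)
            elif node not in out:
                out.append(node)
        return out

    def __rshift__(self, other: Node) -> "Graph":
        for leaf in self._leaves():  # append `other` under every current leaf
            leaf.add_child(other)
        return self

    def execute(self, data: Any) -> List[Any]:
        results: List[Any] = []

        def run(node: Node, value: Any) -> None:
            value = node.fn(value)
            if not node.children:
                results.append(value)  # one output per leaf reached
            for child in node.children:
                run(child, value)

        for root in self.roots:
            run(root, data)
        return results


a, b, c = Node(lambda x: x + 1, "a"), Node(lambda x: x * 10, "b"), Node(str, "c")
graph = a >> b >> c
```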
nemo_retriever.graph.react_agent_operator module#
nemo_retriever.graph.recall_eval module#
nemo_retriever.graph.rrf_aggregator_operator module#
Operator that fuses per-step retrieval results using Reciprocal Rank Fusion.
- class nemo_retriever.graph.rrf_aggregator_operator.RRFAggregatorOperator(*, k: int = 60)[source]#
Bases: AbstractOperator, CPUOperator
Fuse multiple per-step ranked lists into a single ranking per query using RRF.
Implements the Reciprocal Rank Fusion formula score(d) = sum(1 / (rank_i + k)) across all retrieval steps where document d appears. This is the same formula used in retrieval_bench/nemo_agentic/utils.py:rrf_from_subquery_results.
Designed to consume the output of ReActAgentOperator (or any operator that emits one row per (query_id, step_idx, doc_id) triple) and produce a single fused ranking per query_id suitable as input to SelectionAgentOperator.
Input DataFrame schema#
- query_id : str – unique query identifier
- query_text : str – original query text (carried through)
- step_idx : int – which retrieval step produced this row (0, 1, 2, …)
- doc_id : str – retrieved document identifier
- text : str – document text content
- rank : int – 1-indexed rank within its step (1 = most relevant)
Additional columns are ignored.
Output DataFrame schema#
- query_id : str – same query_id as the input
- query_text : str – original query text (first occurrence per query)
- doc_id : str – document identifier
- rrf_score : float – fused RRF score (higher = more relevant)
- text : str – document text (first occurrence per doc_id)
Rows are sorted by rrf_score in descending order within each query_id.
- param k:
RRF damping factor. The standard value is 60 (the default). Larger values reduce the influence of top-ranked documents.
- type k:
int
Examples
import pandas as pd
from nemo_retriever.graph.rrf_aggregator_operator import RRFAggregatorOperator
op = RRFAggregatorOperator(k=60)
df = pd.DataFrame({
    "query_id": ["q1", "q1", "q1", "q1"],
    "query_text": ["inflation causes"] * 4,
    "step_idx": [0, 0, 1, 1],
    "doc_id": ["d1", "d2", "d1", "d3"],
    "text": ["t1", "t2", "t1", "t3"],
    "rank": [1, 2, 1, 2],
})
result = op.run(df)
# d1 appears in both steps at rank 1 → highest RRF score
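The fusion itself maps naturally onto a pandas group-by. The sketch below follows the documented schema. rrf_fuse is a hypothetical helper, not the operator's code. Taking text and query_text from the first row of each (query_id, doc_id) group is an assumption.

```python
import pandas as pd


def rrf_fuse(df: pd.DataFrame, k: int = 60) -> pd.DataFrame:
    """Fuse per-step rankings with Reciprocal Rank Fusion: score(d) = sum(1 / (rank_i + k))."""
    # Each row contributes 1 / (rank + k) to its (query_id, doc_id) pair.
    scored = df.assign(_contrib=1.0 / (df["rank"] + k))
    fused = (
        scored.groupby(["query_id", "doc_id"], sort=False)
        .agg(
            query_text=("query_text", "first"),
            text=("text", "first"),
            rrf_score=("_contrib", "sum"),
        )
        .reset_index()
    )
    # Highest fused score first within each query.
    fused = fused.sort_values(["query_id", "rrf_score"], ascending=[True, False])
    return fused[["query_id", "query_text", "doc_id", "rrf_score", "text"]].reset_index(drop=True)


df = pd.DataFrame({
    "query_id": ["q1"] * 4,
    "query_text": ["inflation causes"] * 4,
    "step_idx": [0, 0, 1, 1],
    "doc_id": ["d1", "d2", "d1", "d3"],
    "text": ["t1", "t2", "t1", "t3"],
    "rank": [1, 2, 1, 2],
})
result = rrf_fuse(df)  # d1: 2/61 ≈ 0.0328; d2 and d3: 1/62 ≈ 0.0161 each
```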
nemo_retriever.graph.selection_agent_operator module#
nemo_retriever.graph.store_operator module#
Graph operator for persisting post-embedding row images to storage.
- class nemo_retriever.graph.store_operator.StoreOperator(*, params: Any = None)[source]#
Bases: AbstractOperator, CPUOperator
Persist row-level image payloads to local or object storage.
The operator consumes _image_b64 produced by content transforms and writes _stored_image_uri for downstream vector DB upload. By default, it clears the inline base64 after successful writes to avoid carrying page-sized payloads into VDB upload.
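The sketch below shows the same flow on the local filesystem: decode _image_b64, write the bytes, record a file:// URI in _stored_image_uri, then clear the inline payload. store_images is hypothetical, as are the SHA-256 file naming and the .png extension. Object storage and the real params object are not modelled.

```python
import base64
import hashlib
from pathlib import Path

import pandas as pd


def store_images(df: pd.DataFrame, out_dir: Path, *, clear_inline: bool = True) -> pd.DataFrame:
    """Write each row's `_image_b64` to disk and record `_stored_image_uri`."""
    out_dir.mkdir(parents=True, exist_ok=True)
    df = df.copy()
    uris = []
    for b64 in df["_image_b64"]:
        if not isinstance(b64, str) or not b64:
            uris.append(None)  # nothing to store for this row
            continue
        raw = base64.b64decode(b64)
        path = out_dir / f"{hashlib.sha256(raw).hexdigest()}.png"  # hypothetical naming scheme
        path.write_bytes(raw)
        uris.append(path.resolve().as_uri())
    df["_stored_image_uri"] = uris
    if clear_inline:
        # Drop page-sized payloads only for rows that were written successfully.
        df.loc[df["_stored_image_uri"].notna(), "_image_b64"] = None
    return df
```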
nemo_retriever.graph.subquery_operator module#
nemo_retriever.graph.tabular_fetch_embeddings_operator module#
Graph operator: fetch tabular entity descriptions from Neo4j into an embedding-ready DataFrame.
- class nemo_retriever.graph.tabular_fetch_embeddings_operator.TabularFetchEmbeddingsOp(*, database_name: str, **kwargs: Any)[source]#
Bases: AbstractOperator, CPUOperator
Fetch all tabular entity descriptions from Neo4j into an embedding-ready DataFrame.
This operator ignores its input: it always queries Neo4j directly and returns a fresh DataFrame with the columns text, _embed_modality, path, page_number, and metadata.
The output schema matches the format produced by the unstructured pipeline, so the standard _BatchEmbedActor can be chained directly after this operator.
nemo_retriever.graph.tabular_schema_extract_operator module#
nemo_retriever.graph.webhook_operator module#
Graph operator for posting processed results to a webhook endpoint.
- class nemo_retriever.graph.webhook_operator.WebhookNotifyOperator(*, params: Any = None)[source]#
Bases: AbstractOperator, CPUOperator
Post batch results to an external HTTP endpoint.
This is a side-effect-only operator: it sends a JSON payload to a remote URL but passes the incoming data through unmodified. If endpoint_url is None (the default), the operator is a no-op.
- Parameters:
params – A WebhookParams instance. If None or params.endpoint_url is falsy, the stage does nothing.
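The no-op rule and the POST can be sketched with the standard library. build_webhook_request and notify are hypothetical helpers. The fields of WebhookParams beyond endpoint_url are not shown here, so the {"records": ...} payload shape and the 10-second timeout are assumptions.

```python
import json
import urllib.request
from typing import Any, Optional


def build_webhook_request(endpoint_url: Optional[str], records: list) -> Optional[urllib.request.Request]:
    """Return a POST request carrying `records` as JSON, or None when disabled."""
    if not endpoint_url:
        return None  # falsy endpoint: the stage is a no-op
    body = json.dumps({"records": records}).encode("utf-8")  # hypothetical payload shape
    return urllib.request.Request(
        endpoint_url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def notify(data: Any, endpoint_url: Optional[str], records: list, timeout: float = 10.0) -> Any:
    req = build_webhook_request(endpoint_url, records)
    if req is not None:
        with urllib.request.urlopen(req, timeout=timeout):
            pass  # the response body is ignored; this stage is side-effect-only
    return data  # pass the incoming data through unmodified
```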
Module contents#
Canonical graph-execution package for operators, graphs, and executors.
- class nemo_retriever.graph.AbstractExecutor(graph: Graph)[source]#
Bases: ABC
Base class for pipeline executors.
An executor takes a Graph at init time and provides an ingest() method that feeds data through the graph.
- class nemo_retriever.graph.AbstractOperator(**kwargs: Any)[source]#
Bases: ABC
Base class for all pipeline operators.
- class nemo_retriever.graph.ArchetypeOperator(**kwargs: Any)[source]#
Bases: AbstractOperator
Lightweight graph-facing operator that resolves to a hardware-specific variant.
- classmethod cpu_variant_class() type[AbstractOperator] | None[source]#
- classmethod gpu_variant_class() type[AbstractOperator] | None[source]#
- classmethod resolve_operator_class(
- resources: ClusterResources | Resources | None = None,
- operator_kwargs: dict[str, Any] | None = None,
- classmethod variant_operator_kwargs(
- operator_class: type[AbstractOperator],
- operator_kwargs: dict[str, Any] | None = None,
Return constructor kwargs for the resolved concrete operator.
- class nemo_retriever.graph.CPUOperator[source]#
Bases: object
Mixin flag indicating an operator runs on CPU only.
Operators that perform no GPU work (file I/O, text splitting, DataFrame transforms) should inherit from both AbstractOperator and this class:
class MyCPUActor(AbstractOperator, CPUOperator): ...
Executors can inspect isinstance(op, CPUOperator) to skip GPU resource allocation for these stages.
- class nemo_retriever.graph.FileListLoaderOperator(**kwargs: Any)[source]#
Bases: AbstractOperator, CPUOperator
Load a list of files into a DataFrame with path and bytes columns.
- class nemo_retriever.graph.GPUOperator[source]#
Bases: object
Mixin flag indicating an operator requires GPU resources.
Operators that load torch models or perform CUDA-based inference should inherit from both AbstractOperator and this class:
class MyGPUActor(AbstractOperator, GPUOperator): ...
Executors can inspect isinstance(op, GPUOperator) to allocate GPU resources or route work to GPU-capable workers.
- class nemo_retriever.graph.Graph[source]#
Bases: object
A directed acyclic pipeline graph.
A graph owns one or more root Node instances and can execute them in topological (depth-first) order.
The >> operator appends a node to the current leaf nodes and returns the graph itself, enabling fluent chaining:
graph = a >> b >> c >> d
# or
graph = Graph()
graph.add_root(a)
graph >> b >> c >> d
Bare AbstractOperator instances passed to add_root(), add_chain(), or >> are auto-wrapped in Node.
- add_chain(
- *nodes: Node | AbstractOperator,
Chain nodes in order and register the first as a root.
Each element may be a Node or a bare AbstractOperator.
- add_root(
- node: Node | Graph | AbstractOperator,
Register node as a root (entry point) of the graph.
- execute(
- data: Any,
- **kwargs: Any,
Execute every root and its descendants depth-first.
Each root receives data; children receive their parent’s output. Returns a list of leaf outputs (one per leaf node reached).
- class nemo_retriever.graph.GraphPipelineRegistry[source]#
Bases: object
Central registry for golden pipeline graph definitions.
Stores GraphBlueprint objects keyed by name. Supports decorator and imperative registration, building fresh graph instances, inspection / pretty-printing, diffing between graphs, kwarg overrides, and JSON serialization / deserialization of the entire registry.
Usage:
registry = GraphPipelineRegistry()
@registry.register("my-pipeline", description="Demo", version="1.0")
def _build():
    return Graph() >> SomeOperator() >> AnotherOperator()
graph = registry.build("my-pipeline")
registry.print_graph("my-pipeline")
- build_with_overrides(
- name: str,
- overrides: Dict[str, Dict[str, Any]],
Build a graph and apply kwarg overrides to named nodes.
- Parameters:
name – Registered graph name.
overrides – {node_name: {kwarg_key: new_value, ...}}. The operator_kwargs of each matching node are updated with the given values.
- diff(name_a: str, name_b: str)
Build both named graphs and return a GraphDiff.
- get_blueprint(name: str)
Return the GraphBlueprint for name. Raises KeyError if not found.
- get_graph_info(name: str) → str
Return the full inspection report for a named graph as a string.
- list_blueprints(*, tag: str | None = None)
Return all blueprints, optionally filtered by tag.
- load_all(path: str | Path, *, overwrite: bool = False)
Load graphs from a JSON file produced by save_all(). Each loaded graph is registered as a factory that deserializes the stored structure. Returns the list of graph names loaded.
- load_graph(path: str | Path, *, name: str | None = None, overwrite: bool = False)
Load a single graph from a JSON file and register it.
If name is not provided, the blueprint name stored in the file is used (falling back to the file stem). Returns the registered name.
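The two loading rules above, one factory per stored graph and the name fallback chain, can be sketched as follows. This is an illustration under assumptions: `resolve_graph_name` and `make_factories` are hypothetical helpers, a stored entry is assumed to follow the `{roots, metadata, blueprint}` layout described for save_all() with the blueprint carrying a `name` key, and "deserializing" is reduced to copying the stored `roots`.

```python
import json
from pathlib import Path
from typing import Any, Callable, Dict, Optional


def resolve_graph_name(path: Path, stored: Dict[str, Any], name: Optional[str] = None) -> str:
    # Precedence: explicit name > blueprint name stored in the file > file stem.
    if name:
        return name
    blueprint = stored.get("blueprint") or {}
    return blueprint.get("name") or path.stem


def make_factories(payload: Dict[str, Dict[str, Any]]) -> Dict[str, Callable[[], Any]]:
    factories: Dict[str, Callable[[], Any]] = {}
    for graph_name, entry in payload.items():
        # Bind `entry` as a default argument: a bare closure would capture the
        # loop variable itself and every factory would see the last entry.
        def factory(entry: Dict[str, Any] = entry) -> Any:
            # Stand-in for deserialization: a fresh deep copy on every call.
            return json.loads(json.dumps(entry["roots"]))

        factories[graph_name] = factory
    return factories
```

The default-argument binding matters for any loop that registers factories: without it, every registered name would build the last graph in the file.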
- print_diff(name_a: str, name_b: str) → None
Print a human-readable diff between two registered graphs.
- print_graph(name: str, *, show_kwargs: bool = True)
Build and pretty-print the named graph with full details.
- register(name: str, *, description: str = '', version: str = '1.0.0', tags: List[str] | None = None, overwrite: bool = False)
Decorator that registers a graph factory function.
Example:

    @registry.register("pdf-extract", description="PDF extraction pipeline")
    def _build():
        return Graph() >> PDFSplitActor() >> PDFExtractionActor()

- register_graph(name: str, factory: Callable[[], Graph], *, description: str = '', version: str = '1.0.0', tags: List[str] | None = None, overwrite: bool = False)
Programmatically register a graph factory (non-decorator form).
- save_all(path: str | Path, *, indent: int = 2)
Serialize every registered graph to a single JSON file.
The file contains {name: {roots, metadata, blueprint}} for each registered graph. Returns the resolved path.
- save_graph(name: str, path: str | Path, *, indent: int = 2)
Serialize a single named graph to a JSON file.
- unregister(name: str)
Remove and return the blueprint for name.
Raises KeyError if name is not registered.
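The registry's core bookkeeping (decorator and imperative registration, fresh builds, tag filtering, KeyError on unknown names) fits in a few lines. A minimal sketch: MiniRegistry and Blueprint are hypothetical classes, and raising ValueError on a duplicate name without overwrite=True is an assumption; the real error type is not documented here.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Blueprint:
    name: str
    factory: Callable[[], Any]
    description: str = ""
    version: str = "1.0.0"
    tags: List[str] = field(default_factory=list)


class MiniRegistry:
    def __init__(self) -> None:
        self._blueprints: Dict[str, Blueprint] = {}

    def register_graph(self, name: str, factory: Callable[[], Any], *, description: str = "",
                       version: str = "1.0.0", tags: Optional[List[str]] = None,
                       overwrite: bool = False) -> Blueprint:
        if name in self._blueprints and not overwrite:
            raise ValueError(f"graph {name!r} is already registered")  # assumed error type
        bp = Blueprint(name, factory, description, version, list(tags or []))
        self._blueprints[name] = bp
        return bp

    def register(self, name: str, **kwargs: Any) -> Callable[[Callable[[], Any]], Callable[[], Any]]:
        # Decorator form: delegates to register_graph and returns the function unchanged.
        def decorator(fn: Callable[[], Any]) -> Callable[[], Any]:
            self.register_graph(name, fn, **kwargs)
            return fn
        return decorator

    def get_blueprint(self, name: str) -> Blueprint:
        if name not in self._blueprints:
            raise KeyError(name)
        return self._blueprints[name]

    def build(self, name: str) -> Any:
        return self.get_blueprint(name).factory()  # a fresh graph on every call

    def list_blueprints(self, *, tag: Optional[str] = None) -> List[Blueprint]:
        return [bp for bp in self._blueprints.values() if tag is None or tag in bp.tags]

    def unregister(self, name: str) -> Blueprint:
        if name not in self._blueprints:
            raise KeyError(name)
        return self._blueprints.pop(name)
```

Storing factories rather than built graphs is what lets build() hand out an independent graph each time, so per-call overrides never leak into the registered definition.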
- class nemo_retriever.graph.InprocessExecutor(graph: Graph, *, show_progress: bool = True)
Bases: AbstractExecutor
Executor that runs a Graph in-process on pandas DataFrames.
There is no Ray dependency: each node’s operator is constructed once from operator_class(**operator_kwargs) and called sequentially on the accumulated DataFrame.
Only linear (single-root, no fan-out) graphs are currently supported.
- ingest(data: Any, **kwargs: Any)
Run the graph in-process on pandas DataFrames.
- Parameters:
data – A pandas.DataFrame, a file path (str), or a list of file paths. When paths are provided, each file is read as raw bytes and combined into a single DataFrame with bytes and path columns before being passed through the graph.
- Returns:
The result after all operators have been applied.
- Return type:
pandas.DataFrame
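The in-process flow (normalize the input, construct each operator once, then pass the DataFrame through them in order) can be sketched with pandas. `to_dataframe`, `run_linear`, `AddLength`, and `KeepLarger` are hypothetical names for illustration; the stages are given as `(operator_class, operator_kwargs)` pairs to mirror the deferred-construction fields described above.

```python
from pathlib import Path
from typing import Any, Dict, List, Sequence, Tuple, Type, Union

import pandas as pd


def to_dataframe(data: Union[pd.DataFrame, str, Sequence[str]]) -> pd.DataFrame:
    """A DataFrame passes through; file paths become rows with bytes and path columns."""
    if isinstance(data, pd.DataFrame):
        return data
    paths = [data] if isinstance(data, str) else list(data)
    return pd.DataFrame(
        {"bytes": [Path(p).read_bytes() for p in paths], "path": [str(p) for p in paths]}
    )


def run_linear(stages: List[Tuple[Type, Dict[str, Any]]], data: Any) -> pd.DataFrame:
    df = to_dataframe(data)
    # Construct every operator once, then apply them in order to the accumulated frame.
    operators = [cls(**kwargs) for cls, kwargs in stages]
    for op in operators:
        df = op(df)
    return df


class AddLength:
    """Toy operator: adds the payload size of each row."""

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        return df.assign(n_bytes=df["bytes"].map(len))


class KeepLarger:
    """Toy operator: keeps rows whose payload is at least min_bytes long."""

    def __init__(self, min_bytes: int) -> None:
        self.min_bytes = min_bytes

    def __call__(self, df: pd.DataFrame) -> pd.DataFrame:
        return df[df["n_bytes"] >= self.min_bytes].reset_index(drop=True)
```

Calling `run_linear([(AddLength, {}), (KeepLarger, {"min_bytes": 3})], ["a.pdf", "b.pdf"])` would read both files and keep only those of at least 3 bytes.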
- class nemo_retriever.graph.Node(operator: AbstractOperator, name: str | None = None, *, operator_class: type | None = None, operator_kwargs: dict | None = None)
Bases: object
A single node in a pipeline graph.
Each node wraps an AbstractOperator and maintains an ordered list of child nodes that should execute after it.
The >> operator chains two nodes and returns a Graph:

    graph = a >> b >> c  # Graph with root=a, a->b->c
    graph.add_root(...)  # add more roots if needed

- add_child(child: Node | AbstractOperator)
Append child to this node’s children and return the child node.
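Because add_child returns the child, calls can be chained, and because both executors accept only linear graphs, a "single root, no fan-out" check is a natural companion. A minimal sketch: SketchNode and `linear_order` are hypothetical, and the use of ValueError is an assumption about how such a check might report failure.

```python
from typing import List, Optional


class SketchNode:
    def __init__(self, name: str) -> None:
        self.name = name
        self.children: List["SketchNode"] = []

    def add_child(self, child: "SketchNode") -> "SketchNode":
        self.children.append(child)
        return child  # returning the child allows a.add_child(b).add_child(c)


def linear_order(roots: List[SketchNode]) -> List[str]:
    """Return node names in execution order, or raise if the graph is not linear."""
    if len(roots) != 1:
        raise ValueError(f"expected exactly one root, got {len(roots)}")
    order: List[str] = []
    node: Optional[SketchNode] = roots[0]
    while node is not None:
        if len(node.children) > 1:
            raise ValueError(f"fan-out at node {node.name!r}")
        order.append(node.name)
        node = node.children[0] if node.children else None
    return order
```

With `a.add_child(SketchNode("b")).add_child(SketchNode("c"))`, `linear_order([a])` returns `["a", "b", "c"]`. Adding a second child to `a` makes the check fail.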
- class nemo_retriever.graph.RayDataExecutor(graph: Graph, *, ray_address: str | None = None, batch_size: int = 1, batch_format: str = 'pandas', num_cpus: float = 1, num_gpus: float = 0, node_overrides: Dict[str, Dict[str, Any]] | None = None)
Bases: AbstractExecutor
Executor that builds a Ray Data pipeline from a Graph.
For each Node in the graph, the executor appends a map_batches stage that uses the node’s operator_class with fn_constructor_kwargs for deferred construction on Ray workers. This ensures heavy GPU models are loaded on the workers rather than serialized from the driver.
The operator’s __call__ (defined on AbstractOperator) delegates to run(), so each map_batches stage executes the full preprocess → process → postprocess pipeline.
Only linear (single-root, no fan-out) graphs are currently supported.
- build_dataset(data: Any, **kwargs: Any)
Build a lazy Ray Data pipeline from the graph.
- Parameters:
data – Input to ray.data.read_binary_files (a path or list of glob patterns) or an already-constructed ray.data.Dataset.
- Returns:
The lazy Ray dataset with all graph stages appended.
- Return type:
ray.data.Dataset
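The deferred-construction idea can be shown without Ray: each stage passes the operator class, not an instance, plus its constructor kwargs, so the model is only built inside the worker. A sketch under assumptions: `plan_map_batches` is hypothetical, and the rule that node_overrides replaces the per-stage defaults key by key is a guess at that parameter's semantics. The keys match real `ray.data.Dataset.map_batches` parameters.

```python
from typing import Any, Dict, List, Optional, Tuple


def plan_map_batches(
    nodes: List[Tuple[str, type, Dict[str, Any]]],
    *,
    batch_size: int = 1,
    batch_format: str = "pandas",
    num_cpus: float = 1,
    num_gpus: float = 0,
    node_overrides: Optional[Dict[str, Dict[str, Any]]] = None,
) -> List[Dict[str, Any]]:
    overrides = node_overrides or {}
    plan: List[Dict[str, Any]] = []
    for name, operator_class, operator_kwargs in nodes:
        stage = {
            "fn": operator_class,  # the class itself; Ray instantiates it on each worker
            "fn_constructor_kwargs": dict(operator_kwargs),
            "batch_size": batch_size,
            "batch_format": batch_format,
            "num_cpus": num_cpus,
            "num_gpus": num_gpus,
        }
        # Assumption: per-node overrides replace the executor-wide defaults.
        stage.update(overrides.get(name, {}))
        plan.append(stage)
    return plan
```

Each dict could then be unpacked as `ds = ds.map_batches(**stage)`. Depending on the Ray version, class-based functions may also need a `concurrency` (or actor-pool `compute`) argument, which this sketch leaves out.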
- class nemo_retriever.graph.StoreOperator(*, params: Any = None)
Bases: AbstractOperator, CPUOperator
Persist row-level image payloads to local or object storage.
The operator consumes _image_b64 produced by content transforms and writes _stored_image_uri for downstream vector DB upload. By default it clears the inline base64 after successful writes to avoid carrying page-sized payloads into the VDB upload.
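The store step can be sketched on plain dict rows using local disk. The column names `_image_b64` and `_stored_image_uri` come from the description above. Everything else is a hypothetical choice for illustration: `store_images`, content-hash file names, the `.png` suffix, and `file://` URIs. Object-storage targets are not covered.

```python
import base64
import hashlib
from pathlib import Path
from typing import Any, Dict, List


def store_images(
    rows: List[Dict[str, Any]],
    out_dir: str,
    suffix: str = ".png",
    clear_inline: bool = True,
) -> List[Dict[str, Any]]:
    target = Path(out_dir)
    target.mkdir(parents=True, exist_ok=True)
    for row in rows:
        b64 = row.get("_image_b64")
        if not b64:
            continue  # nothing to persist for this row
        data = base64.b64decode(b64)
        # Content-addressed name: identical images map to the same file.
        path = target / (hashlib.sha256(data).hexdigest() + suffix)
        path.write_bytes(data)  # raises on failure, so the inline payload is kept
        row["_stored_image_uri"] = path.resolve().as_uri()
        if clear_inline:
            row["_image_b64"] = None  # drop the page-sized payload after a successful write
    return rows
```

The inline payload is cleared only after `write_bytes` returns, so a failed write leaves the base64 data available for a retry.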
- class nemo_retriever.graph.UDFOperator(fn: Callable[[Any], Any], name: str | None = None, **kwargs: Any)
Bases: AbstractOperator
A small operator wrapper for user-defined Python functions.
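A function wrapper like this fits the preprocess → process → postprocess flow that, per the RayDataExecutor notes, AbstractOperator.__call__ runs via run(). A minimal sketch with hypothetical classes: SketchOperator stands in for AbstractOperator and SketchUDF for UDFOperator. Since the purpose of the real `**kwargs` is not documented here, the sketch only stores them.

```python
from typing import Any, Callable, Optional


class SketchOperator:
    """Hypothetical stand-in for AbstractOperator's run() pipeline."""

    def preprocess(self, data: Any) -> Any:
        return data

    def process(self, data: Any) -> Any:
        raise NotImplementedError

    def postprocess(self, data: Any) -> Any:
        return data

    def run(self, data: Any) -> Any:
        return self.postprocess(self.process(self.preprocess(data)))

    def __call__(self, data: Any) -> Any:
        return self.run(data)  # calling the operator runs the full pipeline


class SketchUDF(SketchOperator):
    """Hypothetical stand-in for UDFOperator: the user function becomes process()."""

    def __init__(self, fn: Callable[[Any], Any], name: Optional[str] = None, **kwargs: Any) -> None:
        self.fn = fn
        self.name = name or getattr(fn, "__name__", "udf")
        self.options = kwargs  # kept as-is; their real role is not documented here

    def process(self, data: Any) -> Any:
        return self.fn(data)
```

For example, `SketchUDF(str.upper, name="upper")("ab")` returns `"AB"`. Without an explicit name, the function's `__name__` is used.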
- class nemo_retriever.graph.WebhookNotifyOperator(*, params: Any = None)
Bases: AbstractOperator, CPUOperator
Post batch results to an external HTTP endpoint.
This is a side-effect-only operator: it sends a JSON payload to a remote URL but passes the incoming data through unmodified. If endpoint_url is None (the default), the operator is a no-op.
- Parameters:
params – A WebhookParams instance. If it is None, or if params.endpoint_url is falsy, the stage does nothing.
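The pass-through contract (post when configured, do nothing otherwise, always return the input unchanged) can be sketched with the standard library. The function name `notify_and_pass_through`, the `{"records": ...}` payload shape, and the injectable `send` hook are hypothetical. How the real operator handles HTTP errors is not stated here, and this sketch simply lets them propagate.

```python
import json
import urllib.request
from typing import Any, Callable, Optional


def _post_json(url: str, body: bytes, timeout: float = 10.0) -> None:
    req = urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )
    with urllib.request.urlopen(req, timeout=timeout):
        pass  # the response body is ignored; only the side effect matters


def notify_and_pass_through(
    records: Any,
    endpoint_url: Optional[str] = None,
    send: Callable[[str, bytes], None] = _post_json,
) -> Any:
    if not endpoint_url:
        return records  # no endpoint configured: behave as a no-op
    body = json.dumps({"records": records}, default=str).encode("utf-8")
    send(endpoint_url, body)
    return records  # the incoming data is returned unmodified
```

Injecting `send` keeps the function testable without a network. In a pipeline, the default `_post_json` would perform a real POST.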