nat.builder.workflow#

Attributes#

Classes#

Workflow

Abstract base class providing core functionality for NAT functions.

Module Contents#

callback_handler_var: contextvars.ContextVar[Any | None]#
class Workflow(
*,
config: nat.data_models.config.Config,
entry_fn: nat.builder.function.Function[nat.builder.function_base.InputT, nat.builder.function_base.StreamingOutputT, nat.builder.function_base.SingleOutputT],
functions: dict[str, nat.builder.function.Function] | None = None,
llms: dict[str, nat.builder.llm.LLMProviderInfo] | None = None,
embeddings: dict[str, nat.builder.embedder.EmbedderProviderInfo] | None = None,
memory: dict[str, nat.memory.interfaces.MemoryEditor] | None = None,
object_stores: dict[str, nat.object_store.interfaces.ObjectStore] | None = None,
telemetry_exporters: dict[str, nat.observability.exporter.base_exporter.BaseExporter] | None = None,
retrievers: dict[str | None, nat.builder.retriever.RetrieverProviderInfo] | None = None,
ttc_strategies: dict[str, nat.experimental.test_time_compute.models.strategy_base.StrategyBase] | None = None,
context_state: nat.builder.context.ContextState,
)#

Bases: nat.builder.function_base.FunctionBase[nat.builder.function_base.InputT, nat.builder.function_base.StreamingOutputT, nat.builder.function_base.SingleOutputT]

Abstract base class providing core functionality for NAT functions.

This class provides type handling via generics, schema management for inputs and outputs, and type conversion capabilities.

Parameters#

InputT : TypeVar

The input type for the function

StreamingOutputT : TypeVar

The output type for streaming results

SingleOutputT : TypeVar

The output type for single results

Notes#

FunctionBase is the foundation of the NAT function system, providing:

- Type handling via generics
- Schema management for inputs and outputs
- Type conversion capabilities
- Abstract interface that concrete function classes must implement

config#
functions#
llms#
embeddings#
memory#
telemetry_exporters#
object_stores#
retrievers#
_exporter_manager#
ttc_strategies#
_entry_fn#
_context_state#
property has_streaming_output: bool#

Check if this function supports streaming output.

Returns#

bool

True if the function supports streaming output, False otherwise

property has_single_output: bool#

Check if this function supports single output.

Returns#

bool

True if the function supports single output, False otherwise

async run(message: nat.builder.function_base.InputT)#

Called each time we start a new workflow run. We’ll create a new top-level workflow span here.

async result_with_steps(
message: nat.builder.function_base.InputT,
to_type: type | None = None,
)#
static from_entry_fn(
*,
config: nat.data_models.config.Config,
entry_fn: nat.builder.function.Function[nat.builder.function_base.InputT, nat.builder.function_base.StreamingOutputT, nat.builder.function_base.SingleOutputT],
functions: dict[str, nat.builder.function.Function] | None = None,
llms: dict[str, nat.builder.llm.LLMProviderInfo] | None = None,
embeddings: dict[str, nat.builder.embedder.EmbedderProviderInfo] | None = None,
memory: dict[str, nat.memory.interfaces.MemoryEditor] | None = None,
object_stores: dict[str, nat.object_store.interfaces.ObjectStore] | None = None,
telemetry_exporters: dict[str, nat.observability.exporter.base_exporter.BaseExporter] | None = None,
retrievers: dict[str | None, nat.builder.retriever.RetrieverProviderInfo] | None = None,
ttc_strategies: dict[str, nat.experimental.test_time_compute.models.strategy_base.StrategyBase] | None = None,
context_state: nat.builder.context.ContextState,
) → Workflow[InputT, StreamingOutputT, SingleOutputT]#