nat.builder.workflow#
Attributes#
Classes#
Abstract base class providing core functionality for NAT functions. |
Module Contents#
- callback_handler_var: contextvars.ContextVar[Any | None]#
- class Workflow(
- *,
- config: nat.data_models.config.Config,
- entry_fn: nat.builder.function.Function[nat.builder.function_base.InputT, nat.builder.function_base.StreamingOutputT, nat.builder.function_base.SingleOutputT],
- functions: dict[str, nat.builder.function.Function] | None = None,
- llms: dict[str, nat.builder.llm.LLMProviderInfo] | None = None,
- embeddings: dict[str, nat.builder.embedder.EmbedderProviderInfo] | None = None,
- memory: dict[str, nat.memory.interfaces.MemoryEditor] | None = None,
- object_stores: dict[str, nat.object_store.interfaces.ObjectStore] | None = None,
- telemetry_exporters: dict[str, nat.observability.exporter.base_exporter.BaseExporter] | None = None,
- retrievers: dict[str | None, nat.builder.retriever.RetrieverProviderInfo] | None = None,
- ttc_strategies: dict[str, nat.experimental.test_time_compute.models.strategy_base.StrategyBase] | None = None,
- context_state: nat.builder.context.ContextState,
Bases:
nat.builder.function_base.FunctionBase
[nat.builder.function_base.InputT
,nat.builder.function_base.StreamingOutputT
,nat.builder.function_base.SingleOutputT
]Abstract base class providing core functionality for NAT functions.
This class provides type handling via generics, schema management for inputs and outputs, and type conversion capabilities.
Parameters#
- InputTTypeVar
The input type for the function
- StreamingOutputTTypeVar
The output type for streaming results
- SingleOutputTTypeVar
The output type for single results
Notes#
FunctionBase is the foundation of the NAT function system, providing: - Type handling via generics - Schema management for inputs and outputs - Type conversion capabilities - Abstract interface that concrete function classes must implement
- config#
- functions#
- llms#
- embeddings#
- memory#
- telemetry_exporters#
- object_stores#
- retrievers#
- _exporter_manager#
- ttc_strategies#
- _entry_fn#
- _context_state#
- property has_streaming_output: bool#
Check if this function supports streaming output.
Returns#
- bool
True if the function supports streaming output, False otherwise
- property has_single_output: bool#
Check if this function supports single output.
Returns#
- bool
True if the function supports single output, False otherwise
- async run(message: nat.builder.function_base.InputT)#
Called each time we start a new workflow run. We’ll create a new top-level workflow span here.
- static from_entry_fn(
- *,
- config: nat.data_models.config.Config,
- entry_fn: nat.builder.function.Function[nat.builder.function_base.InputT, nat.builder.function_base.StreamingOutputT, nat.builder.function_base.SingleOutputT],
- functions: dict[str, nat.builder.function.Function] | None = None,
- llms: dict[str, nat.builder.llm.LLMProviderInfo] | None = None,
- embeddings: dict[str, nat.builder.embedder.EmbedderProviderInfo] | None = None,
- memory: dict[str, nat.memory.interfaces.MemoryEditor] | None = None,
- object_stores: dict[str, nat.object_store.interfaces.ObjectStore] | None = None,
- telemetry_exporters: dict[str, nat.observability.exporter.base_exporter.BaseExporter] | None = None,
- retrievers: dict[str | None, nat.builder.retriever.RetrieverProviderInfo] | None = None,
- ttc_strategies: dict[str, nat.experimental.test_time_compute.models.strategy_base.StrategyBase] | None = None,
- context_state: nat.builder.context.ContextState,