aiq.builder.workflow#

Attributes#

Classes#

Workflow

Abstract base class providing core functionality for AgentIQ functions.

Module Contents#

callback_handler_var: contextvars.ContextVar[Any | None]#
class Workflow(
*,
config: aiq.data_models.config.AIQConfig,
entry_fn: aiq.builder.function.Function[aiq.builder.function_base.InputT, aiq.builder.function_base.StreamingOutputT, aiq.builder.function_base.SingleOutputT],
functions: dict[str, aiq.builder.function.Function] | None = None,
llms: dict[str, aiq.builder.llm.LLMProviderInfo] | None = None,
embeddings: dict[str, aiq.builder.embedder.EmbedderProviderInfo] | None = None,
memory: dict[str, aiq.memory.interfaces.MemoryEditor] | None = None,
exporters: dict[str, opentelemetry.sdk.trace.export.SpanExporter] | None = None,
retrievers: dict[str | None, aiq.builder.retriever.RetrieverProviderInfo] | None = None,
context_state: aiq.builder.context.AIQContextState,
)#

Bases: aiq.builder.function_base.FunctionBase[aiq.builder.function_base.InputT, aiq.builder.function_base.StreamingOutputT, aiq.builder.function_base.SingleOutputT]

Abstract base class providing core functionality for AgentIQ functions.

This class provides type handling via generics, schema management for inputs and outputs, and type conversion capabilities.

Parameters#

InputTTypeVar

The input type for the function

StreamingOutputTTypeVar

The output type for streaming results

SingleOutputTTypeVar

The output type for single results

Notes#

FunctionBase is the foundation of the AgentIQ function system, providing: - Type handling via generics - Schema management for inputs and outputs - Type conversion capabilities - Abstract interface that concrete function classes must implement

config#
functions#
llms#
embeddings#
memory#
retrievers#
_entry_fn#
_context_state#
_exporters#
property has_streaming_output: bool#

Check if this function supports streaming output.

Returns#

bool

True if the function supports streaming output, False otherwise

property has_single_output: bool#

Check if this function supports single output.

Returns#

bool

True if the function supports single output, False otherwise

async run(message: aiq.builder.function_base.InputT)#

Called each time we start a new workflow run. We’ll create a new top-level workflow span here.

async result_with_steps(
message: aiq.builder.function_base.InputT,
to_type: type | None = None,
)#
static from_entry_fn(
*,
config: aiq.data_models.config.AIQConfig,
entry_fn: aiq.builder.function.Function[aiq.builder.function_base.InputT, aiq.builder.function_base.StreamingOutputT, aiq.builder.function_base.SingleOutputT],
functions: dict[str, aiq.builder.function.Function] | None = None,
llms: dict[str, aiq.builder.llm.LLMProviderInfo] | None = None,
embeddings: dict[str, aiq.builder.embedder.EmbedderProviderInfo] | None = None,
memory: dict[str, aiq.memory.interfaces.MemoryEditor] | None = None,
exporters: dict[str, opentelemetry.sdk.trace.export.SpanExporter] | None = None,
retrievers: dict[str | None, aiq.builder.retriever.RetrieverProviderInfo] | None = None,
context_state: aiq.builder.context.AIQContextState,
) Workflow[InputT, StreamingOutputT, SingleOutputT]#