nat.middleware.utils.workflow_inventory#

Attributes#

Classes#

DiscoveredBase

Base class for discovered workflow items.

DiscoveredComponent

Information about a discovered component and its available functions.

DiscoveredFunction

Information about a discovered workflow function.

RegisteredCallableBase

Base class for registered callables.

RegisteredFunction

A workflow function registered for middleware interception.

RegisteredComponentMethod

A component method registered for middleware interception.

WorkflowInventory

Inventory of discovered components and functions.

Module Contents#

COMPONENT_FUNCTION_ALLOWLISTS: dict[nat.data_models.component.ComponentGroup, set[str]]#
class DiscoveredBase(/, **data: Any)#

Bases: pydantic.BaseModel

Base class for discovered workflow items.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str = None#
instance: Any = None#
config: Any = None#
class DiscoveredComponent(/, **data: Any)#

Bases: DiscoveredBase

Information about a discovered component and its available functions.

Attributes:

name: Component name (e.g., “gpt4”, “milvus”) component_type: Component type instance: Component instance config: Component configuration callable_functions: A set of callable component function names on the instance

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

component_type: nat.data_models.component.ComponentGroup = None#
callable_functions: set[str] = None#
class DiscoveredFunction(/, **data: Any)#

Bases: pydantic.BaseModel

Information about a discovered workflow function.

Attributes:

name: Function name (e.g., “my_api_handler”) config: Function configuration instance: Function instance

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str = None#
config: nat.data_models.function.FunctionBaseConfig = None#
instance: nat.builder.function.Function = None#
class RegisteredCallableBase(/, **data: Any)#

Bases: pydantic.BaseModel

Base class for registered callables.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

key: str = None#
class RegisteredFunction(/, **data: Any)#

Bases: RegisteredCallableBase

A workflow function registered for middleware interception.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

function_instance: nat.builder.function.Function = None#
class RegisteredComponentMethod(/, **data: Any)#

Bases: RegisteredCallableBase

A component method registered for middleware interception.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

component_instance: Any = None#
function_name: str = None#
original_callable: collections.abc.Callable = None#
class WorkflowInventory(/, **data: Any)#

Bases: pydantic.BaseModel

Inventory of discovered components and functions.

This container holds all components and functions discovered from the workflow that are available for registration but not explicitly configured in the middleware config. It provides a structured view of everything that can be intercepted.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

llms: list[DiscoveredComponent] = None#
embedders: list[DiscoveredComponent] = None#
retrievers: list[DiscoveredComponent] = None#
memory: list[DiscoveredComponent] = None#
object_stores: list[DiscoveredComponent] = None#
auth_providers: list[DiscoveredComponent] = None#
workflow_functions: list[DiscoveredFunction] = None#