nat.control_flow.sequential_executor#

Attributes#

Exceptions#

SequentialExecutorExit

Raised when a tool wants to exit the sequential executor chain early with a custom message.

Classes#

ToolExecutionConfig

Configuration for individual tool execution within sequential execution.

SequentialExecutorConfig

Configuration for sequential execution of a list of functions.

Functions#

Module Contents#

logger#
exception SequentialExecutorExit(message: str)#

Bases: Exception

Raised when a tool wants to exit the sequential executor chain early with a custom message.

Initialize self. See help(type(self)) for accurate signature.

message#
class ToolExecutionConfig(/, **data: Any)#

Bases: pydantic.BaseModel

Configuration for individual tool execution within sequential execution.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

use_streaming: bool = None#
class SequentialExecutorConfig(/, **data: Any)#

Bases: nat.data_models.function.FunctionBaseConfig

Configuration for sequential execution of a list of functions.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

description: str = None#
tool_list: list[nat.data_models.component_ref.FunctionRef] = None#
tool_execution_config: dict[str, ToolExecutionConfig] = None#
raise_type_incompatibility: bool = None#
return_error_on_exception: bool = None#
_get_function_output_type(
function: nat.builder.function.Function,
tool_execution_config: dict[str, ToolExecutionConfig],
) type#
_validate_function_type_compatibility(
src_fn: nat.builder.function.Function,
target_fn: nat.builder.function.Function,
tool_execution_config: dict[str, ToolExecutionConfig],
) None#
async _validate_tool_list_type_compatibility(
sequential_executor_config: SequentialExecutorConfig,
builder: nat.builder.builder.Builder,
) tuple[type, type]#
async sequential_execution(
config: SequentialExecutorConfig,
builder: nat.builder.builder.Builder,
)#