nat.control_flow.sequential_executor

Attributes

logger

Classes

ToolExecutionConfig

Configuration for individual tool execution within sequential execution.

SequentialExecutorConfig

Configuration for sequential execution of a list of functions.

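Functions

_get_function_output_type

_validate_function_type_compatibility

_validate_tool_list_type_compatibility

sequential_execution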

Module Contents

logger

class ToolExecutionConfig(/, **data: Any)

Bases: pydantic.BaseModel

Configuration for individual tool execution within sequential execution.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

use_streaming: bool = None

class SequentialExecutorConfig(/, **data: Any)

Bases: nat.data_models.function.FunctionBaseConfig

Configuration for sequential execution of a list of functions.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

tool_list: list[nat.data_models.component_ref.FunctionRef] = None

tool_execution_config: dict[str, ToolExecutionConfig] = None

raise_type_incompatibility: bool = None

_get_function_output_type(
    function: nat.builder.function.Function,
    tool_execution_config: dict[str, ToolExecutionConfig],
) → type

_validate_function_type_compatibility(
    src_fn: nat.builder.function.Function,
    target_fn: nat.builder.function.Function,
    tool_execution_config: dict[str, ToolExecutionConfig],
) → None
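
The signature above suggests a pairwise check between one function's output type and the next function's input type. The following is a minimal sketch of that general idea using only the standard library; it is not the toolkit's implementation. The helpers `output_type`, `input_type`, and `check_compatible`, the `raise_on_mismatch` parameter, and the two example functions are all hypothetical names introduced for illustration.

```python
import inspect
from typing import get_type_hints


def output_type(fn) -> type:
    """Return the annotated return type of fn (hypothetical helper)."""
    hints = get_type_hints(fn)
    if "return" not in hints:
        raise TypeError(f"{fn.__name__} has no return annotation")
    return hints["return"]


def input_type(fn) -> type:
    """Return the annotated type of fn's first parameter (hypothetical helper)."""
    params = list(inspect.signature(fn).parameters)
    hints = get_type_hints(fn)
    if not params or params[0] not in hints:
        raise TypeError(f"{fn.__name__} has no annotated first parameter")
    return hints[params[0]]


def check_compatible(src_fn, target_fn, raise_on_mismatch: bool = False) -> bool:
    """Check that src_fn's output can be passed to target_fn.

    Assumption: both annotations are plain classes, so issubclass applies.
    """
    src_out = output_type(src_fn)
    target_in = input_type(target_fn)
    compatible = issubclass(src_out, target_in)
    if not compatible and raise_on_mismatch:
        raise TypeError(
            f"{src_fn.__name__} returns {src_out.__name__}, "
            f"but {target_fn.__name__} expects {target_in.__name__}"
        )
    return compatible


# Example functions, defined only for this sketch.
def to_text(n: int) -> str:
    return str(n)


def length(text: str) -> int:
    return len(text)


ok = check_compatible(to_text, length)    # str output feeds a str input
bad = check_compatible(length, length)    # int output does not match str input
```

The `raise_on_mismatch` switch in this sketch lets a caller choose between a hard failure and a boolean result. Whether `raise_type_incompatibility` in `SequentialExecutorConfig` behaves the same way is an assumption, not something this page confirms.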

async _validate_tool_list_type_compatibility(
    sequential_executor_config: SequentialExecutorConfig,
    builder: nat.builder.builder.Builder,
) → tuple[type, type]

async sequential_execution(
    config: SequentialExecutorConfig,
    builder: nat.builder.builder.Builder,
)
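
As a rough illustration of what "sequential execution of a list of functions" generally means, the sketch below chains async callables so that each one receives the previous one's result. It uses only `asyncio` and does not reflect how this module resolves `tool_list` through the builder. `run_sequentially`, `strip_text`, and `count_words` are hypothetical names used only here.

```python
import asyncio
from typing import Any, Awaitable, Callable

# An async step takes one value and returns the next value in the chain.
Step = Callable[[Any], Awaitable[Any]]


async def run_sequentially(steps: list[Step], initial_input: Any) -> Any:
    """Await each step in order, feeding its result to the next step."""
    value = initial_input
    for step in steps:
        value = await step(value)
    return value


# Example steps, defined only for this sketch.
async def strip_text(text: str) -> str:
    return text.strip()


async def count_words(text: str) -> int:
    return len(text.split())


result = asyncio.run(
    run_sequentially([strip_text, count_words], "  hello sequential world  ")
)
print(result)  # 3
```

With an empty list of steps, this sketch returns the initial input unchanged.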