aiq.data_models.intermediate_step

Classes

IntermediateStepCategory

String-valued enumeration of intermediate step categories: LLM, TOOL, WORKFLOW, TASK, FUNCTION, CUSTOM.

IntermediateStepType

String-valued enumeration of intermediate step event types, such as LLM_START, LLM_NEW_TOKEN, and TOOL_END.

IntermediateStepState

String-valued enumeration of intermediate step states: START, CHUNK, END.

StreamEventData

AIQStreamEventData is a data model that represents the data field in a streaming event.

UsageInfo

TraceMetadata

IntermediateStepPayload

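AIQIntermediateStep is a data model that represents an intermediate step in AgentIQ. Intermediate steps are captured while a request is running and can be used to show progress or to evaluate the path a workflow took to get a response.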

IntermediateStep

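AIQIntermediateStep is a data model that represents an intermediate step in AgentIQ. Intermediate steps are captured while a request is running and can be used to show progress or to evaluate the path a workflow took to get a response.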

Module Contents

class IntermediateStepCategory

Bases: str, enum.Enum

LLM = 'LLM'
TOOL = 'TOOL'
WORKFLOW = 'WORKFLOW'
TASK = 'TASK'
FUNCTION = 'FUNCTION'
CUSTOM = 'CUSTOM'

class IntermediateStepType

Bases: str, enum.Enum

LLM_START = 'LLM_START'
LLM_END = 'LLM_END'
LLM_NEW_TOKEN = 'LLM_NEW_TOKEN'
TOOL_START = 'TOOL_START'
TOOL_END = 'TOOL_END'
WORKFLOW_START = 'WORKFLOW_START'
WORKFLOW_END = 'WORKFLOW_END'
TASK_START = 'TASK_START'
TASK_END = 'TASK_END'
FUNCTION_START = 'FUNCTION_START'
FUNCTION_END = 'FUNCTION_END'
CUSTOM_START = 'CUSTOM_START'
CUSTOM_END = 'CUSTOM_END'

class IntermediateStepState

Bases: str, enum.Enum

START = 'START'
CHUNK = 'CHUNK'
END = 'END'
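
Because all three enumerations derive from both str and enum.Enum, their members compare equal to plain strings and serialize to JSON as their values. The sketch below re-declares IntermediateStepState locally as a stand-in, so that it runs without AgentIQ installed; with the library available, the class imported from aiq.data_models.intermediate_step should behave the same way.

```python
import json
from enum import Enum


# Local stand-in mirroring the documented members; not imported from AgentIQ.
class IntermediateStepState(str, Enum):
    START = "START"
    CHUNK = "CHUNK"
    END = "END"


# Look a member up by its string value, e.g. when parsing a logged event.
state = IntermediateStepState("CHUNK")

# Members are str instances, so they compare equal to plain strings.
is_plain_string_equal = state == "CHUNK"

# The str mix-in lets the standard json module encode members as their values.
encoded = json.dumps({"state": state})

# An unknown value raises ValueError rather than returning None.
try:
    IntermediateStepState("PAUSED")
    unknown_rejected = False
except ValueError:
    unknown_rejected = True
```

The same pattern applies to IntermediateStepCategory and IntermediateStepType.
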
class StreamEventData(/, **data: Any)

Bases: pydantic.BaseModel

AIQStreamEventData is a data model that represents the data field in a streaming event.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

input: Any | None = None
output: Any | None = None
chunk: Any | None = None
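
One plausible reading of the three fields is that input accompanies a start event, chunk carries each streamed fragment, and output holds the final result. That mapping is an assumption, not something this page states. The sketch uses a dataclass stand-in with the same field names (the real class is a pydantic model) and a hypothetical helper, collect_output, to show how a consumer might reassemble streamed chunks.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Optional


# Stand-in with the documented field names; the real class is a pydantic model.
@dataclass
class StreamEventData:
    input: Optional[Any] = None
    output: Optional[Any] = None
    chunk: Optional[Any] = None


def collect_output(events: Iterable[StreamEventData]) -> str:
    """Hypothetical helper: prefer an explicit output, else join the chunks."""
    chunks = []
    for event in events:
        if event.output is not None:
            return str(event.output)
        if event.chunk is not None:
            chunks.append(str(event.chunk))
    return "".join(chunks)


streamed = [
    StreamEventData(input="What is 2 + 2?"),
    StreamEventData(chunk="The answer "),
    StreamEventData(chunk="is 4."),
]
result = collect_output(streamed)
```
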
class UsageInfo(/, **data: Any)

Bases: pydantic.BaseModel

token_usage: aiq.profiler.callbacks.token_usage_base_model.TokenUsageBaseModel
num_llm_calls: int = 0
seconds_between_calls: int = 0

class TraceMetadata(/, **data: Any)

Bases: pydantic.BaseModel

chat_responses: Any | None = None
chat_inputs: Any | None = None
tool_inputs: Any | None = None
tool_outputs: Any | None = None
tool_info: Any | None = None
model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class IntermediateStepPayload(/, **data: Any)

Bases: pydantic.BaseModel

AIQIntermediateStep is a data model that represents an intermediate step in AgentIQ. Intermediate steps are captured while a request is running and can be used to show progress or to evaluate the path a workflow took to get a response.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

event_type: IntermediateStepType
event_timestamp: float = None
span_event_timestamp: float | None = None
framework: aiq.builder.framework_enum.LLMFrameworkEnum | None = None
name: str | None = None
tags: list[str] | None = None
metadata: dict[str, Any] | TraceMetadata | None = None
data: StreamEventData | None = None
usage_info: UsageInfo | None = None
UUID: str = None
property event_category: IntermediateStepCategory
property event_state: IntermediateStepState
check_span_event_timestamp() -> IntermediateStepPayload
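
This page does not show how event_category and event_state are computed. Every event type except LLM_NEW_TOKEN is named as a category followed by START or END, so one way to derive both values is to split the member name. The function below is a hypothetical sketch of that idea using local stand-in enums (only a subset of members is declared); treating LLM_NEW_TOKEN as a CHUNK in the LLM category is an assumption.

```python
from enum import Enum


# Local stand-ins; only a subset of the documented members is declared.
class IntermediateStepCategory(str, Enum):
    LLM = "LLM"
    TOOL = "TOOL"
    WORKFLOW = "WORKFLOW"


class IntermediateStepState(str, Enum):
    START = "START"
    CHUNK = "CHUNK"
    END = "END"


class IntermediateStepType(str, Enum):
    LLM_START = "LLM_START"
    LLM_END = "LLM_END"
    LLM_NEW_TOKEN = "LLM_NEW_TOKEN"
    TOOL_START = "TOOL_START"
    TOOL_END = "TOOL_END"
    WORKFLOW_START = "WORKFLOW_START"
    WORKFLOW_END = "WORKFLOW_END"


def split_event_type(event_type):
    """Hypothetical helper: map an event type to (category, state)."""
    if event_type is IntermediateStepType.LLM_NEW_TOKEN:
        # Assumption: a new token is a streamed chunk of an LLM call.
        return IntermediateStepCategory.LLM, IntermediateStepState.CHUNK
    # Remaining names follow the pattern <CATEGORY>_<START|END>.
    category_name, state_name = event_type.value.rsplit("_", 1)
    return IntermediateStepCategory(category_name), IntermediateStepState(state_name)


category, state = split_event_type(IntermediateStepType.TOOL_END)
```
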
class IntermediateStep(/, **data: Any)

Bases: pydantic.BaseModel

AIQIntermediateStep is a data model that represents an intermediate step in AgentIQ. Intermediate steps are captured while a request is running and can be used to show progress or to evaluate the path a workflow took to get a response.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

function_ancestry: aiq.data_models.invocation_node.InvocationNode | None
payload: IntermediateStepPayload
property event_type: IntermediateStepType
property event_timestamp: float
property span_event_timestamp: float | None
property framework: aiq.builder.framework_enum.LLMFrameworkEnum | None
property name: str | None
property tags: list[str] | None
property metadata: dict[str, Any] | TraceMetadata | None
property data: StreamEventData | None
property usage_info: UsageInfo | None
property UUID: str
property event_category: IntermediateStepCategory
property event_state: IntermediateStepState
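
The properties on IntermediateStep have the same names and types as the fields of IntermediateStepPayload, which suggests the wrapper forwards them to its payload. The sketch below illustrates that delegation pattern with simplified dataclass stand-ins, then pairs START and END events to measure how long each step took. That a START event and its matching END event share a UUID is an assumption, and span_durations is a hypothetical helper rather than part of the module.

```python
from dataclasses import dataclass
from typing import Dict, Iterable


# Simplified stand-ins: only the fields this sketch needs, with plain strings
# in place of the enum types.
@dataclass
class IntermediateStepPayload:
    event_type: str
    event_timestamp: float
    UUID: str


@dataclass
class IntermediateStep:
    payload: IntermediateStepPayload

    # Read-only properties forward to the wrapped payload.
    @property
    def event_type(self) -> str:
        return self.payload.event_type

    @property
    def event_timestamp(self) -> float:
        return self.payload.event_timestamp

    @property
    def UUID(self) -> str:
        return self.payload.UUID


def span_durations(steps: Iterable[IntermediateStep]) -> Dict[str, float]:
    """Hypothetical helper: seconds between the START and END event per UUID."""
    started: Dict[str, float] = {}
    durations: Dict[str, float] = {}
    for step in steps:
        if step.event_type.endswith("_START"):
            started[step.UUID] = step.event_timestamp
        elif step.event_type.endswith("_END") and step.UUID in started:
            durations[step.UUID] = step.event_timestamp - started.pop(step.UUID)
    return durations


steps = [
    IntermediateStep(IntermediateStepPayload("WORKFLOW_START", 10.0, "wf-1")),
    IntermediateStep(IntermediateStepPayload("TOOL_START", 10.5, "tool-1")),
    IntermediateStep(IntermediateStepPayload("TOOL_END", 12.0, "tool-1")),
    IntermediateStep(IntermediateStepPayload("WORKFLOW_END", 13.0, "wf-1")),
]
durations = span_durations(steps)
```
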