aiq.front_ends.fastapi.message_validator#

Attributes#

Classes#

Module Contents#

logger#
class MessageValidator#
_message_type_schema_mapping: dict[str, type[pydantic.BaseModel]]#
_data_type_schema_mapping: dict[str, type[pydantic.BaseModel]]#
_message_parent_id: str = 'default_id'#
async validate_message(message: dict[str, Any]) pydantic.BaseModel#

Validates an incoming WebSocket message against its expected schema. If validation fails, returns a system response error message.

Parameters:

message – Incoming WebSocket message as a dictionary.

Returns:

A validated Pydantic model.

async get_message_schema_by_type(
message_type: str,
) type[pydantic.BaseModel]#

Retrieves the corresponding Pydantic model schema based on the message type.

Parameters:

message_type – The type of message as a string.

Returns:

A Pydantic schema class if found, otherwise None.

async convert_data_to_message_content(
data_model: pydantic.BaseModel,
) pydantic.BaseModel#

Converts a Pydantic data model to a WebSocket message content instance.

Parameters:

data_model – Pydantic Data Model instance.

Returns:

A WebSocket Message Content Data Model instance.

async convert_text_content_to_human_response(
text_content: aiq.data_models.api_server.TextContent,
human_prompt: aiq.data_models.interactive.HumanPromptBase,
) aiq.data_models.interactive.HumanResponse#

Converts Message Text Content data model to a Human Response Base data model instance.

Parameters:
  • text_content – Pydantic TextContent Data Model instance.

  • human_prompt – Pydantic HumanPrompt Data Model instance.

Returns:

A Human Response Data Model instance.

async resolve_message_type_by_data(data_model: pydantic.BaseModel) str#

Resolve message type from a validated model

Parameters:

data_model – Pydantic Data Model instance.

Returns:

A WebSocket Message Content Data Model instance.

async get_intermediate_step_parent_id(
data_model: aiq.data_models.api_server.AIQResponseIntermediateStep,
) str#

Retrieves intermediate step parent_id from AIQResponseIntermediateStep instance.

Parameters:

data_model – AIQResponseIntermediateStep Data Model instance.

Returns:

Intermediate step parent_id or “default”.

async create_system_response_token_message(
message_type: Literal[aiq.data_models.api_server.WebSocketMessageType.RESPONSE_MESSAGE, aiq.data_models.api_server.WebSocketMessageType.ERROR_MESSAGE] = WebSocketMessageType.RESPONSE_MESSAGE,
message_id: str | None = str(uuid.uuid4()),
thread_id: str = 'default',
parent_id: str = 'default',
content: aiq.data_models.api_server.SystemResponseContent | aiq.data_models.api_server.Error = SystemResponseContent(),
status: aiq.data_models.api_server.WebSocketMessageStatus = WebSocketMessageStatus.IN_PROGRESS,
timestamp: str = str(datetime.datetime.now(datetime.timezone.utc)),
) aiq.data_models.api_server.WebSocketSystemResponseTokenMessage | None#

Creates a system response token message with default values.

Parameters:
  • message_type – Type of WebSocket message.

  • message_id – Unique identifier for the message (default: generated UUID).

  • thread_id – ID of the thread the message belongs to (default: “default”).

  • parent_id – ID of the user message that spawned child messages.

  • content – Message content.

  • status – Status of the message (default: IN_PROGRESS).

  • timestamp – Timestamp of the message (default: current UTC time).

Returns:

A WebSocketSystemResponseTokenMessage instance.

async create_system_intermediate_step_message(
message_type: Literal[aiq.data_models.api_server.WebSocketMessageType.INTERMEDIATE_STEP_MESSAGE] = WebSocketMessageType.INTERMEDIATE_STEP_MESSAGE,
message_id: str = str(uuid.uuid4()),
thread_id: str = 'default',
parent_id: str = 'default',
content: aiq.data_models.api_server.SystemIntermediateStepContent = SystemIntermediateStepContent(name='default', payload='default'),
status: aiq.data_models.api_server.WebSocketMessageStatus = WebSocketMessageStatus.IN_PROGRESS,
timestamp: str = str(datetime.datetime.now(datetime.timezone.utc)),
) aiq.data_models.api_server.WebSocketSystemIntermediateStepMessage | None#

Creates a system intermediate step message with default values.

Parameters:
  • message_type – Type of WebSocket message.

  • message_id – Unique identifier for the message (default: generated UUID).

  • thread_id – ID of the thread the message belongs to (default: “default”).

  • parent_id – ID of the user message that spawned child messages.

  • content – Message content

  • status – Status of the message (default: IN_PROGRESS).

  • timestamp – Timestamp of the message (default: current UTC time).

Returns:

A WebSocketSystemIntermediateStepMessage instance.

async create_system_interaction_message(
*,
message_type: Literal[aiq.data_models.api_server.WebSocketMessageType.SYSTEM_INTERACTION_MESSAGE] = WebSocketMessageType.SYSTEM_INTERACTION_MESSAGE,
message_id: str | None = str(uuid.uuid4()),
thread_id: str = 'default',
parent_id: str = 'default',
content: aiq.data_models.interactive.HumanPrompt,
status: aiq.data_models.api_server.WebSocketMessageStatus = WebSocketMessageStatus.IN_PROGRESS,
timestamp: str = str(datetime.datetime.now(datetime.timezone.utc)),
) aiq.data_models.api_server.WebSocketSystemInteractionMessage | None#

Creates a system interaction message with default values.

Parameters:
  • message_type – Type of WebSocket message.

  • message_id – Unique identifier for the message (default: generated UUID).

  • thread_id – ID of the thread the message belongs to (default: “default”).

  • parent_id – ID of the user message that spawned child messages.

  • content – Message content

  • status – Status of the message (default: IN_PROGRESS).

  • timestamp – Timestamp of the message (default: current UTC time).

Returns:

A WebSocketSystemInteractionMessage instance.