aiq.front_ends.fastapi.message_handler#

Attributes#

Classes#

Module Contents#

logger#
class MessageHandler(
websocket_reference: starlette.endpoints.WebSocketEndpoint,
)#
_websocket_reference: starlette.endpoints.WebSocketEndpoint#
_message_validator: aiq.front_ends.fastapi.message_validator.MessageValidator#
_messages_queue: asyncio.Queue[dict[str, str]]#
_out_going_messages_queue: asyncio.Queue[dict]#
_process_messages_task: asyncio.Task | None = None#
_process_out_going_messages_task: asyncio.Task = None#
_background_task: asyncio.Task = None#
_message_parent_id: str = 'default_id'#
_workflow_schema_type: str = None#
_user_interaction_response: asyncio.Future[aiq.data_models.api_server.TextContent]#
property messages_queue: asyncio.Queue[dict[str, str]]#
property background_task: asyncio.Task#
property process_messages_task: asyncio.Task | None#
property process_out_going_messages_task: asyncio.Task#
async process_messages() None#

Processes messages received from the websocket and routes them appropriately.

async process_user_message_content(
user_content: aiq.data_models.api_server.WebSocketUserMessage | aiq.data_models.api_server.WebSocketUserInteractionResponseMessage,
) pydantic.BaseModel | None#

Processes the contents of a user message.

Parameters:

user_content – Incoming content data model.

Returns:

A validated Pydantic user content model or None if not found.

async process_user_message(
message_as_validated_type: aiq.data_models.api_server.WebSocketUserMessage,
) None#

Processes user messages and routes them appropriately.

Parameters:

message_as_validated_type – A WebSocketUserMessage Data Model instance.

async create_websocket_message(
data_model: pydantic.BaseModel,
message_type: str | None = None,
status: str = WebSocketMessageStatus.IN_PROGRESS,
) None#

Creates a websocket message that will be ready for routing based on message type or data model.

Parameters:
  • data_model – Message content model.

  • message_type – Optional message type (a string) used for routing; defaults to None.

  • status – Message status (a string); defaults to WebSocketMessageStatus.IN_PROGRESS.

async _on_process_stream_task_done(task: asyncio.Task) None#
async process_out_going_messages(websocket: fastapi.WebSocket) None#

Spawns the outgoing message processing task.

Parameters:

websocket – Websocket instance.

async _process_response()#
async _pause_response()#
async __reset_user_interaction_response()#
async human_interaction(
prompt: aiq.data_models.interactive.InteractionPrompt,
) aiq.data_models.interactive.HumanResponse#

Registered human interaction callback that processes human interactions and returns responses from the websocket connection.

Parameters:

prompt – Incoming interaction content data model.

Returns:

A Text Content Base Pydantic model.