aiq.front_ends.fastapi.message_handler#
Attributes#
Classes#
Module Contents#
- logger#
- class MessageHandler(
- websocket_reference: starlette.endpoints.WebSocketEndpoint,
- _websocket_reference: starlette.endpoints.WebSocketEndpoint#
- _message_validator: aiq.front_ends.fastapi.message_validator.MessageValidator#
- _messages_queue: asyncio.Queue[dict[str, str]]#
- _out_going_messages_queue: asyncio.Queue[dict]#
- _process_messages_task: asyncio.Task | None = None#
- _process_out_going_messages_task: asyncio.Task = None#
- _background_task: asyncio.Task = None#
- _user_interaction_response: asyncio.Future[aiq.data_models.api_server.TextContent]#
- property messages_queue: asyncio.Queue[dict[str, str]]#
- property background_task: asyncio.Task#
- property process_messages_task: asyncio.Task | None#
- property process_out_going_messages_task: asyncio.Task#
- async process_messages() → None#
Processes messages received from the websocket and routes them appropriately.
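The routing step can be pictured as a queue consumer that dispatches on a message field. The following is a minimal sketch of that pattern only, not this module's implementation: the `type` and `text` field names, the `STOP` sentinel, and the `consume` helper are all assumptions made for illustration.

```python
import asyncio

# Hypothetical sentinel used to end the loop in this sketch; not part of the documented API.
STOP = {"type": "stop"}


async def consume(queue: asyncio.Queue, handlers: dict) -> list:
    """Pull messages off the queue and dispatch each one by its "type" field."""
    handled = []
    while True:
        message = await queue.get()
        try:
            if message is STOP:
                return handled
            handler = handlers.get(message.get("type"))
            if handler is not None:
                # Only messages with a registered handler are routed.
                handled.append(await handler(message))
        finally:
            # Mark every dequeued item as processed, including skipped ones.
            queue.task_done()
```

Messages whose type has no registered handler are dropped silently here; a real handler would more likely log or reject them.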
- async process_user_message_content(
- user_content: aiq.data_models.api_server.WebSocketUserMessage | aiq.data_models.api_server.WebSocketUserInteractionResponseMessage,
Processes the contents of a user message.
- Parameters:
user_content – Incoming content data model.
- Returns:
A validated Pydantic user content model or None if not found.
- async process_user_message(
- message_as_validated_type: aiq.data_models.api_server.WebSocketUserMessage,
Processes user messages and routes them appropriately.
- Parameters:
message_as_validated_type – A WebSocketUserMessage Data Model instance.
- async create_websocket_message(
- data_model: pydantic.BaseModel,
- message_type: str | None = None,
- status: str = WebSocketMessageStatus.IN_PROGRESS,
Creates a websocket message that will be ready for routing based on message type or data model.
- Parameters:
data_model – Message content model.
message_type – Message type used for routing. Optional; defaults to None.
status – Message status. Defaults to WebSocketMessageStatus.IN_PROGRESS.
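To illustrate how a message type might be chosen from either an explicit argument or the content model, here is a small sketch. It is an assumption-laden stand-in, not the library's code: `Status` stands in for `WebSocketMessageStatus`, `TextPayload` is a dataclass used in place of a Pydantic model, and `DEFAULT_TYPES`, `build_message`, and the output field names are hypothetical.

```python
from dataclasses import asdict, dataclass
from enum import Enum


class Status(str, Enum):
    """Stand-in for WebSocketMessageStatus; the values are assumptions."""
    IN_PROGRESS = "in_progress"
    COMPLETE = "complete"


@dataclass
class TextPayload:
    """Hypothetical content model (a dataclass instead of a Pydantic model)."""
    text: str


# Hypothetical mapping from content model class to a default message type.
DEFAULT_TYPES = {TextPayload: "text_message"}


def build_message(data_model, message_type=None, status=Status.IN_PROGRESS) -> dict:
    """Wrap a content model in an envelope that carries a type and a status."""
    if message_type is None:
        # No explicit type given: fall back to the type implied by the model class.
        message_type = DEFAULT_TYPES[type(data_model)]
    return {
        "type": message_type,
        "status": status.value,
        "content": asdict(data_model),
    }
```

Calling `build_message(TextPayload("hi"))` yields an envelope typed `text_message` with status `in_progress`; passing `message_type` explicitly overrides the lookup.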
- async _on_process_stream_task_done(task: asyncio.Task) → None#
- async process_out_going_messages(websocket: fastapi.WebSocket) → None#
Spawns the outgoing message processing task.
- Parameters:
websocket – Websocket instance.
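Spawning a sender task over an outgoing queue could look roughly like the sketch below. The `drain` and `spawn_sender` helpers and the `send` callable are hypothetical; with a `fastapi.WebSocket`, a bound method such as `websocket.send_json` could plausibly play the role of `send`, though whether this module does so is an assumption.

```python
import asyncio


async def drain(queue: asyncio.Queue, send) -> None:
    """Forward every queued message to `send` until the task is cancelled."""
    while True:
        message = await queue.get()
        try:
            await send(message)
        finally:
            queue.task_done()


def spawn_sender(queue: asyncio.Queue, send) -> asyncio.Task:
    """Start the drain loop as a background task and return its handle.

    Must be called while an event loop is running.
    """
    return asyncio.create_task(drain(queue, send))
```

Keeping the returned task handle lets the owner cancel the loop when the connection closes, which is presumably why the class stores its tasks as attributes.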
- async _process_response()#
- async _pause_response()#
- async __reset_user_interaction_response()#
- async human_interaction( ) aiq.data_models.interactive.HumanResponse #
Registered human interaction callback that processes human interactions and returns responses from the websocket connection.
- Parameters:
prompt – Incoming interaction content data model.
- Returns:
A Text Content Base Pydantic model.
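A callback that waits for a reply arriving on another code path is commonly built around an `asyncio.Future`. The sketch below shows that pattern under stated assumptions: `InteractionBridge`, `ask`, `deliver`, and the message fields are hypothetical names, and plain strings stand in for the prompt and response models.

```python
import asyncio


class InteractionBridge:
    """Hypothetical helper: sends a prompt out, then waits for a reply future."""

    def __init__(self) -> None:
        self.outgoing: asyncio.Queue = asyncio.Queue()
        self._response = None  # set to a Future while a prompt is pending

    async def ask(self, prompt: str) -> str:
        # Create a fresh future for this interaction, then publish the prompt.
        self._response = asyncio.get_running_loop().create_future()
        await self.outgoing.put({"type": "prompt", "text": prompt})
        try:
            return await self._response
        finally:
            # Reset so the next interaction starts from a clean state.
            self._response = None

    def deliver(self, text: str) -> None:
        # Called by the receive path when the user's reply arrives.
        if self._response is not None and not self._response.done():
            self._response.set_result(text)
```

Resetting the future after each interaction mirrors the role that a method like `__reset_user_interaction_response` would plausibly have, although its actual behavior is not documented here.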