aiq.front_ends.fastapi.websocket#

Attributes#

Classes#

Module Contents#

logger#
class AIQWebSocket(
session_manager: aiq.runtime.session.AIQSessionManager,
step_adaptor: aiq.front_ends.fastapi.step_adaptor.StepAdaptor,
*args,
**kwargs,
)#

Bases: starlette.endpoints.WebSocketEndpoint

encoding = 'json'#
_session_manager: aiq.runtime.session.AIQSessionManager#
_message_handler: aiq.front_ends.fastapi.message_handler.MessageHandler#
_process_response_event: asyncio.Event#
_workflow_schema_type: dict[str, collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]]#
_step_adaptor#
property workflow_schema_type: dict[str, collections.abc.Callable[Ellipsis, collections.abc.Awaitable[Any]]]#
property process_response_event: asyncio.Event#
async on_connect(websocket: fastapi.WebSocket)#

Override to handle an incoming websocket connection

async on_send(websocket: fastapi.WebSocket, data: dict[str, str])#
async on_receive(websocket: fastapi.WebSocket, data: dict[str, Any])#

Override to handle an incoming websocket message

async on_disconnect(websocket: fastapi.WebSocket, close_code: Any)#

Override to handle a disconnecting websocket

async _process_message(
payload: Any,
result_type: type | None = None,
output_type: type | None = None,
) None#
async process_generate_stream(payload: str)#
async process_chat_stream(
payload: aiq.data_models.api_server.AIQChatRequest,
)#
async process_generate(payload: Any)#
async process_chat(payload: aiq.data_models.api_server.AIQChatRequest)#