aiq.front_ends.fastapi.websocket#
Attributes#
Classes#
Module Contents#
- logger#
- class AIQWebSocket(session_manager: aiq.runtime.session.AIQSessionManager, step_adaptor: aiq.front_ends.fastapi.step_adaptor.StepAdaptor, *args, **kwargs)
Bases: starlette.endpoints.WebSocketEndpoint
- encoding = 'json'#
- _session_manager: aiq.runtime.session.AIQSessionManager#
- _message_handler: aiq.front_ends.fastapi.message_handler.MessageHandler#
- _process_response_event: asyncio.Event#
- _workflow_schema_type: dict[str, collections.abc.Callable[..., collections.abc.Awaitable[Any]]]#
- _step_adaptor#
- property workflow_schema_type: dict[str, collections.abc.Callable[..., collections.abc.Awaitable[Any]]]#
- property process_response_event: asyncio.Event#
- async on_connect(websocket: fastapi.WebSocket)#
Override to handle an incoming websocket connection
- async on_receive(websocket: fastapi.WebSocket, data: dict[str, Any])#
Override to handle an incoming websocket message
- async on_disconnect(websocket: fastapi.WebSocket, close_code: Any)#
Override to handle a disconnecting websocket
- async process_chat_stream()#
- async process_generate(payload: Any)#
- async process_chat(payload: aiq.data_models.api_server.AIQChatRequest)#