nat.front_ends.fastapi.message_handler#

Attributes#

Classes#

Module Contents#

logger#
class WebSocketMessageHandler(
socket: fastapi.WebSocket,
session_manager: nat.runtime.session.SessionManager,
step_adaptor: nat.front_ends.fastapi.step_adaptor.StepAdaptor,
)#
_socket: fastapi.WebSocket#
_session_manager: nat.runtime.session.SessionManager#
_step_adaptor: nat.front_ends.fastapi.step_adaptor.StepAdaptor#
_message_validator: nat.front_ends.fastapi.message_validator.MessageValidator#
_running_workflow_task: asyncio.Task | None = None#
_message_parent_id: str = 'default_id'#
_conversation_id: str | None = None#
_workflow_schema_type: str = None#
_user_interaction_response: asyncio.Future[nat.data_models.interactive.HumanResponse] | None = None#
_flow_handler: nat.authentication.interfaces.FlowHandlerBase | None = None#
_schema_output_mapping: dict[str, type[pydantic.BaseModel] | None]#
set_flow_handler(
flow_handler: nat.authentication.interfaces.FlowHandlerBase,
) None#
async run() None#

Processes received messages from websocket and routes them appropriately.

async process_user_message_content(
user_content: nat.data_models.api_server.WebSocketUserMessage | nat.data_models.api_server.WebSocketUserInteractionResponseMessage,
) pydantic.BaseModel | None#

Processes the contents of a user message.

Parameters:

user_content – Incoming content data model.

Returns:

A validated Pydantic user content model or None if not found.

async process_workflow_request(
user_message_as_validated_type: nat.data_models.api_server.WebSocketUserMessage,
) None#

Process user messages and routes them appropriately.

Parameters:

user_message_as_validated_type – A WebSocketUserMessage Data Model instance.

async create_websocket_message(
data_model: pydantic.BaseModel,
message_type: str | None = None,
status: str = WebSocketMessageStatus.IN_PROGRESS,
) None#

Creates a websocket message that will be ready for routing based on message type or data model.

Parameters:
  • data_model – Message content model.

  • message_type – Message content model.

  • status – Message content model.

async human_interaction_callback(
prompt: nat.data_models.interactive.InteractionPrompt,
) nat.data_models.interactive.HumanResponse#

Registered human interaction callback that processes human interactions and returns responses from websocket connection.

Parameters:

prompt – Incoming interaction content data model.

Returns:

A Text Content Base Pydantic model.

async _run_workflow(
payload: Any,
conversation_id: str | None = None,
result_type: type | None = None,
output_type: type | None = None,
) None#