nat.front_ends.fastapi.message_handler#

Attributes#

Classes#

Module Contents#

logger#
class WebSocketMessageHandler(
socket: fastapi.WebSocket,
session_manager: nat.runtime.session.SessionManager,
step_adaptor: nat.front_ends.fastapi.step_adaptor.StepAdaptor,
)#
_socket: fastapi.WebSocket#
_session_manager: nat.runtime.session.SessionManager#
_step_adaptor: nat.front_ends.fastapi.step_adaptor.StepAdaptor#
_message_validator: nat.front_ends.fastapi.message_validator.MessageValidator#
_running_workflow_task: asyncio.Task | None = None#
_message_parent_id: str = 'default_id'#
_conversation_id: str | None = None#
_workflow_schema_type: str | None = None#
_user_interaction_response: asyncio.Future[nat.data_models.api_server.TextContent] | None = None#
_flow_handler: nat.authentication.interfaces.FlowHandlerBase | None = None#
_schema_output_mapping: dict[str, type[pydantic.BaseModel] | type[None]]#
set_flow_handler(
flow_handler: nat.authentication.interfaces.FlowHandlerBase,
) None#
async run() None#

Processes messages received from the websocket and routes them appropriately.

_extract_last_user_message_content(
messages: list[nat.data_models.api_server.UserMessages],
) nat.data_models.api_server.TextContent#

Extracts the TextContent of the last user message from a list of messages.

Args:

messages: List of UserMessages.

Returns:

TextContent object from the last user message.

Raises:

ValueError: If no user text content is found.

async _process_websocket_user_interaction_response_message(
user_content: nat.data_models.api_server.WebSocketUserInteractionResponseMessage,
) nat.data_models.api_server.TextContent#

Processes a WebSocketUserInteractionResponseMessage.

async _process_websocket_user_message(
user_content: nat.data_models.api_server.WebSocketUserMessage,
) nat.data_models.api_server.ChatRequest | str#

Processes a WebSocketUserMessage based on schema type.

async process_workflow_request(
user_message_as_validated_type: nat.data_models.api_server.WebSocketUserMessage,
) None#

Processes user messages and routes them appropriately.

Args:

user_message_as_validated_type (WebSocketUserMessage): The validated user message to process.

async create_websocket_message(
data_model: pydantic.BaseModel,
message_type: str | None = None,
status: nat.data_models.api_server.WebSocketMessageStatus = WebSocketMessageStatus.IN_PROGRESS,
) None#

Creates a websocket message that will be ready for routing based on message type or data model.

Args:

data_model (BaseModel): Message content model. message_type (str | None): Message type used for routing; optional, defaults to None. status (WebSocketMessageStatus): Status of the message; defaults to WebSocketMessageStatus.IN_PROGRESS.

async human_interaction_callback(
prompt: nat.data_models.interactive.InteractionPrompt,
) nat.data_models.interactive.HumanResponse#

Registered human interaction callback that processes human interactions and returns responses from the websocket connection.

Args:

prompt: Incoming interaction content data model.

Returns:

A HumanResponse Pydantic model containing the response received over the websocket connection.

async _run_workflow(
payload: Any,
user_message_id: str | None = None,
conversation_id: str | None = None,
result_type: type | None = None,
output_type: type | None = None,
) None#