nat.front_ends.fastapi.routes.common_utils#

Shared FastAPI route helpers for HTTP generate/chat endpoints.

Attributes#

logger

RESPONSE_500

Functions#

add_context_headers_to_response(response) → None

Add context-based headers to response if available.

_build_interactive_runner(worker, session_manager)

_with_annotation(handler, param_name, annotation)

get_single_endpoint(*, worker, session_manager, ...)

Build a single-response GET handler.

get_streaming_endpoint(*, worker, session_manager, ...)

Build a streaming GET handler.

post_single_endpoint(*, worker, session_manager, ...)

Build a single-response POST handler.

post_streaming_endpoint(*, worker, session_manager, ...)

Build a streaming POST handler.

Module Contents#

logger#
RESPONSE_500#
add_context_headers_to_response(response: fastapi.Response) → None#

Add context-based headers to response if available.

_build_interactive_runner(
worker: Any,
session_manager: nat.runtime.session.SessionManager,
)#
_with_annotation(handler: Any, param_name: str, annotation: Any)#
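The signature of `_with_annotation` suggests that it attaches a type annotation to one parameter of a handler, which matters because FastAPI derives request parsing from a handler's signature and annotations. The body is not documented here, so the following is only a sketch of one way to do this with the standard library; `with_annotation_sketch` is a hypothetical name.

```python
import inspect
from typing import Any, Callable


def with_annotation_sketch(
    handler: Callable[..., Any], param_name: str, annotation: Any
) -> Callable[..., Any]:
    """Set ``annotation`` on ``param_name`` so signature inspection can see it."""
    sig = inspect.signature(handler)
    if param_name not in sig.parameters:
        raise ValueError(f"{handler.__name__} has no parameter {param_name!r}")

    # Rebuild the parameter list, swapping in the new annotation for one entry.
    params = [
        p.replace(annotation=annotation) if p.name == param_name else p
        for p in sig.parameters.values()
    ]
    # inspect.signature() honors an explicit __signature__ attribute.
    handler.__signature__ = sig.replace(parameters=params)
    # Keep __annotations__ in sync for tools that read it directly.
    handler.__annotations__ = {**handler.__annotations__, param_name: annotation}
    return handler
```

This lets a factory define one generic handler body and assign the concrete request type afterwards, once it is known at route-registration time.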
get_single_endpoint(
*,
worker: Any,
session_manager: nat.runtime.session.SessionManager,
result_type: type | None,
)#

Build a single-response GET handler.

get_streaming_endpoint(
*,
worker: Any,
session_manager: nat.runtime.session.SessionManager,
streaming: bool,
result_type: type | None,
output_type: type | None,
)#

Build a streaming GET handler.

post_single_endpoint(
*,
worker: Any,
session_manager: nat.runtime.session.SessionManager,
request_type: Any,
enable_interactive: bool,
result_type: type | None,
)#

Build a single-response POST handler.

post_streaming_endpoint(
*,
worker: Any,
session_manager: nat.runtime.session.SessionManager,
request_type: Any,
enable_interactive: bool,
streaming: bool,
result_type: type | None,
output_type: type | None,
)#

Build a streaming POST handler.
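All four `*_endpoint` functions take keyword-only arguments and build a handler, which is the factory pattern. The sketch below illustrates that pattern for the streaming case using only the standard library. It assumes server-sent-events framing and a hypothetical `run_stream` callable in place of `worker` and `session_manager`; neither assumption is confirmed by this page.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Callable


def streaming_endpoint_sketch(
    *, run_stream: Callable[[Any], AsyncIterator[Any]]
) -> Callable[[Any], AsyncIterator[str]]:
    """Return a handler that emits each streamed item as one SSE-style frame."""

    async def handler(payload: Any) -> AsyncIterator[str]:
        async for item in run_stream(payload):
            # One "data:" line per item, terminated by a blank line.
            yield f"data: {json.dumps(item)}\n\n"

    return handler


async def fake_workflow(payload: str) -> AsyncIterator[dict]:
    """Hypothetical workflow that streams one chunk per word."""
    for word in payload.split():
        yield {"chunk": word}


async def collect(handler: Callable[[Any], AsyncIterator[str]], payload: Any) -> list[str]:
    """Drain the handler's async generator into a list for inspection."""
    return [frame async for frame in handler(payload)]


frames = asyncio.run(
    collect(streaming_endpoint_sketch(run_stream=fake_workflow), "hello world")
)
```

In a FastAPI application, an async generator like the one returned by `handler` would typically be wrapped in a `StreamingResponse` before being returned from the route.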