nat.front_ends.fastapi.routes.generate#

Generate route registration and handler factories.

Attributes#

Classes#

_GenerateEndpointType

Enum where members are also (and must be) strings

_GenerateEndpointMethod

Enum where members are also (and must be) strings

Functions#

get_streaming_raw_endpoint(*, session_manager, ...)

Build a raw-streaming GET handler.

post_streaming_raw_endpoint(*, worker, ...)

Build a raw-streaming POST handler.

_response_for_endpoint_type(→ type | None)

add_generate_route(worker, app, session_manager, *, ...)

Add a generate route for an endpoint.

add_generate_routes(worker, app, endpoint, ...[, ...])

Module Contents#

logger#
get_streaming_raw_endpoint(
*,
session_manager: nat.runtime.session.SessionManager,
streaming: bool,
result_type: type | None,
output_type: type | None,
)#

Build a raw-streaming GET handler.

post_streaming_raw_endpoint(
*,
worker: Any,
session_manager: nat.runtime.session.SessionManager,
request_type: Any,
enable_interactive: bool,
streaming: bool,
result_type: type | None,
output_type: type | None,
)#

Build a raw-streaming POST handler.

class _GenerateEndpointType#

Bases: enum.StrEnum

Enum where members are also (and must be) strings

Initialize self. See help(type(self)) for accurate signature.

SINGLE = 'single'#
STREAMING = 'streaming'#
FULL = 'full'#
class _GenerateEndpointMethod#

Bases: enum.StrEnum

Enum where members are also (and must be) strings

Initialize self. See help(type(self)) for accurate signature.

GET = 'GET'#
POST = 'POST'#
_response_for_endpoint_type(
session_manager: nat.runtime.session.SessionManager,
endpoint_type: _GenerateEndpointType,
) type | None#
async add_generate_route(
worker: Any,
app: fastapi.FastAPI,
session_manager: nat.runtime.session.SessionManager,
*,
enable_interactive: bool,
endpoint_path: str,
endpoint_type: _GenerateEndpointType,
endpoint_method: _GenerateEndpointMethod,
)#

Add a generate route for an endpoint.

async add_generate_routes(
worker: Any,
app: fastapi.FastAPI,
endpoint: Any,
session_manager: nat.runtime.session.SessionManager,
*,
enable_interactive: bool = True,
disable_legacy_routes: bool = False,
)#