nat.front_ends.fastapi.fastapi_front_end_plugin_worker#

Attributes#

Classes#

FastApiFrontEndPluginWorkerBase

Helper class that provides a standard way to create an ABC using

RouteInfo

FastApiFrontEndPluginWorker

Helper class that provides a standard way to create an ABC using

Module Contents#

logger#
class FastApiFrontEndPluginWorkerBase(config: nat.data_models.config.Config)#

Bases: abc.ABC

Helper class that provides a standard way to create an ABC using inheritance.

_config#
_front_end_config#
_cleanup_tasks: list[str] = []#
_cleanup_tasks_lock#
_http_flow_handler: nat.front_ends.fastapi.auth_flow_handlers.http_flow_handler.HTTPAuthenticationFlowHandler | None#
property config: nat.data_models.config.Config#
property front_end_config: nat.front_ends.fastapi.fastapi_front_end_config.FastApiFrontEndConfig#
build_app() fastapi.FastAPI#
set_cors_config(nat_app: fastapi.FastAPI) None#

Set the cross origin resource sharing configuration.

async _suppress_authentication_logs(
request: fastapi.Request,
call_next: collections.abc.Callable[[fastapi.Request], collections.abc.Awaitable[fastapi.Response]],
) fastapi.Response#

Intercepts authentication request and supreses logs that contain sensitive data.

abstractmethod configure(
app: fastapi.FastAPI,
builder: nat.builder.workflow_builder.WorkflowBuilder,
)#
Async:

abstractmethod get_step_adaptor() nat.front_ends.fastapi.step_adaptor.StepAdaptor#
class RouteInfo(/, **data: Any)#

Bases: pydantic.BaseModel

function_name: str | None#
class FastApiFrontEndPluginWorker(config: nat.data_models.config.Config)#

Bases: FastApiFrontEndPluginWorkerBase

Helper class that provides a standard way to create an ABC using inheritance.

_outstanding_flows: dict[str, nat.front_ends.fastapi.auth_flow_handlers.websocket_flow_handler.FlowState]#
_outstanding_flows_lock#
static _periodic_cleanup(
name: str,
job_store: nat.front_ends.fastapi.job_store.JobStore,
sleep_time_sec: int = 300,
)#
Async:

async create_cleanup_task(
app: fastapi.FastAPI,
name: str,
job_store: nat.front_ends.fastapi.job_store.JobStore,
sleep_time_sec: int = 300,
)#
get_step_adaptor() nat.front_ends.fastapi.step_adaptor.StepAdaptor#
async configure(
app: fastapi.FastAPI,
builder: nat.builder.workflow_builder.WorkflowBuilder,
)#
async add_routes(
app: fastapi.FastAPI,
builder: nat.builder.workflow_builder.WorkflowBuilder,
)#
async add_default_route(
app: fastapi.FastAPI,
session_manager: nat.runtime.session.SessionManager,
)#
async add_evaluate_route(
app: fastapi.FastAPI,
session_manager: nat.runtime.session.SessionManager,
)#

Add the evaluate endpoint to the FastAPI app.

async add_static_files_route(
app: fastapi.FastAPI,
builder: nat.builder.workflow_builder.WorkflowBuilder,
)#
async add_route(
app: fastapi.FastAPI,
endpoint: nat.front_ends.fastapi.fastapi_front_end_config.FastApiFrontEndConfig.EndpointBase,
session_manager: nat.runtime.session.SessionManager,
)#
async add_authorization_route(app: fastapi.FastAPI)#
async _add_flow(
state: str,
flow_state: nat.front_ends.fastapi.auth_flow_handlers.websocket_flow_handler.FlowState,
)#
async _remove_flow(state: str)#