nat.front_ends.fastapi.fastapi_front_end_plugin_worker#
Classes#
Helper class that provides a standard way to create an ABC using |
|
Helper class that provides a standard way to create an ABC using |
Module Contents#
- class FastApiFrontEndPluginWorkerBase(config: nat.data_models.config.Config)#
Bases:
abc.ABCHelper class that provides a standard way to create an ABC using inheritance.
- _config#
- _front_end_config#
- _dask_available = False#
- _job_store = None#
- _http_flow_handler: nat.front_ends.fastapi.auth_flow_handlers.http_flow_handler.HTTPAuthenticationFlowHandler | None#
- _scheduler_address#
- _db_url#
- _config_file_path#
- _use_dask_threads#
- _log_level#
- property config: nat.data_models.config.Config#
- property front_end_config: nat.front_ends.fastapi.fastapi_front_end_config.FastApiFrontEndConfig#
- build_app() fastapi.FastAPI#
- set_cors_config(nat_app: fastapi.FastAPI) None#
Set the cross origin resource sharing configuration.
- async _suppress_authentication_logs(
- request: fastapi.Request,
- call_next: collections.abc.Callable[[fastapi.Request], collections.abc.Awaitable[fastapi.Response]],
Intercepts authentication request and supreses logs that contain sensitive data.
- abstractmethod configure(
- app: fastapi.FastAPI,
- builder: nat.builder.workflow_builder.WorkflowBuilder,
- Async:
- abstractmethod get_step_adaptor() nat.front_ends.fastapi.step_adaptor.StepAdaptor#
- class FastApiFrontEndPluginWorker(config: nat.data_models.config.Config)#
Bases:
FastApiFrontEndPluginWorkerBaseHelper class that provides a standard way to create an ABC using inheritance.
- _outstanding_flows: dict[str, nat.front_ends.fastapi.auth_flow_handlers.websocket_flow_handler.FlowState]#
- _outstanding_flows_lock#
- _conversation_handlers: dict[str, nat.front_ends.fastapi.message_handler.WebSocketMessageHandler]#
- _session_managers: list[nat.runtime.session.SessionManager] = []#
- _evaluators: dict[str, nat.builder.evaluator.EvaluatorInfo]#
- _eval_builder: nat.builder.workflow_builder.WorkflowEvalBuilderBase | None = None#
- _execution_store#
- _http_flow_handler#
- get_conversation_handler(
- conversation_id: str,
Get a conversation handler for reconnection support.
- set_conversation_handler(
- conversation_id: str,
- handler: nat.front_ends.fastapi.message_handler.WebSocketMessageHandler,
Register a conversation handler for reconnection support.
- remove_conversation_handler(conversation_id: str) None#
Remove a conversation handler when workflow completes.
- async initialize_evaluators(config: nat.data_models.config.Config)#
Initialize and store evaluators from config for single-item evaluation.
- async _create_session_manager(
- builder: nat.builder.workflow_builder.WorkflowBuilder,
- entry_function: str | None = None,
Create and register a SessionManager.
- async cleanup_session_managers()#
Clean up all SessionManager resources on shutdown.
- async cleanup_evaluators()#
Clean up evaluator resources on shutdown.
- get_step_adaptor() nat.front_ends.fastapi.step_adaptor.StepAdaptor#
- async configure(
- app: fastapi.FastAPI,
- builder: nat.builder.workflow_builder.WorkflowBuilder,
- async add_routes(
- app: fastapi.FastAPI,
- builder: nat.builder.workflow_builder.WorkflowBuilder,
- async add_default_route(
- app: fastapi.FastAPI,
- session_manager: nat.runtime.session.SessionManager,
- async _add_flow(
- state: str,
- flow_state: nat.front_ends.fastapi.auth_flow_handlers.websocket_flow_handler.FlowState,