nat.runtime.session#
Attributes#
Classes#
- PerUserBuilderInfo: Container for per-user builder data with activity tracking.
- Session: Represents an active session with access to workflow and builders.
- SessionManager: The SessionManager class is used to manage workflow builders and sessions.
Module Contents#
- logger#
- class PerUserBuilderInfo(/, **data: Any)#
Bases: pydantic.BaseModel
Container for per-user builder data with activity tracking.
Tracks lifecycle and usage of per-user builders for automatic cleanup.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config#
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.
- builder: Any = None#
- workflow: Any = None#
- semaphore: Any = None#
- last_activity: datetime.datetime = None#
- lock: asyncio.Lock = None#
- created_at: datetime.datetime = None#
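The `last_activity` and `created_at` fields suggest how an idle builder could be identified for cleanup. The following is a minimal sketch of that idea, using a standard-library dataclass as a stand-in for the pydantic model. `BuilderRecord`, `touch`, and `is_idle` are hypothetical names for illustration and are not part of `nat.runtime.session`.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class BuilderRecord:
    """Hypothetical stand-in for PerUserBuilderInfo (not the real class)."""

    builder: object = None
    created_at: datetime = field(default_factory=datetime.now)
    last_activity: datetime = field(default_factory=datetime.now)

    def touch(self, now: datetime | None = None) -> None:
        # Record that the builder was just used.
        self.last_activity = now or datetime.now()

    def is_idle(self, timeout: timedelta, now: datetime | None = None) -> bool:
        # A record becomes a cleanup candidate once it has gone unused
        # for longer than the timeout.
        return ((now or datetime.now()) - self.last_activity) > timeout
```

Passing `now` explicitly keeps the idle check deterministic, which makes this kind of bookkeeping easy to test.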
- class Session(
- session_manager: SessionManager,
- workflow: nat.builder.workflow.Workflow,
- semaphore: asyncio.Semaphore | contextlib.nullcontext,
- user_id: str | None = None,
)#
Represents an active session with access to workflow and builders.
Each session is tied to a specific request, and provides access to the appropriate workflow instance (shared or per-user).
Lifecycle:
- Created for each request via SessionManager.session()
- Automatically manages ref_count for per-user builder tracking
- Cleans up context variables on exit
Concurrency:
- Each session has its own semaphore for concurrency control
- For per-user workflows: each user has an independent concurrency limit
- For shared workflows: all sessions share the SessionManager’s semaphore
- _session_manager#
- _workflow#
- _semaphore#
- _user_id = None#
- property workflow: nat.builder.workflow.Workflow#
- property session_manager: SessionManager#
- async run(
- message,
- runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
)#
Start a workflow run using this session’s workflow.
- Args:
message: Input message for the workflow
runtime_type: Runtime type (defaults to SessionManager’s runtime_type)
- Yields:
Runner instance for the workflow execution
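Because `run` yields a runner rather than returning a result, it is presumably consumed as an async context manager. The sketch below illustrates only that calling pattern with stand-in classes. `DemoSession`, `EchoRunner`, and `EchoRunner.result` are hypothetical and do not reflect the real `Session` or runner interfaces.

```python
import asyncio
from contextlib import asynccontextmanager


class EchoRunner:
    """Hypothetical stand-in for the yielded runner."""

    def __init__(self, message: str) -> None:
        self._message = message

    async def result(self) -> str:
        return f"echo: {self._message}"


class DemoSession:
    """Hypothetical stand-in for Session; only the calling pattern is shown."""

    def __init__(self) -> None:
        self.active_runs = 0

    @asynccontextmanager
    async def run(self, message: str):
        self.active_runs += 1
        try:
            yield EchoRunner(message)
        finally:
            # Cleanup runs even if the caller's block raises.
            self.active_runs -= 1


async def call_workflow(session: DemoSession, message: str) -> str:
    async with session.run(message) as runner:
        return await runner.result()
```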
- class SessionManager(
- config: nat.data_models.config.Config,
- shared_builder: nat.builder.workflow_builder.WorkflowBuilder,
- entry_function: str | None = None,
- shared_workflow: nat.builder.workflow.Workflow | None = None,
- max_concurrency: int = 8,
- runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
)#
The SessionManager class manages workflow sessions and workflow builders, including the lifecycle of per-user builders.
Architecture:
- One SessionManager per FastAPI server
- Creates/caches PerUserWorkflowBuilder instances per user
- Cleans up inactive builders based on timeout
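The create-and-cache step in the architecture above can be sketched as a lock-guarded dictionary keyed by user ID. This is a simplified illustration under stated assumptions. `PerUserCache`, `get_or_create`, and the `factory` callable are hypothetical and stand in for whatever `_get_or_create_per_user_builder` actually does.

```python
import asyncio


class PerUserCache:
    """Hypothetical cache; `factory` stands in for building a per-user builder."""

    def __init__(self, factory) -> None:
        self._factory = factory
        self._items: dict[str, object] = {}
        self._lock = asyncio.Lock()
        self.build_count = 0

    async def get_or_create(self, user_id: str):
        # Holding the lock across the check and the build means concurrent
        # requests from the same user trigger a single build.
        async with self._lock:
            if user_id not in self._items:
                self._items[user_id] = await self._factory(user_id)
                self.build_count += 1
            return self._items[user_id]


async def cache_demo() -> tuple[int, bool]:
    async def build(user_id: str) -> dict:
        await asyncio.sleep(0.01)  # simulate an expensive async build
        return {"owner": user_id}

    cache = PerUserCache(build)
    a1, a2, b = await asyncio.gather(
        cache.get_or_create("alice"),
        cache.get_or_create("alice"),
        cache.get_or_create("bob"),
    )
    return cache.build_count, (a1 is a2 and a1 is not b)
```

A single global lock is the simplest choice here. It serializes builds for all users, so a finer-grained design might prefer one lock per user.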
Parameters#
- config : Config
The configuration for the workflow
- shared_builder : WorkflowBuilder
The shared workflow builder
- entry_function : str | None, optional
The entry function for this SessionManager’s workflows, by default None
- shared_workflow : Workflow, optional
The shared workflow, by default None
- max_concurrency : int, optional
The maximum number of simultaneous workflow invocations, by default 8
- runtime_type : RuntimeTypeEnum, optional
The type of runtime the session manager is operating in, by default RuntimeTypeEnum.RUN_OR_SERVE
- _config#
- _max_concurrency = 8#
- _entry_function = None#
- _runtime_type#
- _context_state#
- _context#
- _is_workflow_per_user#
- _per_user_builders: dict[str, PerUserBuilderInfo]#
- _per_user_builders_lock#
- _per_user_builders_cleanup_task: asyncio.Task | None = None#
- _per_user_session_timeout#
- _per_user_session_cleanup_interval#
- _shutdown_event#
- property config: nat.data_models.config.Config#
- property workflow: nat.builder.workflow.Workflow#
Get workflow for backward compatibility.
Only works for shared workflows. For per-user workflows, use session.workflow.
- Raises:
ValueError: If workflow is per-user
- get_workflow_input_schema() → type[pydantic.BaseModel]#
Get workflow input schema for OpenAPI documentation.
- get_workflow_single_output_schema() → type[pydantic.BaseModel]#
Get workflow single output schema for OpenAPI documentation.
- get_workflow_streaming_output_schema() → type[pydantic.BaseModel]#
Get workflow streaming output schema for OpenAPI documentation.
- classmethod create(
- config: nat.data_models.config.Config,
- shared_builder: nat.builder.workflow_builder.WorkflowBuilder,
- entry_function: str | None = None,
- max_concurrency: int = 8,
- runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
)#
- Async:
Create a SessionManager. This is the preferred way to instantiate.
Handles async workflow building and starts cleanup task if per-user.
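An async `create` classmethod is a common way around the fact that `__init__` cannot `await`. The sketch below shows the general pattern only. `DemoManager`, `_build_workflow`, and the placeholder cleanup task are hypothetical and are not taken from the `SessionManager` implementation.

```python
import asyncio
import contextlib


class DemoManager:
    """Hypothetical class; only the async-factory pattern mirrors the text."""

    def __init__(self, name: str) -> None:
        # __init__ cannot await, so it only stores plain values.
        self.name = name
        self.workflow = None
        self.cleanup_task = None

    @classmethod
    async def create(cls, name: str, per_user: bool = False) -> "DemoManager":
        manager = cls(name)
        manager.workflow = await manager._build_workflow()
        if per_user:
            # Background work is started only when it is needed.
            manager.cleanup_task = asyncio.create_task(asyncio.sleep(3600))
        return manager

    async def _build_workflow(self) -> str:
        await asyncio.sleep(0)  # placeholder for real async setup
        return f"workflow for {self.name}"

    async def shutdown(self) -> None:
        if self.cleanup_task is not None:
            self.cleanup_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self.cleanup_task


async def factory_demo() -> tuple[str, bool, bool]:
    shared = await DemoManager.create("shared")
    per_user = await DemoManager.create("per-user", per_user=True)
    has_task = per_user.cleanup_task is not None
    await per_user.shutdown()
    return shared.workflow, shared.cleanup_task is None, has_task
```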
- async _run_periodic_cleanup()#
- _get_user_id_from_context() → str | None#
Get user ID from current context.
Extraction order:
1. From context user_id (set from nat-session cookie)
2. From context user_manager if set
3. None (for shared workflow or unauthenticated access)
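The extraction order above is a plain fallback chain. A minimal sketch, assuming a context object with `user_id` and `user_manager` attributes: `DemoContext`, `DemoUserManager`, and its `get_id` method are hypothetical stand-ins, since the source does not show how an ID is read from the user manager.

```python
from __future__ import annotations

from dataclasses import dataclass


class DemoUserManager:
    """Hypothetical user manager; `get_id` is an assumed method name."""

    def get_id(self) -> str:
        return "id-from-manager"


@dataclass
class DemoContext:
    """Hypothetical stand-in for the request context."""

    user_id: str | None = None
    user_manager: DemoUserManager | None = None


def resolve_user_id(ctx: DemoContext) -> str | None:
    # 1. An explicit user_id on the context wins.
    if ctx.user_id:
        return ctx.user_id
    # 2. Otherwise fall back to the user manager, if one is set.
    if ctx.user_manager is not None:
        return ctx.user_manager.get_id()
    # 3. No user: shared workflow or unauthenticated access.
    return None
```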
- async _get_or_create_per_user_builder(
- user_id: str,
)#
- async session(
- user_id: str | None = None,
- user_manager=None,
- http_connection: starlette.requests.HTTPConnection | None = None,
- user_message_id: str | None = None,
- conversation_id: str | None = None,
- user_input_callback: collections.abc.Callable[[nat.data_models.interactive.InteractionPrompt], collections.abc.Awaitable[nat.data_models.interactive.HumanResponse]] = None,
- user_authentication_callback: collections.abc.Callable[[nat.data_models.authentication.AuthProviderBaseConfig, nat.data_models.authentication.AuthFlowType], collections.abc.Awaitable[nat.data_models.authentication.AuthenticatedContext | None]] = None,
)#
- async run(
- message,
- runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
)#
Start a workflow run.
- async shutdown() None#
Shutdown the SessionManager and cleanup resources.
Call this when the SessionManager is no longer needed.
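The `_shutdown_event`, `_per_user_session_cleanup_interval`, and `_run_periodic_cleanup` members suggest a background loop that stops when shutdown is requested. The sketch below shows one common way to write such a loop. `periodic_cleanup` and `shutdown_demo` are hypothetical names, and the real implementation may differ.

```python
import asyncio


async def periodic_cleanup(cleanup, interval: float, shutdown_event: asyncio.Event) -> int:
    runs = 0
    while not shutdown_event.is_set():
        try:
            # Sleep for one interval, but wake immediately on shutdown.
            await asyncio.wait_for(shutdown_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            # The interval elapsed without a shutdown request.
            cleanup()
            runs += 1
    return runs


async def shutdown_demo() -> tuple[int, bool]:
    shutdown_event = asyncio.Event()
    calls: list[int] = []
    task = asyncio.create_task(
        periodic_cleanup(lambda: calls.append(1), 0.01, shutdown_event)
    )
    await asyncio.sleep(0.05)
    # Signal shutdown, then wait for the loop to exit on its own.
    shutdown_event.set()
    runs = await asyncio.wait_for(task, timeout=1.0)
    return runs, runs == len(calls)
```

Waiting on the event instead of calling `asyncio.sleep` lets shutdown finish promptly rather than after a full cleanup interval.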
- AIQSessionManager#