nat.runtime.session#

Attributes#

Classes#

PerUserBuilderInfo

Container for per-user builder data with activity tracking.

Session

Represents an active session with access to workflow and builders.

SessionManager

The SessionManager class is used to manage workflow builders and sessions.

Module Contents#

logger#
class PerUserBuilderInfo(/, **data: Any)#

Bases: pydantic.BaseModel

Container for per-user builder data with activity tracking.

Tracks lifecycle and usage of per-user builders for automatic cleanup.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

builder: Any = None#
workflow: Any = None#
semaphore: Any = None#
last_activity: datetime.datetime = None#
ref_count: int = None#
lock: asyncio.Lock = None#
created_at: datetime.datetime = None#
total_requests: int = None#
error_count: int = None#
total_latency_ms: float = None#
record_request(latency_ms: float, success: bool) None#

Record metrics for a completed request.

Args:

latency_ms: Request latency in milliseconds success: Whether the request was successful

class Session(
session_manager: SessionManager,
workflow: nat.builder.workflow.Workflow,
semaphore: asyncio.Semaphore | contextlib.nullcontext,
user_id: str | None = None,
)#

Represents an active session with access to workflow and builders.

Each session is tied to a specific request, and provides access to the appropriate workflow instance (shared or per-user).

Lifecycle: - Created for each request via SessionManager.session() - Automatically manages ref_count for per-user builder tracking - Cleans up context variables on exit

Concurrency: - Each session has its own semaphore for concurrency control - For per-user workflows: each user has an independent concurrency limit - For shared workflows: all sessions share the SessionManager’s semaphore

_session_manager#
_workflow#
_semaphore#
_user_id = None#
property user_id: str | None#
property workflow: nat.builder.workflow.Workflow#
property session_manager: SessionManager#
async run(
message,
runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
)#

Start a workflow run using this session’s workflow.

Args:

message: Input message for the workflow runtime_type: Runtime type (defaults to SessionManager’s runtime_type)

Yields:

Runner instance for the workflow execution

class SessionManager(
config: nat.data_models.config.Config,
shared_builder: nat.builder.workflow_builder.WorkflowBuilder,
entry_function: str | None = None,
shared_workflow: nat.builder.workflow.Workflow | None = None,
max_concurrency: int = 8,
runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
)#

The SessionManager class is used to manage workflow builders and sessions. It manages workflow sessions and per-user builders with lifecycle management.

Architecture: - One SessionManager per FastAPI server - Creates/caches PerUserWorkflowBuilder instances per user - Cleans up inactive builders based on timeout

Parameters#

configConfig

The configuration for the workflow

shared_builderWorkflowBuilder

The shared workflow builder

entry_functionstr | None, optional

The entry function for this SessionManager’s workflows, by default None

shared_workflowWorkflow, optional

The shared workflow, by default None

max_concurrencyint, optional

The maximum number of simultaneous workflow invocations, by default 8

runtime_typeRuntimeTypeEnum, optional

The type of runtime the session manager is operating in, by default RuntimeTypeEnum.RUN_OR_SERVE

_config#
_max_concurrency = 8#
_entry_function = None#
_runtime_type#
_context_state#
_context#
_is_workflow_per_user#
_shared_builder#
_shared_workflow = None#
_per_user_builders: dict[str, PerUserBuilderInfo]#
_per_user_builders_lock#
_per_user_builders_cleanup_task: asyncio.Task | None = None#
_per_user_session_timeout#
_per_user_session_cleanup_interval#
_shutdown_event#
property config: nat.data_models.config.Config#
property workflow: nat.builder.workflow.Workflow#

Get workflow for backward compatibility.

Only works for shared workflows. For per-user workflows, use session.workflow.

Raises:

ValueError: If workflow is per-user

property shared_builder: nat.builder.workflow_builder.WorkflowBuilder#
property is_workflow_per_user: bool#
get_workflow_input_schema() type[pydantic.BaseModel]#

Get workflow input schema for OpenAPI documentation.

get_workflow_single_output_schema() type[pydantic.BaseModel]#

Get workflow single output schema for OpenAPI documentation.

get_workflow_streaming_output_schema() type[pydantic.BaseModel]#

Get workflow streaming output schema for OpenAPI documentation.

classmethod create(
config: nat.data_models.config.Config,
shared_builder: nat.builder.workflow_builder.WorkflowBuilder,
entry_function: str | None = None,
max_concurrency: int = 8,
runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
) SessionManager#
Async:

Create a SessionManager. This is the preferred way to instantiate.

Handles async workflow building and starts cleanup task if per-user.

async _run_periodic_cleanup()#
async _cleanup_inactive_per_user_builders() int#
_get_user_id_from_context() str | None#

Get user ID from current context.

Extraction order: 1. From context user_id (set from nat-session cookie) 2. From context user_manager if set 3. None (for shared workflow or unauthenticated access)

async _get_or_create_per_user_builder(
user_id: str,
) tuple[nat.builder.per_user_workflow_builder.PerUserWorkflowBuilder, nat.builder.workflow.Workflow]#
async session(
user_id: str | None = None,
user_manager=None,
http_connection: starlette.requests.HTTPConnection | None = None,
user_message_id: str | None = None,
conversation_id: str | None = None,
user_input_callback: collections.abc.Callable[[nat.data_models.interactive.InteractionPrompt], collections.abc.Awaitable[nat.data_models.interactive.HumanResponse]] = None,
user_authentication_callback: collections.abc.Callable[[nat.data_models.authentication.AuthProviderBaseConfig, nat.data_models.authentication.AuthFlowType], collections.abc.Awaitable[nat.data_models.authentication.AuthenticatedContext | None]] = None,
)#
async run(
message,
runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
)#

Start a workflow run

async shutdown() None#

Shutdown the SessionManager and cleanup resources.

Call this when the SessionManager is no longer needed.

set_metadata_from_http_request(
request: starlette.requests.Request,
) None#

Extracts and sets user metadata request attributes from a HTTP request. If request is None, no attributes are set.

set_metadata_from_websocket(
websocket: fastapi.WebSocket,
user_message_id: str | None,
conversation_id: str | None,
) None#

Extracts and sets user metadata for WebSocket connections.

AIQSessionManager#