nat.runtime.session#

Attributes#

Classes#

PerUserBuilderInfo

Container for per-user builder data with activity tracking.

Session

Represents an active session with access to workflow and builders.

SessionManager

The SessionManager class is used to manage workflow builders and sessions.

Module Contents#

logger#
class PerUserBuilderInfo(/, **data: Any)#

Bases: pydantic.BaseModel

Container for per-user builder data with activity tracking.

Tracks lifecycle and usage of per-user builders for automatic cleanup.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_config#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

builder: Any = None#
workflow: Any = None#
semaphore: Any = None#
last_activity: datetime.datetime = None#
ref_count: int = None#
lock: asyncio.Lock = None#
created_at: datetime.datetime = None#
total_requests: int = None#
error_count: int = None#
total_latency_ms: float = None#
record_request(latency_ms: float, success: bool) None#

Record metrics for a completed request.

Args:

latency_ms: Request latency in milliseconds.

success: Whether the request was successful.

class Session(
session_manager: SessionManager,
workflow: nat.builder.workflow.Workflow,
semaphore: asyncio.Semaphore | contextlib.nullcontext,
user_id: str | None = None,
)#

Represents an active session with access to workflow and builders.

Each session is tied to a specific request, and provides access to the appropriate workflow instance (shared or per-user).

Lifecycle:

- Created for each request via SessionManager.session()
- Automatically manages ref_count for per-user builder tracking
- Cleans up context variables on exit

Concurrency:

- Each session has its own semaphore for concurrency control
- For per-user workflows: each user has an independent concurrency limit
- For shared workflows: all sessions share the SessionManager’s semaphore

_session_manager#
_workflow#
_semaphore#
_user_id = None#
property user_id: str | None#
property workflow: nat.builder.workflow.Workflow#
property session_manager: SessionManager#
async run(
message,
runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
parent_id: str | None = None,
parent_name: str | None = None,
)#

Start a workflow run using this session’s workflow.

Args:
message

Input message for the workflow

runtime_type : RuntimeTypeEnum

Runtime type (defaults to SessionManager’s runtime_type)

parent_id : str | None, optional

Optional parent step ID for cross-workflow observability. When set, the root workflow step is emitted with this as its parent.

parent_name : str | None, optional

Optional parent step name for cross-workflow observability. When set, the root workflow’s function ancestry uses this as the parent name.

Yields:

Runner instance for the workflow execution

class SessionManager(
config: nat.data_models.config.Config,
shared_builder: nat.builder.workflow_builder.WorkflowBuilder,
entry_function: str | None = None,
shared_workflow: nat.builder.workflow.Workflow | None = None,
max_concurrency: int = 8,
runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
)#

The SessionManager class manages workflow sessions and per-user workflow builders, including the lifecycle of those builders.

Architecture:

- One SessionManager per FastAPI server
- Creates/caches PerUserWorkflowBuilder instances per user
- Cleans up inactive builders based on timeout

Parameters#

config : Config

The configuration for the workflow

shared_builder : WorkflowBuilder

The shared workflow builder

entry_function : str | None, optional

The entry function for this SessionManager’s workflows, by default None

shared_workflow : Workflow, optional

The shared workflow, by default None

max_concurrency : int, optional

The maximum number of simultaneous workflow invocations, by default 8

runtime_type : RuntimeTypeEnum, optional

The type of runtime the session manager is operating in, by default RuntimeTypeEnum.RUN_OR_SERVE

_config#
_max_concurrency = 8#
_entry_function = None#
_runtime_type#
_context_state#
_context#
_is_workflow_per_user#
_shared_builder#
_shared_workflow = None#
_per_user_builders: dict[str, PerUserBuilderInfo]#
_per_user_builders_lock#
_per_user_builders_cleanup_task: asyncio.Task | None = None#
_per_user_session_timeout#
_per_user_session_cleanup_interval#
_shutdown_event#
property config: nat.data_models.config.Config#
property workflow: nat.builder.workflow.Workflow#

Get workflow for backward compatibility.

Only works for shared workflows. For per-user workflows, use session.workflow.

Raises:

ValueError: If workflow is per-user

property shared_builder: nat.builder.workflow_builder.WorkflowBuilder#
property is_workflow_per_user: bool#
get_workflow_input_schema() type[pydantic.BaseModel]#

Get workflow input schema for OpenAPI documentation.

get_workflow_single_output_schema() type[pydantic.BaseModel]#

Get workflow single output schema for OpenAPI documentation.

get_workflow_streaming_output_schema() type[pydantic.BaseModel]#

Get workflow streaming output schema for OpenAPI documentation.

classmethod create(
config: nat.data_models.config.Config,
shared_builder: nat.builder.workflow_builder.WorkflowBuilder,
entry_function: str | None = None,
max_concurrency: int = 8,
runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
) SessionManager#
Async:

Create a SessionManager. This is the preferred way to instantiate.

Handles async workflow building and starts cleanup task if per-user.

async _run_periodic_cleanup()#
async _cleanup_inactive_per_user_builders() int#
_get_user_id_from_context() str | None#

Get user ID from current context.

Extraction order:

1. From context user_id (set from JWT or nat-session cookie)
2. From context user_manager if set
3. None (for shared workflow or unauthenticated access)

async _get_or_create_per_user_builder(
user_id: str,
) tuple[nat.builder.per_user_workflow_builder.PerUserWorkflowBuilder, nat.builder.workflow.Workflow]#
async session(
user_id: str | None = None,
user_manager=None,
http_connection: starlette.requests.HTTPConnection | None = None,
user_message_id: str | None = None,
conversation_id: str | None = None,
user_input_callback: collections.abc.Callable[[nat.data_models.interactive.InteractionPrompt], collections.abc.Awaitable[nat.data_models.interactive.HumanResponse]] = None,
user_authentication_callback: collections.abc.Callable[[nat.data_models.authentication.AuthProviderBaseConfig, nat.data_models.authentication.AuthFlowType], collections.abc.Awaitable[nat.data_models.authentication.AuthenticatedContext | None]] = None,
)#
async run(
message,
runtime_type: nat.data_models.runtime_enum.RuntimeTypeEnum = RuntimeTypeEnum.RUN_OR_SERVE,
)#

Start a workflow run.

async shutdown() None#

Shutdown the SessionManager and cleanup resources.

Call this when the SessionManager is no longer needed.

async set_metadata_from_http_request(
request: starlette.requests.Request,
) tuple[contextvars.Token, contextvars.Token]#

Extracts and sets user metadata from an HTTP request.

Sets request attributes (method, path, headers), plus optional headers:

- conversation-id
- user-message-id
- traceparent
- workflow-trace-id
- workflow-run-id
- workflow-parent-id (for cross-workflow observability)
- workflow-parent-name (for cross-workflow observability)

Returns:

A tuple of (workflow_parent_id_token, workflow_parent_name_token). Each element is a contextvars.Token if the corresponding header was present, or None otherwise. Callers must reset these tokens when the request scope ends to avoid context-variable leaks.

set_metadata_from_websocket(
websocket: fastapi.WebSocket,
user_message_id: str | None,
conversation_id: str | None,
pre_parsed_cookies: dict[str, str] | None = None,
) None#

Extracts and sets user metadata for WebSocket connections. If pre_parsed_cookies is provided (e.g. from get_auth_and_cookies_from_connection), uses it instead of parsing scope headers again.