nat.front_ends.fastapi.execution_store#

In-memory execution store for HTTP HITL and OAuth interactive workflows.

Each execution tracks a single background workflow run that may be paused while waiting for a human interaction response or for OAuth consent.

Attributes#

Classes#

PendingInteraction

State for a single outstanding human interaction within an execution.

PendingOAuth

State for an outstanding OAuth flow within an execution.

ExecutionRecord

Full state for a single execution.

ExecutionStore

Thread-safe (asyncio-safe) in-memory store for HTTP interactive executions.

Module Contents#

logger#
DEFAULT_EXECUTION_TTL: int = 600#
class PendingInteraction#

State for a single outstanding human interaction within an execution.

interaction_id: str#
prompt: nat.data_models.interactive.HumanPrompt#
future: asyncio.Future[nat.data_models.interactive.HumanResponse]#
created_at: float#
class PendingOAuth#

State for an outstanding OAuth flow within an execution.

auth_url: str#
oauth_state: str#
created_at: float#
class ExecutionRecord#

Full state for a single execution.

execution_id: str#
status: nat.data_models.interactive_http.ExecutionStatus#
task: asyncio.Task | None = None#
result: Any = None#
error: str | None = None#
pending_interaction: PendingInteraction | None = None#
pending_oauth: PendingOAuth | None = None#
first_outcome: asyncio.Event#
created_at: float#
completed_at: float | None = None#
class ExecutionStore(ttl_seconds: int = DEFAULT_EXECUTION_TTL)#

Thread-safe (asyncio-safe) in-memory store for HTTP interactive executions.

_executions: dict[str, ExecutionRecord]#
_lock#
_ttl_seconds = 600#
async create_execution() -> ExecutionRecord#

Create a new execution and return its record.

async get(execution_id: str) -> ExecutionRecord | None#
async set_interaction_required(
execution_id: str,
prompt: nat.data_models.interactive.HumanPrompt,
interaction_id: str | None = None,
) -> PendingInteraction#

Mark the execution as waiting for human interaction.

Returns the PendingInteraction whose .future should be awaited by the background task.

async set_oauth_required(
execution_id: str,
auth_url: str,
oauth_state: str,
) -> None#

Mark the execution as waiting for OAuth consent.

async set_running(execution_id: str) -> None#

Transition back to running (after interaction / OAuth completes).

async set_completed(execution_id: str, result: Any) -> None#

Mark the execution as successfully completed.

async set_failed(execution_id: str, error: str) -> None#

Mark the execution as failed.

async resolve_interaction(
execution_id: str,
interaction_id: str,
response: nat.data_models.interactive.HumanResponse,
) -> None#

Resolve a pending interaction by setting the future result.

Raises KeyError if the execution or interaction does not exist. Raises ValueError if the interaction has already been resolved.

async cleanup_expired() -> int#

Remove completed or failed executions older than the TTL. Returns the number of executions removed.

async remove(execution_id: str) -> None#

Explicitly remove an execution.