nat.front_ends.fastapi.http_interactive_runner#

HTTP interactive execution runner.

Runs a workflow in a background task with HITL and OAuth callbacks that coordinate with the ExecutionStore so HTTP clients can interact via polling and dedicated endpoints.

Attributes#

Classes#

HTTPInteractiveRunner

Coordinates running a workflow with HTTP-based HITL and OAuth.

Module Contents#

logger#
_HITL_TIMEOUT_GRACE_PERIOD_SECONDS: int = 5#
class HTTPInteractiveRunner(
execution_store: nat.front_ends.fastapi.execution_store.ExecutionStore,
session_manager: nat.runtime.session.SessionManager,
http_flow_handler: nat.front_ends.fastapi.auth_flow_handlers.http_flow_handler.HTTPAuthenticationFlowHandler,
)#

Coordinates running a workflow with HTTP-based HITL and OAuth.

For non-streaming (single-response) endpoints:
  1. Call start_non_streaming().

  2. Await record.first_outcome – if the workflow finishes first, return 200 with the result; if it needs interaction / OAuth, return 202.

  3. Client polls GET /executions/{id} and submits responses.

For streaming endpoints:
  1. Call streaming_generator() which yields SSE chunks.

  2. When the workflow needs HITL / OAuth, a special event is yielded, and the generator blocks until the client responds, then continues streaming.

_store#
_session_manager#
_http_flow_handler#
_build_hitl_callback(
record: nat.front_ends.fastapi.execution_store.ExecutionRecord,
*,
stream_queue: asyncio.Queue[nat.data_models.api_server.ResponseSerializable | None] | None = None,
)#

Return an async def callback(prompt: InteractionPrompt) -> HumanResponse suitable for session(..., user_input_callback=callback).

When stream_queue is provided (streaming mode), the callback also pushes a StreamInteractionEvent onto the queue so the SSE generator can emit it.

_build_auth_callback(
record: nat.front_ends.fastapi.execution_store.ExecutionRecord,
*,
stream_queue: asyncio.Queue[nat.data_models.api_server.ResponseSerializable | None] | None = None,
)#

Return a wrapper around the HTTP flow handler’s authenticate that publishes oauth_required to the execution store (and optionally to the stream queue) before blocking on the flow state future.

async start_non_streaming(
payload: Any,
request: fastapi.Request,
result_type: type | None = None,
) nat.front_ends.fastapi.execution_store.ExecutionRecord#

Create an execution record, start the workflow as a background task, and return the record immediately.

The caller should await record.first_outcome.wait() to know when to return 200 (workflow done) or 202 (interaction / OAuth needed).

async _streaming_generator_impl(
request: fastapi.Request,
*,
workflow_gen_factory: collections.abc.Callable[[Any], collections.abc.AsyncGenerator[Any]],
error_log_message: str,
passthrough_str_items: bool = False,
) collections.abc.AsyncGenerator[str]#

Shared streaming orchestration for interactive HTTP endpoints.

async streaming_generator(
payload: Any,
request: fastapi.Request,
*,
streaming: bool,
step_adaptor: nat.front_ends.fastapi.step_adaptor.StepAdaptor,
result_type: type | None = None,
output_type: type | None = None,
) collections.abc.AsyncGenerator[str]#

Async generator that yields SSE data: / event: lines.

When the workflow pauses for interaction or OAuth, this generator emits a special event and then blocks until the client responds (the HTTP connection stays open).

async streaming_generator_raw(
payload: Any,
request: fastapi.Request,
*,
streaming: bool,
result_type: type | None = None,
output_type: type | None = None,
filter_steps: str | None = None,
) collections.abc.AsyncGenerator[str]#

Async generator that yields raw SSE chunks for /full style streaming.

This uses generate_streaming_response_full_as_str so intermediate steps are emitted without step-adaptor translations.