nat.front_ends.fastapi.http_interactive_runner#
HTTP interactive execution runner.
Runs a workflow in a background task with human-in-the-loop (HITL) and OAuth callbacks that
coordinate with the ExecutionStore so HTTP clients can interact
via polling and dedicated endpoints.
Attributes#
logger |
Classes#
HTTPInteractiveRunner | Coordinates running a workflow with HTTP-based HITL and OAuth.
Module Contents#
- logger#
- class HTTPInteractiveRunner(
- execution_store: nat.front_ends.fastapi.execution_store.ExecutionStore,
- session_manager: nat.runtime.session.SessionManager,
- http_flow_handler: nat.front_ends.fastapi.auth_flow_handlers.http_flow_handler.HTTPAuthenticationFlowHandler,
- )
Coordinates running a workflow with HTTP-based HITL and OAuth.
- For non-streaming (single-response) endpoints:
Call start_non_streaming(), then await record.first_outcome. If the workflow finishes first, return 200 with the result; if it needs interaction or OAuth, return 202. The client then polls GET /executions/{id} and submits responses.
- For streaming endpoints:
Call streaming_generator(), which yields SSE chunks. When the workflow needs HITL or OAuth, a special event is yielded and the generator blocks until the client responds, then continues streaming.
- _store#
- _session_manager#
- _http_flow_handler#
- _build_hitl_callback(
- record: nat.front_ends.fastapi.execution_store.ExecutionRecord,
- *,
- stream_queue: asyncio.Queue[nat.data_models.api_server.ResponseSerializable | None] | None = None,
- )
Return an async def callback(prompt: InteractionPrompt) -> HumanResponse suitable for session(..., user_input_callback=callback).
When stream_queue is provided (streaming mode), the callback also pushes a StreamInteractionEvent onto the queue so the SSE generator can emit it.
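The callback pattern described above can be sketched with plain asyncio. This is an illustration only, not the library's implementation: `FakeRecord`, `build_hitl_callback`, the `"interaction_required"` event name, and the dict-shaped event are hypothetical stand-ins for `ExecutionRecord`, `_build_hitl_callback`, and `StreamInteractionEvent`.

```python
import asyncio
from typing import Optional


class FakeRecord:
    """Hypothetical stand-in for ExecutionRecord: holds one pending prompt."""

    def __init__(self) -> None:
        self.pending_prompt = None
        self.response = None  # becomes an asyncio.Future once a prompt is pending


def build_hitl_callback(record: FakeRecord, *, stream_queue: Optional[asyncio.Queue] = None):
    """Return an async callback that parks the workflow until a response arrives."""

    async def callback(prompt: str) -> str:
        # Publish the prompt on the record so a polling client can see it.
        record.pending_prompt = prompt
        record.response = asyncio.get_running_loop().create_future()
        if stream_queue is not None:
            # Streaming mode: also hand the prompt to the SSE generator.
            await stream_queue.put({"event": "interaction_required", "prompt": prompt})
        # Block until some endpoint resolves the future with the user's answer.
        return await record.response

    return callback


async def demo():
    record = FakeRecord()
    queue: asyncio.Queue = asyncio.Queue()
    callback = build_hitl_callback(record, stream_queue=queue)

    workflow = asyncio.create_task(callback("Approve deployment?"))
    event = await queue.get()          # what an SSE generator would emit
    record.response.set_result("yes")  # what a response endpoint would do
    return event, await workflow
```

The point of the sketch is the ordering: the prompt is published before the callback awaits, so both polling and streaming clients can discover it while the workflow is suspended.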
- _build_auth_callback(
- record: nat.front_ends.fastapi.execution_store.ExecutionRecord,
- *,
- stream_queue: asyncio.Queue[nat.data_models.api_server.ResponseSerializable | None] | None = None,
- )
Return a wrapper around the HTTP flow handler’s authenticate that publishes oauth_required to the execution store (and optionally to the stream queue) before blocking on the flow state future.
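A rough sketch of the "publish, then block" wrapper idea follows. Everything here is assumed for illustration: `FakeFlowHandler`, its `authenticate(provider)` signature, the `published` list standing in for the execution store, and the token string are not taken from the real handler.

```python
import asyncio


class FakeFlowHandler:
    """Hypothetical handler whose authenticate() waits for an OAuth redirect."""

    def __init__(self) -> None:
        self.redirect_received = asyncio.Event()

    async def authenticate(self, provider: str) -> str:
        await self.redirect_received.wait()
        return f"token-for-{provider}"


def build_auth_callback(handler: FakeFlowHandler, published: list):
    """Wrap handler.authenticate so the OAuth requirement is announced first."""

    async def callback(provider: str) -> str:
        # Publish first, so clients learn that user consent is needed...
        published.append({"status": "oauth_required", "provider": provider})
        # ...then block until the handler's flow completes.
        return await handler.authenticate(provider)

    return callback


async def demo():
    handler = FakeFlowHandler()
    published: list = []
    callback = build_auth_callback(handler, published)

    task = asyncio.create_task(callback("example-idp"))
    await asyncio.sleep(0)                  # let the callback run up to its await
    seen_before_redirect = list(published)  # already visible while still blocked
    handler.redirect_received.set()         # simulated OAuth redirect
    return seen_before_redirect, await task
```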
- async start_non_streaming( ) nat.front_ends.fastapi.execution_store.ExecutionRecord#
Create an execution record, start the workflow as a background task, and return the record immediately.
The caller should await record.first_outcome.wait() to know when to return 200 (workflow done) or 202 (interaction or OAuth needed).
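The 200-versus-202 decision can be illustrated with an `asyncio.Event`. This is a sketch under stated assumptions: `FakeRecord`, `fake_workflow`, `handle_request`, and the status strings are hypothetical, and the real `ExecutionRecord` fields other than `first_outcome` are not shown in this page.

```python
import asyncio


class FakeRecord:
    """Hypothetical stand-in for ExecutionRecord."""

    def __init__(self) -> None:
        # Set when the workflow completes OR first pauses for input.
        self.first_outcome = asyncio.Event()
        self.status = "running"
        self.result = None


async def fake_workflow(record: FakeRecord, needs_input: bool) -> None:
    """Simulated background workflow that either finishes or asks for input."""
    await asyncio.sleep(0)
    if needs_input:
        record.status = "interaction_required"
    else:
        record.status = "completed"
        record.result = "done"
    record.first_outcome.set()


async def handle_request(needs_input: bool):
    record = FakeRecord()
    # Keep a reference so the background task is not garbage-collected.
    task = asyncio.create_task(fake_workflow(record, needs_input))
    await record.first_outcome.wait()
    if record.status == "completed":
        return 200, {"result": record.result}
    # The workflow is waiting on the client; report that it was accepted.
    return 202, {"status": record.status}
```

In a real endpoint the 202 body would presumably carry whatever the client needs to poll GET /executions/{id}; that payload shape is not specified here.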
- async _streaming_generator_impl(
- request: fastapi.Request,
- *,
- workflow_gen_factory: collections.abc.Callable[[Any], collections.abc.AsyncGenerator[Any]],
- error_log_message: str,
- passthrough_str_items: bool = False,
- )
Shared streaming orchestration for interactive HTTP endpoints.
- async streaming_generator(
- payload: Any,
- request: fastapi.Request,
- *,
- streaming: bool,
- step_adaptor: nat.front_ends.fastapi.step_adaptor.StepAdaptor,
- result_type: type | None = None,
- output_type: type | None = None,
- )
Async generator that yields SSE data: / event: lines.
When the workflow pauses for interaction or OAuth, this generator emits a special event and then blocks until the client responds (the HTTP connection stays open).
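As a minimal sketch of this behavior, the generator below drains a queue into SSE-formatted text and parks on a future after emitting an interaction event. The names `format_sse` and `stream`, the `"interaction_required"` event name, the item shapes, and the `None` end-of-stream sentinel are assumptions for the example, not the module's actual wire format.

```python
import asyncio
import json
from collections.abc import AsyncGenerator
from typing import Optional


def format_sse(data: dict, event: Optional[str] = None) -> str:
    """Format one SSE message; the blank line terminates the message."""
    head = f"event: {event}\n" if event else ""
    return f"{head}data: {json.dumps(data)}\n\n"


async def stream(queue: asyncio.Queue, reply: asyncio.Future) -> AsyncGenerator[str, None]:
    """Drain the queue into SSE text; None marks the end of the stream."""
    while (item := await queue.get()) is not None:
        if item.get("type") == "interaction":
            # Emit the special event, then park until the client answers.
            yield format_sse(item, event="interaction_required")
            await reply
        else:
            yield format_sse(item)


async def demo() -> list:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    reply = loop.create_future()
    for item in ({"text": "hi"}, {"type": "interaction", "prompt": "ok?"}, {"text": "bye"}, None):
        queue.put_nowait(item)
    # Simulate the client answering shortly after the event is emitted.
    loop.call_later(0.01, reply.set_result, "yes")
    return [chunk async for chunk in stream(queue, reply)]
```

Because the generator is suspended on `await reply`, nothing after the interaction event is sent until the client responds, which mirrors the "connection stays open" behavior described above.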
- async streaming_generator_raw(
- payload: Any,
- request: fastapi.Request,
- *,
- streaming: bool,
- result_type: type | None = None,
- output_type: type | None = None,
- filter_steps: str | None = None,
- )
Async generator that yields raw SSE chunks for /full-style streaming.
This uses generate_streaming_response_full_as_str so intermediate steps are emitted without step-adaptor translations.