# Data Flow
This document describes the request lifecycle from client submission through agent execution to response delivery, including async job management and real-time SSE streaming.
## Request Lifecycle
Synchronous requests flow through the NeMo Agent Toolkit FastAPI frontend directly to the agent workflow. Asynchronous requests (deep research) use a Dask cluster for distributed execution with SSE-based progress streaming.
## Async Job States
Jobs progress through the following states:
| State | Description |
|---|---|
| | Job created and queued for execution |
| `RUNNING` | Dask worker is actively executing the agent |
| | Agent completed successfully; final report available |
| `FAILURE` | Agent encountered an unhandled error |
| `INTERRUPTED` | User requested cancellation using `POST /cancel` |
A background reaper task periodically marks stale `RUNNING` jobs as `FAILURE` if they exceed the configured timeout, protecting against ghost jobs left behind by crashed workers.
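A minimal sketch of the state set and the reaper check follows, assuming each job records a `last_seen` timestamp (for example, refreshed by worker heartbeats). `PENDING` and `SUCCESS` are hypothetical names for the queued and completed states, and `Job`, `JobStatus`, `last_seen`, and `reap_stale_jobs` are illustrative, not the toolkit's API.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field
from enum import Enum


class JobStatus(str, Enum):
    # RUNNING, FAILURE and INTERRUPTED appear in this document;
    # PENDING and SUCCESS are hypothetical names for the other two states.
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    INTERRUPTED = "INTERRUPTED"


@dataclass
class Job:
    job_id: str
    status: JobStatus = JobStatus.PENDING
    # Hypothetical field: last time the worker reported progress (e.g. a heartbeat).
    last_seen: float = field(default_factory=time.time)


def reap_stale_jobs(jobs: dict[str, Job], timeout_s: float,
                    now: float | None = None) -> list[str]:
    """Mark RUNNING jobs not seen within timeout_s as FAILURE; return their IDs."""
    now = time.time() if now is None else now
    reaped = []
    for job in jobs.values():
        if job.status is JobStatus.RUNNING and now - job.last_seen > timeout_s:
            job.status = JobStatus.FAILURE
            reaped.append(job.job_id)
    return reaped
```

A real reaper would call a check like this from a periodic background task and persist the status changes to the Job Store; only `RUNNING` jobs are touched, so queued or finished jobs are never reaped.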
## SSE Event Types
Events use a `category.state` naming convention aligned with NeMo Agent Toolkit’s `IntermediateStep` structure. The `AgentEventCallback` (a LangChain callback handler) translates LangChain lifecycle events into these SSE events.
| Event Type | Category | State | Description |
|---|---|---|---|
| `stream.mode` | – | – | Announces the stream mode (for example, `live`) |
| `job.status` | | – | Job status change; includes `status` (and `reconnected: true` on the first status event after a reconnection) |
| | | | Agent workflow execution begins |
| | | | Agent workflow execution completes |
| | | | LLM invocation begins; includes model name and prompt/message count |
| | | | Streaming token chunk from the LLM |
| | | | LLM invocation completes; includes usage metadata and optional thinking/reasoning |
| `tool.start` | `tool` | `start` | Tool execution begins; includes tool name and input |
| | | | Tool execution completes; may emit … |
| `artifact.update` | `artifact` | `update` | Artifact created or updated (files, citations, todos, outputs) |
| | | | Retry notification when a chain (LLM call) fails and is retried |
| | | – | Error during job execution |
| | | – | Periodic heartbeat from the Dask worker (every 30 s); keeps the SSE connection alive and aids ghost-job detection |
| | | – | Job was cancelled by the user (emitted from the Dask worker on …) |
| | | – | Cancellation has been requested for the job |
| `job.shutdown` | | – | Server is shutting down; client should reconnect |
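The translation from callback hooks to `category.state` events can be sketched as follows. `make_step_event` and `EventTranslator` are hypothetical names; the translator appends to a list rather than subclassing a LangChain callback handler, so its `on_tool_start` hook and signature are illustrative only. The field layout follows the example under Event Structure.

```python
from __future__ import annotations

import uuid
from datetime import datetime, timezone
from typing import Any


def make_step_event(category: str, state: str, name: str,
                    inputs: Any = None, outputs: Any = None) -> dict[str, Any]:
    """Build an event dict whose type follows the category.state convention."""
    return {
        "type": f"{category}.{state}",
        "id": str(uuid.uuid4()),
        "name": name,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "data": {"input": inputs, "output": outputs},
        "metadata": {},
    }


class EventTranslator:
    """Hypothetical stand-in for AgentEventCallback.

    It records events in a list instead of subclassing a LangChain callback
    handler, so the hook name and signature below are illustrative only.
    """

    def __init__(self) -> None:
        self.events: list[dict[str, Any]] = []

    def on_tool_start(self, tool_name: str, tool_input: dict[str, Any]) -> None:
        # A tool lifecycle hook maps to category "tool", state "start".
        self.events.append(make_step_event("tool", "start", tool_name, inputs=tool_input))
```

Calling `EventTranslator().on_tool_start("tavily_web_search", {"query": "renewable energy GDP impact"})` records one event whose `type` is `tool.start`, matching the shape of the example event.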
## Event Structure
Every SSE event follows the `IntermediateStepEvent` schema:
```json
{
  "type": "tool.start",
  "id": "uuid-v4",
  "name": "tavily_web_search",
  "timestamp": "2026-02-16T10:30:00Z",
  "data": {
    "input": {"query": "renewable energy GDP impact"},
    "output": null
  },
  "metadata": {}
}
```
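On the wire, each event becomes a Server-Sent Events frame. The sketch below assumes the EventStore's integer ID is sent in the SSE `id:` field (which is what lets clients resume) and that the event type is echoed in the `event:` field; whether the service actually sets `event:` is an assumption, and `to_sse_frame` is a hypothetical helper.

```python
from __future__ import annotations

import json
from typing import Any


def to_sse_frame(event_id: int, event: dict[str, Any]) -> str:
    """Encode one event as a Server-Sent Events frame (a blank line ends it)."""
    # json.dumps escapes embedded newlines, so the payload fits on one data: line.
    payload = json.dumps(event, separators=(",", ":"))
    return f"id: {event_id}\nevent: {event['type']}\ndata: {payload}\n\n"
```

Keeping the JSON payload on a single `data:` line avoids having to split multi-line payloads across several `data:` fields, which the SSE format would otherwise require.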
## Artifact Types
The `artifact.update` event carries a `type` field indicating the artifact kind:
| Artifact Type | Description |
|---|---|
| | Virtual filesystem file created by the deep researcher |
| | Intermediate output (draft section, summary) |
| | A source URL or reference discovered during research |
| | An inline citation placed in the report |
| | A research task tracked by … |
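A client (or the artifacts endpoint) might bucket `artifact.update` events by kind. This sketch assumes the artifact kind sits at `event["data"]["type"]`; `group_artifacts` is a hypothetical helper, and it does not rely on any particular artifact type strings.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Any, Iterable


def group_artifacts(events: Iterable[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Collect artifact.update payloads keyed by artifact type, in arrival order."""
    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for event in events:
        if event.get("type") != "artifact.update":
            continue  # ignore tool, LLM, job and stream events
        # Assumption: the artifact kind lives in event["data"]["type"].
        data = event.get("data") or {}
        grouped[data.get("type", "unknown")].append(data)
    return dict(grouped)
```

Because artifacts can be "created or updated", a consumer that only wants the latest version of each artifact would additionally need a stable artifact key, which this sketch does not assume.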
## Reconnection and Replay
The SSE stream supports seamless reconnection after network interruptions. Every event stored in the `EventStore` has a monotonically increasing integer ID. Clients track the last received event ID and reconnect using the resume endpoint.
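On the client side, tracking the last event ID amounts to parsing the SSE stream and remembering the most recent `id:` field. Below is a minimal parser, assuming integer IDs as described above; `SSEReader` is a hypothetical name. The ID is committed only when a frame completes, so a connection that drops mid-frame resumes from the last fully received event.

```python
from __future__ import annotations


class SSEReader:
    """Minimal Server-Sent Events parser that remembers the last complete event ID."""

    def __init__(self, last_event_id: int | None = None) -> None:
        self.last_event_id = last_event_id
        self._pending_id: int | None = None
        self._event = "message"
        self._data: list[str] = []

    def feed(self, line: str) -> tuple[str, str] | None:
        """Consume one line (without its newline); return (event, data) when a frame ends."""
        if line == "":
            # A blank line ends the frame: only now is the ID committed.
            if self._pending_id is not None:
                self.last_event_id = self._pending_id
                self._pending_id = None
            frame = (self._event, "\n".join(self._data)) if self._data else None
            self._event, self._data = "message", []
            return frame
        if line.startswith(":"):
            return None  # comment or keep-alive line
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]
        if name == "data":
            self._data.append(value)
        elif name == "event":
            self._event = value
        elif name == "id" and value.isdigit():
            self._pending_id = int(value)  # EventStore IDs are integers
        return None
```

After a disconnect, the client passes `reader.last_event_id` to the resume endpoint.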
### Replay Behavior
- **Polling mode:** On reconnection, the stream enters polling mode and fetches all events after the provided `last_event_id` in large batches (up to 10,000 for completed jobs, 1,000 for running jobs) with no polling delay.
- **Mode transition:** Once a polling batch is smaller than the fetch limit (indicating the tail has been reached), the stream emits `stream.mode {mode: live}` and switches to live polling.
- **Reconnection flag:** The first `job.status` event after reconnection includes `reconnected: true` so clients can distinguish reconnection status updates from new status changes.
- **Completed job replay:** If the job already finished before the client reconnects, the stream replays all stored events and immediately sends the terminal `job.status` event.
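The replay rules above can be sketched as a generator over an in-memory store. `InMemoryEventStore`, `replay`, and the `terminal_status` parameter are hypothetical. The sketch also assumes that `reconnected` and `mode` live in each event's `data` object and that a finished job's terminal status is synthesized rather than read from the store. The batch limits mirror the 10,000 and 1,000 figures given above.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Iterator

COMPLETED_BATCH = 10_000  # fetch limit when the job has already finished
RUNNING_BATCH = 1_000     # fetch limit while the job is still running


@dataclass
class InMemoryEventStore:
    """Hypothetical stand-in for the EventStore: (id, event) pairs with increasing IDs."""
    events: list[tuple[int, dict[str, Any]]] = field(default_factory=list)

    def fetch_after(self, last_id: int, limit: int) -> list[tuple[int, dict[str, Any]]]:
        return [(i, e) for i, e in self.events if i > last_id][:limit]


def _with_reconnected(event: dict[str, Any]) -> dict[str, Any]:
    # Copy rather than mutate, so stored events stay unchanged.
    return {**event, "data": {**(event.get("data") or {}), "reconnected": True}}


def replay(store: InMemoryEventStore, last_event_id: int,
           terminal_status: str | None = None) -> Iterator[tuple[int | None, dict[str, Any]]]:
    """Yield (event_id, event) after last_event_id; terminal_status is None while running."""
    limit = RUNNING_BATCH if terminal_status is None else COMPLETED_BATCH
    cursor, flagged = last_event_id, False
    while True:
        batch = store.fetch_after(cursor, limit)  # no delay between batches
        for event_id, event in batch:
            if event.get("type") == "job.status" and not flagged:
                event, flagged = _with_reconnected(event), True
            yield event_id, event
            cursor = event_id
        if len(batch) < limit:
            break  # a short batch means the tail has been reached
    if terminal_status is not None:
        # Finished job: send the terminal status immediately instead of going live.
        status = {"type": "job.status", "data": {"status": terminal_status}}
        yield None, (status if flagged else _with_reconnected(status))
    else:
        yield None, {"type": "stream.mode", "data": {"mode": "live"}}
```

Advancing `cursor` to the last yielded ID means each batch picks up exactly where the previous one ended, and a batch that comes back full simply triggers another immediate fetch.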
## API Endpoints
| Method | Endpoint | Description |
|---|---|---|
| | | List available agent types |
| | | Submit a new async job |
| | | Get job status |
| | | SSE stream from the beginning |
| | | SSE stream from an event ID (reconnection) |
| `POST` | | Cancel a running job |
| | | Get artifacts from the event store |
| | | Get the final report |
## Cancellation
When a client sends `POST /cancel`:

1. The Job Store sets the job status to `INTERRUPTED`.
2. A `CancellationMonitor` running on the Dask worker polls the Job Store at regular intervals (default 1 second).
3. When the monitor detects `INTERRUPTED`, it sets an `asyncio.Event` that the agent workflow can check for cooperative cancellation.
4. The SSE stream sends a final `job.status {status: INTERRUPTED}` event.
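The cooperative-cancellation handshake can be sketched with plain `asyncio`. `CancellationMonitor` is named in this document, but its constructor shown here, the `get_status` callback standing in for the Job Store lookup, and the toy `agent_workflow` are hypothetical. The default 1-second interval matches the text.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable


class CancellationMonitor:
    """Polls a status lookup and sets an asyncio.Event once the job is INTERRUPTED."""

    def __init__(self, job_id: str, get_status: Callable[[str], Awaitable[str]],
                 interval_s: float = 1.0) -> None:
        self.job_id = job_id
        self.get_status = get_status  # hypothetical stand-in for the Job Store lookup
        self.interval_s = interval_s
        self.cancelled = asyncio.Event()

    async def run(self) -> None:
        while not self.cancelled.is_set():
            if await self.get_status(self.job_id) == "INTERRUPTED":
                self.cancelled.set()
                return
            await asyncio.sleep(self.interval_s)


async def agent_workflow(cancelled: asyncio.Event, steps: int) -> int:
    """Toy workflow that checks the event between steps; returns steps completed."""
    done = 0
    for _ in range(steps):
        if cancelled.is_set():
            break  # cooperative cancellation point
        await asyncio.sleep(0.01)  # stand-in for one LLM or tool call
        done += 1
    return done
```

Because cancellation is cooperative, the workflow stops only at its next check, so latency is roughly one poll interval plus the duration of the step in progress.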
## Graceful Shutdown
The `SSEConnectionManager` tracks all active SSE connections. During server shutdown:

1. `signal_shutdown()` sets a shared event flag.
2. Active SSE generators check the flag during polling intervals.
3. Each stream emits a `job.shutdown` event before closing.
4. The connection manager waits for all streams to terminate.
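The shutdown sequence can be sketched as follows. `SSEConnectionManager` and `signal_shutdown()` are named in this document; everything else (the `stream` generator, the `poll` callback standing in for event-store polling, `wait_closed`, the 0.5-second interval, and the 5-second timeout) is an assumption.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator, Awaitable, Callable


class SSEConnectionManager:
    """Tracks active SSE streams and coordinates a graceful shutdown."""

    def __init__(self) -> None:
        self.shutdown = asyncio.Event()  # the shared flag set by signal_shutdown()
        self.active = 0
        self._idle = asyncio.Event()
        self._idle.set()  # no streams yet

    def signal_shutdown(self) -> None:
        self.shutdown.set()

    async def wait_closed(self, timeout_s: float = 5.0) -> None:
        """Wait until every registered stream has finished."""
        await asyncio.wait_for(self._idle.wait(), timeout_s)

    async def stream(self, poll: Callable[[], Awaitable[list[dict]]],
                     interval_s: float = 0.5) -> AsyncIterator[dict]:
        self.active += 1
        self._idle.clear()
        try:
            while not self.shutdown.is_set():
                for event in await poll():
                    yield event
                try:
                    # Sleep one polling interval, but wake early if shutdown is signaled.
                    await asyncio.wait_for(self.shutdown.wait(), interval_s)
                except asyncio.TimeoutError:
                    pass
            yield {"type": "job.shutdown"}  # tell the client to reconnect
        finally:
            self.active -= 1
            if self.active == 0:
                self._idle.set()
```

Waiting on the shutdown event with a timeout, rather than a bare `asyncio.sleep`, lets each stream notice the flag immediately instead of at the end of its current interval.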