REST API#
The AI-Q blueprint exposes a REST API built on top of NeMo Agent Toolkit’s built-in FastAPI infrastructure. The AI-Q API is an extension layer that adds agent-agnostic async job management with SSE streaming, knowledge management endpoints, and event replay capabilities.
The API is served when running in web mode (nat serve). CLI mode (nat run) uses WebSocket communication instead and does not expose these endpoints.
Architecture#
NeMo Agent Toolkit provides the core infrastructure: job tracking, Dask scheduling, and SQLite/PostgreSQL persistence. The AI-Q API plugin (aiq_api) extends this with:
- Async Jobs API – submit research queries to any registered agent, track progress through SSE
- Knowledge API – manage document collections and trigger ingestion (when a knowledge function is configured)
- Event replay – reconnect to an in-progress job and replay historical events from any point
Async Jobs API#
Base path: /v1/jobs/async
Endpoints#
| Method | Path | Description |
|---|---|---|
| `GET` | `/agents` | List registered agent types |
| `POST` | `/submit` | Submit a new research job |
| `GET` | `/job/{job_id}` | Get job status |
| `GET` | `/job/{job_id}/stream` | SSE event stream from beginning |
| `GET` | `/job/{job_id}/stream/{last_event_id}` | SSE stream from event ID (reconnection) |
| `POST` | `/job/{job_id}/cancel` | Cancel a running job |
| `GET` | `/job/{job_id}/state` | Get accumulated job artifacts |
| `GET` | `/job/{job_id}/report` | Get final research report |
| `GET` | `/v1/data_sources` | List available data sources (not under the base path) |
| `GET` | `/health` | Health check, includes Dask status (not under the base path) |
List Available Agents#
Returns all registered agent types that can be used with the submit endpoint.
curl http://localhost:8000/v1/jobs/async/agents
Response:
{
"agents": [
{"agent_type": "deep_researcher", "description": "Performs comprehensive multi-loop deep research"},
{"agent_type": "shallow_researcher", "description": "Performs quick single-turn research"}
]
}
Submit a Job#
Submit a research query to a registered agent. Returns a job ID for tracking progress through SSE.
curl -X POST http://localhost:8000/v1/jobs/async/submit \
-H "Content-Type: application/json" \
-d '{
"agent_type": "deep_researcher",
"input": "Research quantum computing trends in 2026"
}'
Request body (JobSubmitRequest):
| Field | Type | Required | Description |
|---|---|---|---|
| `agent_type` | string | Yes | Agent identifier (for example, `deep_researcher`) |
| `input` | string | Yes | Research query (min 1 character) |
| `job_id` | string | No | Custom job ID. Auto-generated UUID if omitted |
| `expiry_seconds` | integer | No | Job expiry in seconds. Range: 600–604800 (10 min to 7 days). Default from config |
Response (JobStatusResponse):
{
"job_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"status": "SUBMITTED",
"agent_type": "deep_researcher"
}
Error responses:
| Status | Reason |
|---|---|
| `400` | Unknown agent type or invalid request |
| `503` | Dask scheduler not available |
Get Job Status#
curl http://localhost:8000/v1/jobs/async/job/{job_id}
Response:
{
"job_id": "abc123",
"status": "RUNNING",
"agent_type": "deep_researcher",
"error": null,
"created_at": "2026-02-12T10:30:00Z"
}
Job statuses: SUBMITTED, RUNNING, SUCCESS, FAILURE, INTERRUPTED.
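The status values above tell a client when to stop polling: only `SUCCESS`, `FAILURE`, and `INTERRUPTED` are terminal. A hedged sketch of a polling loop (the `wait_for_job` helper is illustrative; `fetch_status` would wrap a GET to the status endpoint):

```python
import time
from typing import Callable

# Once a job reaches one of these, its state will not change again.
TERMINAL_STATUSES = {"SUCCESS", "FAILURE", "INTERRUPTED"}

def wait_for_job(fetch_status: Callable[[], dict], interval_s: float = 2.0,
                 sleep=time.sleep) -> dict:
    """Poll a status-fetching callable until the job reaches a terminal state."""
    while True:
        status = fetch_status()
        if status["status"] in TERMINAL_STATUSES:
            return status
        sleep(interval_s)

# Simulated responses standing in for successive HTTP calls:
responses = iter([{"status": "SUBMITTED"}, {"status": "RUNNING"}, {"status": "SUCCESS"}])
final = wait_for_job(lambda: next(responses), sleep=lambda s: None)
```

In practice the SSE stream (next section) is preferable to polling; this loop is a fallback for simple scripts.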
Stream Events (SSE)#
Stream real-time events from a running or completed job using Server-Sent Events.
# Stream from beginning
curl -N http://localhost:8000/v1/jobs/async/job/{job_id}/stream
# Reconnect from a specific event ID
curl -N http://localhost:8000/v1/jobs/async/job/{job_id}/stream/{last_event_id}
Each SSE message has the format:
id: 42
event: llm.chunk
data: {"content": "The latest advances..."}
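A client splits the stream on blank lines and reads the `id:`, `event:`, and `data:` fields of each message. A minimal parser for messages in the format shown (a sketch that ignores multi-line `data:` fields; a real client should use an SSE client library):

```python
import json

def parse_sse_message(raw: str) -> dict:
    """Parse one SSE message block (lines like 'id: 42', 'event: ...', 'data: ...')."""
    fields = {}
    for line in raw.strip().splitlines():
        key, _, value = line.partition(":")
        fields[key.strip()] = value.strip()
    if "data" in fields:
        fields["data"] = json.loads(fields["data"])  # AI-Q payloads are JSON
    return fields

msg = parse_sse_message(
    'id: 42\nevent: llm.chunk\ndata: {"content": "The latest advances..."}'
)
```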
Replay and Live Handoff#
When a client connects (or reconnects) to a job stream, the server replays all historical events as fast as possible, then sends a stream.mode event to signal the transition to live streaming. The exact payload depends on the database backend:
- SQLite (polling): first sends `{"mode":"polling","interval_ms":500}`, then `{"mode":"live"}` after replay completes.
- PostgreSQL (pub-sub): sends `{"mode":"pubsub","channel":"job_events_<job_id>"}` after replay completes.
After the transition event, new events are delivered in real time. For PostgreSQL backends, the server uses LISTEN/NOTIFY for sub-10ms latency. For SQLite, it polls at 500ms intervals.
Cancel a Job#
curl -X POST http://localhost:8000/v1/jobs/async/job/{job_id}/cancel
Response:
{
"job_id": "abc123",
"status": "INTERRUPTED",
"task_cancelled": true
}
Error responses:

| Status | Reason |
|---|---|
| `409` | Job is not in `RUNNING` state |
| `404` | Job not found |
Get Job Artifacts#
Returns accumulated tool calls, outputs, and source citations from a job.
curl http://localhost:8000/v1/jobs/async/job/{job_id}/state
Response (JobStateResponse):
{
"job_id": "abc123",
"has_state": true,
"state": null,
"artifacts": {
"tools": [
{
"id": "tool_123",
"name": "tavily_web_search",
"input": {"query": "quantum computing 2026"},
"output": "...",
"status": "completed",
"workflow": "shallow_research_agent"
}
],
"outputs": [
{
"type": "citation_source",
"content": "https://example.com/article",
"workflow": "shallow_research_agent"
}
],
"sources": {
"found": 12,
"cited": 8,
"found_urls": ["https://..."],
"cited_urls": ["https://..."]
}
}
}
Get Final Report#
curl http://localhost:8000/v1/jobs/async/job/{job_id}/report
Response (JobReportResponse):
{
"job_id": "abc123",
"has_report": true,
"report": "# Quantum Computing Trends in 2026\n\n..."
}
SSE Event Types#
Events streamed during job execution. Refer to the Data Flow page for details on how these events are generated and consumed.
| Event | Description |
|---|---|
| `stream.mode` | Stream state transition. In polling mode (SQLite), the server first sends `{"mode":"polling"}`, then `{"mode":"live"}` after replay completes |
| | Job status changes (`SUBMITTED`, `RUNNING`, `SUCCESS`, `FAILURE`, `INTERRUPTED`) |
| | Error occurred during execution |
| | Server is shutting down gracefully |
| | Periodic heartbeat from the Dask worker (every 30 s); keeps the SSE connection alive |
| | Job was cancelled by the user |
| | Retry notification when a chain (LLM call) fails and is retried |
| | Cancellation was requested by the user |
| | Workflow lifecycle boundaries |
| `llm.chunk` | LLM inference progress |
| | Tool invocation lifecycle; includes tool name, input, and output |
| | Structured updates: todos, files, citations |
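Clients typically dispatch on the event name. Only `stream.mode` and `llm.chunk` appear by name in the examples on this page, so the sketch below handles those two and ignores everything else (an illustrative pattern, not part of the API):

```python
from collections import defaultdict

def make_dispatcher():
    """Return a dispatch(event, data) callable plus the accumulated LLM transcript."""
    handlers = defaultdict(lambda: lambda data: None)  # unknown events are no-ops
    transcript = []
    handlers["llm.chunk"] = lambda data: transcript.append(data["content"])
    handlers["stream.mode"] = lambda data: print("stream mode ->", data["mode"])

    def dispatch(event: str, data: dict) -> None:
        handlers[event](data)

    return dispatch, transcript

dispatch, transcript = make_dispatcher()
dispatch("stream.mode", {"mode": "live"})
dispatch("llm.chunk", {"content": "Hello "})
dispatch("llm.chunk", {"content": "world"})
```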
Agent Registration#
Agents are registered by type so the async job runner can load them dynamically. Registration happens at import time (typically in a NeMo Agent Toolkit plugin module):
from aiq_api.registry import register_agent
register_agent(
agent_type="my_agent",
class_path="my_package.agent.MyAgent",
config_name="my_agent_config",
description="My custom research agent",
)
Parameters:
| Parameter | Description |
|---|---|
| `agent_type` | Short identifier used in submit requests (for example, `deep_researcher`) |
| `class_path` | Full module path to the agent class |
| `config_name` | Must match a function name in the NeMo Agent Toolkit YAML config (for example, `my_agent_config`) |
| `description` | Human-readable description shown in the agent list |
The default agents (deep_researcher and shallow_researcher) are registered automatically when the aiq_api plugin loads.
Knowledge API#
The Knowledge API endpoints are conditionally registered – they appear only when a knowledge_retrieval function is configured in the workflow. The backend (LlamaIndex, Foundational RAG, etc.) is determined by the knowledge config.
Collection Endpoints#
| Method | Path | Description |
|---|---|---|
| `POST` | `/v1/collections` | Create a new collection |
| `GET` | `/v1/collections` | List all collections |
| `GET` | `/v1/collections/{collection_name}` | Get collection details |
| `DELETE` | `/v1/collections/{collection_name}` | Delete a collection and all its contents |
| `GET` | | Check knowledge backend health |
Create a Collection#
curl -X POST http://localhost:8000/v1/collections \
-H "Content-Type: application/json" \
-d '{
"name": "research-papers",
"description": "Collection of ML research papers",
"metadata": {}
}'
List Collections#
curl http://localhost:8000/v1/collections
Document Endpoints#
| Method | Path | Description |
|---|---|---|
| `POST` | `/v1/collections/{collection_name}/documents` | Upload and ingest documents (returns job ID) |
| `GET` | `/v1/collections/{collection_name}/documents` | List documents in a collection |
| `DELETE` | `/v1/collections/{collection_name}/documents` | Delete documents by file ID |
| `GET` | `/v1/documents/{job_id}/status` | Get ingestion job status |
Upload Documents#
Document upload is asynchronous. The endpoint returns a job ID that you poll for ingestion status.
curl -X POST http://localhost:8000/v1/collections/research-papers/documents \
-F "files=@paper1.pdf" \
-F "files=@paper2.pdf"
Response (202 Accepted):
{
"job_id": "job_abc123",
"file_ids": ["file_abc123", "file_def456"],
"message": "Ingestion job submitted for 2 file(s)"
}
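The returned `job_id` is what you poll at `/v1/documents/{job_id}/status`. A small helper that pairs the response with the uploaded filenames, assuming the server returns file IDs in upload order (illustrative, not part of the API):

```python
def ingestion_tracking(upload_response: dict, filenames: list[str]) -> dict:
    """Map each uploaded filename to its file ID and build the status-poll URL."""
    file_ids = upload_response["file_ids"]
    if len(file_ids) != len(filenames):
        raise ValueError("expected one file ID per uploaded file")
    return {
        "status_url": f"/v1/documents/{upload_response['job_id']}/status",
        "files": dict(zip(filenames, file_ids)),
    }

info = ingestion_tracking(
    {"job_id": "job_abc123", "file_ids": ["file_abc123", "file_def456"]},
    ["paper1.pdf", "paper2.pdf"],
)
```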
Delete Documents#
curl -X DELETE http://localhost:8000/v1/collections/research-papers/documents \
-H "Content-Type: application/json" \
-d '{"file_ids": ["file_abc123", "file_def456"]}'
Request body (DeleteFilesRequest):
| Field | Type | Required | Description |
|---|---|---|---|
| `file_ids` | array of strings | Yes | List of file IDs to delete |
Check Ingestion Status#
curl http://localhost:8000/v1/documents/{job_id}/status
List Data Sources#
Returns available data sources based on the configured tools.
curl http://localhost:8000/v1/data_sources
Response:
[
{
"id": "web_search",
"name": "Web Search",
"description": "Search the web for real-time information."
},
{
"id": "knowledge_layer",
"name": "Knowledge Base",
"description": "Search uploaded documents and files."
}
]
The knowledge_layer entry only appears when a knowledge retrieval function is configured.
Health Check#
curl http://localhost:8000/health
Response:
{
"status": "ok",
"dask_available": true
}
Configuration#
The API is configured through the NeMo Agent Toolkit config file under general.front_end:
general:
front_end:
_type: aiq_api
runner_class: aiq_api.plugin.AIQAPIWorker
db_url: ${NAT_JOB_STORE_DB_URL:-sqlite+aiosqlite:///./jobs.db}
expiry_seconds: 86400 # 24 hours
cors:
allow_origin_regex: 'http://localhost(:\d+)?'
allow_methods: [GET, POST, DELETE, OPTIONS]
allow_headers: ["*"]
allow_credentials: true
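The `allow_origin_regex` above admits plain `http://localhost` plus any port. A quick check of the pattern (using `re.fullmatch` for anchoring, which avoids accepting prefixed look-alike origins; Starlette's exact matching semantics may differ slightly):

```python
import re

# Same pattern as the cors.allow_origin_regex config value above.
pattern = re.compile(r"http://localhost(:\d+)?")

def origin_allowed(origin: str) -> bool:
    """True when the origin matches the whole pattern."""
    return pattern.fullmatch(origin) is not None
```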
Mode Comparison#
| Mode | Command | Async Jobs | Database | API Available |
|---|---|---|---|---|
| CLI | `nat run` | No | None | No |
| Web (local) | `nat serve` | Yes | SQLite (`jobs.db`) | Yes |
| Production | `nat serve` (with PostgreSQL and an external Dask scheduler configured) | Yes | PostgreSQL | Yes |
Database Configuration#
| Variable | Purpose | Default |
|---|---|---|
| `NAT_JOB_STORE_DB_URL` | Job store + event store database | `sqlite+aiosqlite:///./jobs.db` |
| `NAT_DASK_SCHEDULER_ADDRESS` | Dask scheduler for distributed execution | Auto-created local cluster |
For production deployments, use PostgreSQL: it backs both the job store and the LISTEN/NOTIFY-based real-time SSE delivery:
export NAT_JOB_STORE_DB_URL="postgresql+asyncpg://user:pass@host:5432/aiq_jobs" # pragma: allowlist secret
export NAT_DASK_SCHEDULER_ADDRESS="tcp://scheduler:8786"
Debug Console#
When the aiq_debug package is installed, a debug console is available at http://localhost:8000/debug with:
- Real-time SSE streaming visualization
- Job submission and tracking
- State visualization (todos, subagents, sources, tool calls)
- Copy SSE streams for debugging