Real-Time VLM Microservice#

Overview#

The Real-Time VLM Server is a FastAPI-based REST API service that provides real-time video understanding capabilities using Vision Language Models (VLM). It enables users to generate captions, process live video streams, and manage live streams through a comprehensive set of REST endpoints.

The server translates between HTTP requests/responses and the underlying Real-Time VLM Microservice components, providing a clean API interface for live stream processing operations.

Key Features#

  • VLM Caption Generation: Generate captions and alerts using Vision Language Models for live streams

  • Live Stream Support: Process RTSP live streams for real-time caption generation and alert detection

  • Streaming Responses: Output streamed via Server-Sent Events (SSE) or published as Kafka messages

  • Asset Management: Comprehensive stream lifecycle management

  • Sampling Rate Management: Control how image frames are sampled from the live stream for VLM inference

  • Health Monitoring: Health check endpoints for service monitoring

  • Metrics: Prometheus metrics endpoint for observability

Architecture#

The following diagram illustrates the DeepStream Accelerated Pipeline architecture for the RTVI VLM Server:

DeepStream Accelerated Pipeline Architecture

The pipeline processes camera streams through the following stages:

  • RTSP + Decode: Receives and decodes camera streams

  • Video Segment Creation: Creates video segments for processing

  • Infer: Performs inference using VLM models (with vLLM inference engine)

  • MsgBroker (NvSchema): Publishes messages to Kafka

The system can be configured and controlled via:

  • REST API: HTTP-based API for configuration and control

Models Supported#

The RTVI VLM Microservice is verified with the following VLM models:

  • cosmos-reason1: Cosmos Reason1 VLM model

  • cosmos-reason2: Cosmos Reason2 VLM model

  • Qwen3-VL-30B-A3B-Instruct (Hugging Face): Qwen3-VL VLM model

Caption generation accepts both a video and a text prompt as input.

Remote VLM model endpoints are tested with NVIDIA NIM or OpenAI compliant chat/completions endpoints.

API Reference#

For complete API documentation including all endpoints, request/response schemas, and interactive examples, see the Real-Time VLM API Reference.

The API is organized into the following categories:

  • Captions: Generate VLM captions and alerts for videos and live streams

  • Files: Upload and manage video/image files

  • Live Stream: Add, list, and manage RTSP live streams

  • Models: List available VLM models

  • Health Check: Service health and readiness probes

  • Metrics: Prometheus metrics endpoint

  • Metadata: Service metadata and version information

  • NIM Compatible: OpenAI-compatible endpoints for interoperability

All endpoints are prefixed with /v1. The API is available at http://<host>:8000.
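
On first start the service can take several minutes to download and load the model; the sample compose file's healthcheck polls `/v1/ready` with a start period of up to 300 seconds. A minimal client-side polling helper in the same spirit (a sketch assuming a local deployment on port 8000):

```python
import time
import requests

def wait_until_ready(base_url: str = "http://localhost:8000/v1",
                     timeout_s: float = 300.0, interval_s: float = 5.0) -> bool:
    """Poll the readiness endpoint until the service reports ready or the
    timeout expires. Returns True once GET /v1/ready answers 200."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        try:
            if requests.get(f"{base_url}/ready", timeout=5).status_code == 200:
                return True
        except requests.RequestException:
            pass  # service still starting up
        time.sleep(interval_s)
    return False
```

Calling this before the first inference request avoids spurious connection errors while the model is still loading.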

Deployment#

Prerequisites#

Validated GPUs:

The RTVI VLM Microservice has been validated and tested on the following NVIDIA GPUs:

  • NVIDIA H100

  • NVIDIA RTX PRO 6000 Blackwell

  • NVIDIA L40S

  • NVIDIA DGX SPARK

  • NVIDIA IGX Thor

  • NVIDIA AGX Thor

Software:

  • OS: Ubuntu 24.04 or compatible Linux distribution (x86); DGX OS 7.4.0 (DGX Spark); Jetson Linux BSP Rel 38.4/38.5 (Jetson Thor)

  • Docker: Version 28.2+

  • Docker Compose: Version 2.36+

  • NVIDIA Driver: 580+

  • NVIDIA Container Toolkit: Latest version

  • Git LFS: For large file handling

Quick Start#

Prepare a folder where all the following files and scripts will be created.

  1. Create the compose.yaml file.

Sample compose.yaml file#

version: '3.8'
services:
  rtvi-vlm:
    image: ${RTVI_IMAGE:-nvcr.io/nvidia/vss-core/vss-rt-vlm:3.1.0}
    shm_size: '16gb'
    runtime: nvidia
    user: "1001:1001"
    ports:
      - "${BACKEND_PORT?}:8000"
    volumes:
      - "${ASSET_STORAGE_DIR:-/dummy}${ASSET_STORAGE_DIR:+:/tmp/assets}"
      - "${MODEL_ROOT_DIR:-/dummy}${MODEL_ROOT_DIR:+:${MODEL_ROOT_DIR:-}}"
      - "${NGC_MODEL_CACHE:-rtvi-ngc-model-cache}:/opt/nvidia/rtvi/.rtvi/ngc_model_cache"
      - "${RTVI_LOG_DIR:-/dummy}${RTVI_LOG_DIR:+:/opt/nvidia/rtvi/log/rtvi/}"
      - "${RTVI_HF_CACHE:-rtvi-hf-cache}:/tmp/huggingface"
    environment:
      AZURE_OPENAI_API_KEY: "${AZURE_OPENAI_API_KEY:-}"
      AZURE_OPENAI_ENDPOINT: "${AZURE_OPENAI_ENDPOINT:-}"
      MODEL_PATH: "${MODEL_PATH:-}"
      MODEL_IMPLEMENTATION_PATH: "${MODEL_IMPLEMENTATION_PATH:-}"
      NGC_API_KEY: "${NGC_API_KEY:-}"
      HF_TOKEN: "${HF_TOKEN:-}"
      NV_LLMG_CLIENT_ID: "${NV_LLMG_CLIENT_ID:-}"
      NV_LLMG_CLIENT_SECRET: "${NV_LLMG_CLIENT_SECRET:-}"
      NVIDIA_API_KEY: "${NVIDIA_API_KEY:-NOAPIKEYSET}"
      NVIDIA_VISIBLE_DEVICES: "${NVIDIA_VISIBLE_DEVICES:-all}"
      OPENAI_API_KEY: "${OPENAI_API_KEY:-NOAPIKEYSET}"
      OPENAI_API_VERSION: "${OPENAI_API_VERSION:-}"
      VIA_VLM_OPENAI_MODEL_DEPLOYMENT_NAME: "${VIA_VLM_OPENAI_MODEL_DEPLOYMENT_NAME:-}"
      VIA_VLM_ENDPOINT: "${VIA_VLM_ENDPOINT:-}"
      VIA_VLM_API_KEY: "${VIA_VLM_API_KEY:-}"
      VLM_BATCH_SIZE: "${VLM_BATCH_SIZE:-}"
      VLM_MODEL_TO_USE: "${VLM_MODEL_TO_USE:-openai-compat}"
      NUM_VLM_PROCS: "${NUM_VLM_PROCS:-}"
      NUM_GPUS: "${NUM_GPUS:-}"
      VLM_INPUT_WIDTH: "${VLM_INPUT_WIDTH:-}"
      VLM_INPUT_HEIGHT: "${VLM_INPUT_HEIGHT:-}"
      ENABLE_DENSE_CAPTION: "${ENABLE_DENSE_CAPTION:-}"
      ENABLE_AUDIO: "${ENABLE_AUDIO:-false}"
      INSTALL_PROPRIETARY_CODECS: "${INSTALL_PROPRIETARY_CODECS:-false}"
      FORCE_SW_AV1_DECODER: "${FORCE_SW_AV1_DECODER:-}"
      RIVA_ASR_SERVER_URI: "${RIVA_ASR_SERVER_URI:-parakeet-ctc-asr}"
      RIVA_ASR_SERVER_IS_NIM: "${RIVA_ASR_SERVER_IS_NIM:-true}"
      RIVA_ASR_SERVER_USE_SSL: "${RIVA_ASR_SERVER_USE_SSL:-false}"
      RIVA_ASR_SERVER_API_KEY: "${RIVA_ASR_SERVER_API_KEY:-}"
      RIVA_ASR_SERVER_FUNC_ID: "${RIVA_ASR_SERVER_FUNC_ID:-}"
      RIVA_ASR_GRPC_PORT: "${RIVA_ASR_GRPC_PORT:-50051}"
      RIVA_ASR_HTTP_PORT: "${RIVA_ASR_HTTP_PORT:-}"
      ENABLE_RIVA_SERVER_READINESS_CHECK: "${ENABLE_RIVA_SERVER_READINESS_CHECK:-}"
      RIVA_ASR_MODEL_NAME: "${RIVA_ASR_MODEL_NAME:-}"
      LOG_LEVEL: "${LOG_LEVEL:-}"
      RTVI_EXTRA_ARGS: "${RTVI_EXTRA_ARGS:-}"
      RTVI_RTSP_LATENCY: "${RTVI_RTSP_LATENCY:-}"
      RTVI_RTSP_TIMEOUT: "${RTVI_RTSP_TIMEOUT:-}"
      RTVI_RTSP_RECONNECTION_INTERVAL: "${RTVI_RTSP_RECONNECTION_INTERVAL:-5}"
      RTVI_RTSP_RECONNECTION_WINDOW: "${RTVI_RTSP_RECONNECTION_WINDOW:-60}"
      RTVI_RTSP_RECONNECTION_MAX_ATTEMPTS: "${RTVI_RTSP_RECONNECTION_MAX_ATTEMPTS:-10}"
      VSS_CACHE_VIDEO_EMBEDS: "${VSS_CACHE_VIDEO_EMBEDS:-false}"
      VLM_DEFAULT_NUM_FRAMES_PER_SECOND_OR_FIXED_FRAMES_CHUNK: "${VLM_DEFAULT_NUM_FRAMES_PER_SECOND_OR_FIXED_FRAMES_CHUNK:-}"
      VSS_NUM_GPUS_PER_VLM_PROC: "${VSS_NUM_GPUS_PER_VLM_PROC:-}"
      VLM_SYSTEM_PROMPT: "${VLM_SYSTEM_PROMPT:-}"
      # OpenTelemetry Configuration (Full VIA Engine)
      ENABLE_VIA_HEALTH_EVAL: "${ENABLE_VIA_HEALTH_EVAL:-false}"
      # Standard OTEL environment variables (see https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/)
      ENABLE_OTEL_MONITORING: "${ENABLE_OTEL_MONITORING:-false}"  # Set to 'true' to enable OpenTelemetry
      OTEL_RESOURCE_ATTRIBUTES: "${OTEL_RESOURCE_ATTRIBUTES:-}"
      OTEL_TRACES_EXPORTER: "${OTEL_TRACES_EXPORTER:-otlp}"
      OTEL_EXPORTER_OTLP_ENDPOINT: "${OTEL_EXPORTER_OTLP_ENDPOINT:-http://otel-collector:4318}"
      OTEL_METRIC_EXPORT_INTERVAL: "${OTEL_METRIC_EXPORT_INTERVAL:-60000}"  # Metrics export interval in milliseconds
      KAFKA_ENABLED: "${KAFKA_ENABLED:-true}"
      KAFKA_TOPIC: "${KAFKA_TOPIC:-vision-llm-messages}"
      KAFKA_INCIDENT_TOPIC: "${KAFKA_INCIDENT_TOPIC:-vision-llm-events-incidents}"
      KAFKA_BOOTSTRAP_SERVERS: "${KAFKA_BOOTSTRAP_SERVERS:-kafka:9092}"
      KAFKA_LOG4J_ROOT_LOGLEVEL: ERROR
      ERROR_MESSAGE_TOPIC: "${ERROR_MESSAGE_TOPIC:-vision-llm-errors}"
      ENABLE_REDIS_ERROR_MESSAGES: "${ENABLE_REDIS_ERROR_MESSAGES:-false}"
      REDIS_HOST: "${REDIS_HOST:-redis}"
      REDIS_PORT: "${REDIS_PORT:-6379}"
      REDIS_DB: "${REDIS_DB:-0}"
      REDIS_PASSWORD: "${REDIS_PASSWORD:-}"
      ENABLE_REQUEST_PROFILING: "${ENABLE_REQUEST_PROFILING:-false}"
      VLLM_GPU_MEMORY_UTILIZATION: "${VLLM_GPU_MEMORY_UTILIZATION:-}"
      VLLM_IGNORE_EOS: "${VLLM_IGNORE_EOS:-false}"
      VSS_SKIP_INPUT_MEDIA_VERIFICATION: "${VSS_SKIP_INPUT_MEDIA_VERIFICATION:-}"

    ulimits:
      memlock:
        soft: -1
        hard: -1
      stack: 67108864
    ipc: host
    stdin_open: true
    tty: true
    extra_hosts:
      host.docker.internal: host-gateway
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/v1/ready"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 300s

volumes:
  rtvi-hf-cache:
  rtvi-ngc-model-cache:

Note

Platform-specific images:

  • For DGX Spark, use the SBSA image by setting RTVI_IMAGE in your .env file or override the image line:

    image: ${RTVI_IMAGE:-nvcr.io/nvidia/vss-core/vss-rt-vlm:3.1.0-sbsa}
    
  • For Jetson Thor, the default image is a multiarch image that supports ARM64.

  2. Create a .env file with your configuration:

cat > .env << EOF
BACKEND_PORT=<service port>
RTVI_IMAGE=<rtvi_vlm_container_image>
MODE=release
VLM_MODEL_TO_USE=cosmos-reason2
MODEL_PATH=ngc:nim/nvidia/cosmos-reason2-8b:1208-fp8-static-kv8
#MODEL_PATH=ngc:nim/nvidia/cosmos-reason1-7b:1.1-fp8-dynamic
#HF_TOKEN=<huggingface_token>
#MODEL_PATH=git:https://user:$HF_TOKEN@huggingface.co/nvidia/Cosmos-Reason2-2B
#MODEL_PATH=git:https://user:$HF_TOKEN@huggingface.co/nvidia/Cosmos-Reason2-8B
KAFKA_ENABLED=true
KAFKA_BOOTSTRAP_SERVERS=<Kafka_server_ip:port>
KAFKA_TOPIC=<kafka_topic>
KAFKA_INCIDENT_TOPIC=<kafka_incident_topic>
NGC_API_KEY=nvapi-XXXXXX
VLM_BATCH_SIZE=128
NVIDIA_VISIBLE_DEVICES=0
EOF
  3. Start the service:

ls -la  # Check compose.yaml, .env present
docker compose up

Note

Kafka and Redis servers can be launched as part of this deployment. To use a different Kafka server, set KAFKA_BOOTSTRAP_SERVERS appropriately. To use a different Redis server, set REDIS_HOST appropriately.

If the launch fails due to Out of Memory error, see Troubleshooting (adjust VLLM_GPU_MEMORY_UTILIZATION, reduce VLM_MAX_MODEL_LEN, or set NVIDIA_VISIBLE_DEVICES=<gpuid>).

The APIs can then be accessed and explored in a browser at http://<HOST_IP>:<backend_port>/docs.

Required Packages for Usage Examples#

The Python examples require the following packages. Install with:

pip install requests sseclient-py
  • requests – HTTP client for all examples

  • sseclient-py – Server-Sent Events client (required only for streaming examples)

The RTVI CLI client uses additional packages (tabulate, tqdm, pyyaml) for enhanced output formatting; the examples below are self-contained and need only requests and sseclient-py.

Usage Examples#

Note

Discovering the loaded model ID: The examples below use cosmos-reason2 as the model field. When running a locally deployed model, the service loads it under its full NGC artifact ID (e.g., nim_nvidia_cosmos-reason2-8b_1208-fp8-static-kv8). Use GET /v1/models to retrieve the exact ID before making inference requests:

import requests
response = requests.get("http://localhost:8000/v1/models")
model_id = response.json()["data"][0]["id"]
print(f"Loaded model: {model_id}")  # use this as the "model" field

Replace cosmos-reason2 in the examples below with the returned id.

Dense Captioning Example#

The following example demonstrates dense captioning for a stored video file. A video is uploaded, split into temporal chunks based on chunk_duration and chunk_overlap_duration, and each chunk is processed by the VLM.

Workflow: For a video of duration D seconds with chunk_duration=60 and chunk_overlap_duration=10, the video is chunked into overlapping segments (e.g., a 120-second video yields approximately 2 chunks). Each chunk is decoded, frames are sampled, and the VLM generates a caption. With stream=False (default), all chunks are processed and returned together in a single JSON response.
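
The chunk layout described above can be sketched as follows. This is an illustration of the approximate behavior (chunks laid out every chunk_duration seconds, with chunk_overlap_duration extending each subsequent chunk back into its predecessor), not the service's exact chunking code:

```python
import math

def estimate_chunks(duration_s: float, chunk_duration: float) -> int:
    """Approximate chunk count for a stored video."""
    return max(1, math.ceil(duration_s / chunk_duration))

def chunk_bounds(duration_s: float, chunk_duration: float,
                 overlap: float) -> list:
    """Approximate (start, end) bounds per chunk, with each chunk after the
    first extended `overlap` seconds back into its predecessor."""
    n = estimate_chunks(duration_s, chunk_duration)
    return [(max(0.0, i * chunk_duration - (overlap if i else 0.0)),
             min(duration_s, (i + 1) * chunk_duration)) for i in range(n)]

print(chunk_bounds(120.0, 60.0, 10.0))  # → [(0.0, 60.0), (50.0, 120.0)]
```

So a 120-second video with chunk_duration=60 and chunk_overlap_duration=10 yields 2 chunks, the second covering roughly 50–120 s.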

import requests
import json

BASE_URL = "http://localhost:8000/v1"

# Step 1: Upload a video file
with open("video.mp4", "rb") as f:
    response = requests.post(
        f"{BASE_URL}/files",
        files={"file": f},
        data={
            "purpose": "vision",
            "media_type": "video",
            "creation_time": "2024-06-09T18:32:11.123Z"
        }
    )
response.raise_for_status()
file_info = response.json()
file_id = file_info["id"]
print(f"Uploaded file: {file_id}")

# Step 2: Generate captions (chunks returned in full response)
caption_request = {
    "id": file_id,
    "prompt": "Describe what is happening in this video",
    "model": "cosmos-reason2",
    "chunk_duration": 60,
    "chunk_overlap_duration": 10,
    "enable_audio": False
}
response = requests.post(
    f"{BASE_URL}/generate_captions_alerts",
    json=caption_request
)
response.raise_for_status()
captions = response.json()
print(f"Generated {len(captions['chunk_responses'])} caption chunks")

# Step 3: Process results
for chunk in captions["chunk_responses"]:
    print(f"[{chunk['start_time']} - {chunk['end_time']}]: {chunk['content']}")

# Step 4: Clean up
requests.delete(f"{BASE_URL}/files/{file_id}")
print("File deleted")

Dense Captioning (Streaming) Example#

The following example demonstrates dense captioning for a live RTSP stream with streaming output. As the stream is decoded in real time, it is split into chunks (e.g., 60-second segments with 10-second overlap). Each chunk is processed by the VLM as it becomes available.

Workflow: With stream=True, chunks are returned incrementally via Server-Sent Events (SSE). Each SSE event contains chunk_responses for chunks that have completed VLM inference. The client receives captions as they are generated instead of waiting for the entire stream to finish. The stream terminates with a [DONE] message.

import requests
import json
import sseclient

BASE_URL = "http://localhost:8000/v1"

# Step 1: Add live stream
stream_request = {
    "streams": [{
        "liveStreamUrl": "rtsp://example.com/stream",
        "description": "Main warehouse camera"
    }]
}
response = requests.post(
    f"{BASE_URL}/streams/add",
    json=stream_request
)
response.raise_for_status()
stream_info = response.json()
if stream_info.get("errors"):
    raise RuntimeError(f"Failed to add stream: {stream_info['errors']}")
stream_id = stream_info["results"][0]["id"]
print(f"Added stream: {stream_id}")

# Step 2: Start caption generation with streaming
caption_request = {
    "id": stream_id,
    "prompt": "Describe what is happening in this live stream",
    "model": "cosmos-reason2",
    "stream": True,
    "chunk_duration": 60,
    "chunk_overlap_duration": 10
}
response = requests.post(
    f"{BASE_URL}/generate_captions_alerts",
    json=caption_request,
    stream=True
)
response.raise_for_status()

# Step 3: Process streaming responses (chunks as they complete)
client = sseclient.SSEClient(response)
for event in client.events():
    data = event.data.strip() if event.data else ""
    if data == "[DONE]":
        break
    if not data:
        continue
    result = json.loads(data)
    if "chunk_responses" in result:
        for chunk in result["chunk_responses"]:
            print(f"[{chunk['start_time']}]: {chunk['content']}")

# Step 4: Stop processing and remove stream
requests.delete(f"{BASE_URL}/generate_captions_alerts/{stream_id}")
requests.delete(f"{BASE_URL}/streams/delete/{stream_id}")
print("Stream removed")

Listing Live Streams#

Use GET /v1/streams/get-stream-info to list all currently registered live streams. This is useful for verifying that a stream was added successfully or for retrieving the stream ID of an existing stream.

import requests

BASE_URL = "http://localhost:8000/v1"

response = requests.get(f"{BASE_URL}/streams/get-stream-info")
response.raise_for_status()
streams = response.json()  # list of LiveStreamInfo objects
for stream in streams:
    print(f"ID: {stream['id']}, URL: {stream['liveStreamUrl']}")

VLM Alert Example#

The following example demonstrates alert/anomaly detection using the VLM. Use a prompt that expects a structured Yes/No response for anomaly detection. When anomalies are detected, incidents can be published to the Kafka incident topic (see Kafka Integration). The Incident message payload is defined in Incident Messages.

import requests

BASE_URL = "http://localhost:8000/v1"

# Upload video or use existing stream ID
file_id = "your-file-id"  # or stream_id for live streams

caption_request = {
    "id": file_id,
    "prompt": (
        "You are a warehouse monitoring system focused on safety and "
        "efficiency. Analyze the situation to detect any anomalies such as "
        "workers not wearing safety gear, leaving items unattended, or "
        "wasting time. Respond in the following structured format:\n"
        "Anomaly Detected: Yes/No\n"
        "Reason: [Brief explanation]"
    ),
    "system_prompt": "Answer the user's question correctly in yes or no",
    "model": "cosmos-reason2",
    "chunk_duration": 60,
    "chunk_overlap_duration": 10
}
response = requests.post(
    f"{BASE_URL}/generate_captions_alerts",
    json=caption_request
)
response.raise_for_status()
captions = response.json()

for chunk in captions["chunk_responses"]:
    content = chunk.get("content", "")
    if "Anomaly Detected: Yes" in content:
        print(f"ALERT [{chunk['start_time']} - {chunk['end_time']}]: {content}")
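Because the prompt requests a structured Yes/No answer, the response can also be parsed mechanically rather than with a substring match. A small illustrative helper, tolerant of extra whitespace and casing:

```python
def parse_alert(content: str):
    """Parse a structured 'Anomaly Detected: Yes/No' VLM response.
    Returns (is_anomaly, reason)."""
    is_anomaly, reason = False, ""
    for line in content.splitlines():
        key, _, value = line.partition(":")
        key = key.strip().lower()
        if key == "anomaly detected":
            is_anomaly = value.strip().lower().startswith("yes")
        elif key == "reason":
            reason = value.strip()
    return is_anomaly, reason

print(parse_alert("Anomaly Detected: Yes\nReason: Worker without helmet"))
# → (True, 'Worker without helmet')
```

The same trigger logic ("yes" in the response) is what the server uses to decide whether to publish an Incident message to Kafka (see Kafka Integration).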

Chat Completions API Example#

The OpenAI-compatible /v1/chat/completions endpoint supports three input modes for video/image: pre-uploaded file ID, HTTP/HTTPS URL, and base64 data URL.

1. Using pre-uploaded file ID (id field):

import requests

BASE_URL = "http://localhost:8000/v1"
file_id = "your-uploaded-file-uuid"  # From POST /v1/files

response = requests.post(
    f"{BASE_URL}/chat/completions",
    json={
        "model": "cosmos-reason2",
        "id": file_id,
        "messages": [{"role": "user", "content": "Describe what is happening in this video."}],
    },
)
result = response.json()
print(result["choices"][0]["message"]["content"])

2. Using HTTP URL (image_url or video_url in message content):

import requests

BASE_URL = "http://localhost:8000/v1"

response = requests.post(
    f"{BASE_URL}/chat/completions",
    json={
        "model": "cosmos-reason2",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "What is in this image?"},
                    {
                        "type": "image_url",
                        "image_url": {"url": "https://example.com/image.png"},
                    },
                ],
            }
        ],
    },
)
result = response.json()
print(result["choices"][0]["message"]["content"])

# For video, use "video_url" instead:
# {"type": "video_url", "video_url": {"url": "https://example.com/video.mp4"}}

3. Using base64 data URL (inline media in message content):

import requests
import base64

BASE_URL = "http://localhost:8000/v1"

# Read and encode image as base64
with open("image.png", "rb") as f:
    b64_data = base64.b64encode(f.read()).decode("utf-8")

data_url = f"data:image/png;base64,{b64_data}"

response = requests.post(
    f"{BASE_URL}/chat/completions",
    json={
        "model": "cosmos-reason2",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Describe this image."},
                    {"type": "image_url", "image_url": {"url": data_url}},
                ],
            }
        ],
    },
)
result = response.json()
print(result["choices"][0]["message"]["content"])

# For video: data:video/mp4;base64,<base64_string>

Implementation Details#

Server Class#

The RT VLM Server implements the FastAPI application and manages all API endpoints.

Timestamp Handling#

The server handles timestamps differently based on media type; live streams use NTP timestamps in ISO 8601 format.
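
For example, the creation_time value shown in the upload example ("2024-06-09T18:32:11.123Z") can be produced client-side like this (a helper for building requests, not part of the service API):

```python
from datetime import datetime, timezone

def to_iso8601_utc(dt: datetime) -> str:
    """Format a datetime as the ISO 8601 UTC form used in this document's
    examples (millisecond precision, trailing 'Z')."""
    return dt.astimezone(timezone.utc).isoformat(timespec="milliseconds") \
             .replace("+00:00", "Z")

ts = datetime(2024, 6, 9, 18, 32, 11, 123000, tzinfo=timezone.utc)
print(to_iso8601_utc(ts))  # → 2024-06-09T18:32:11.123Z
```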

Streaming Implementation#

For streaming responses, the server uses Server-Sent Events (SSE):

  • Events are sent as JSON objects

  • Each event contains chunk responses as they become available

  • Final event contains usage statistics (if requested)

  • Stream terminates with [DONE] message

  • Only one client can connect to a live stream at a time
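
The event framing above can be illustrated with a minimal parser for the SSE wire format: events separated by blank lines, payloads on data: lines, and a [DONE] sentinel ending the stream. This is a sketch for illustration only; the examples in this document use sseclient-py instead:

```python
import json

def parse_sse_events(raw: str) -> list:
    """Parse a buffered SSE stream into a list of JSON event payloads,
    stopping at the [DONE] sentinel. Not a full SSE implementation."""
    events = []
    for block in raw.split("\n\n"):
        # Collect all data: lines in this event block
        data_lines = [l[5:].strip() for l in block.splitlines()
                      if l.startswith("data:")]
        data = "\n".join(data_lines)
        if not data:
            continue
        if data == "[DONE]":
            break
        events.append(json.loads(data))
    return events

raw = 'data: {"chunk_responses": [{"content": "a forklift passes"}]}\n\ndata: [DONE]\n\n'
print(parse_sse_events(raw))
```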

Kafka Integration#

The RTVI VLM Server can publish messages to Kafka topics for downstream processing. This enables integration with other microservices, analytics pipelines, and real-time alerting systems.

Kafka Topics#

The server publishes to the following Kafka topics:

  • VisionLLM Messages (default: vision-llm-messages): Contains VisionLLM protobuf messages with VLM caption results

  • Incidents (default: vision-llm-events-incidents): Contains Incident protobuf messages when anomalies or incidents are detected

Configuration:

Kafka integration is controlled by the following environment variables:

  • KAFKA_ENABLED: Enable/disable Kafka integration (true / false). Default: true in the sample compose file

  • KAFKA_BOOTSTRAP_SERVERS: Comma-separated list of Kafka broker addresses (e.g., localhost:9092 or kafka:9092 for Docker)

  • KAFKA_TOPIC: Topic for VisionLLM messages. Default: vision-llm-messages

  • KAFKA_INCIDENT_TOPIC: Topic for incident messages. Default: vision-llm-events-incidents

How Alerts and Incidents Are Sent to Kafka#

When generate_captions_alerts processes video chunks, the server publishes messages to Kafka as each chunk completes VLM inference:

  1. VisionLLM messages are always sent to KAFKA_TOPIC (vision-llm-messages). Each message contains the VLM caption, frame metadata, sensor info, and an incidentDetected flag in the info map ("true" or "false").

  2. Incident messages are sent to KAFKA_INCIDENT_TOPIC (vision-llm-events-incidents) only when an anomaly is detected. The server detects incidents by checking the VLM response for trigger phrases such as "yes" or "true" (case-insensitive). Use prompts that expect structured Yes/No answers (see Enabling Incidents). For the full Incident message payload (protobuf schema and example), see Incident Messages.

  3. Publishing flow (per chunk):

    • After VLM inference completes, the server builds a VisionLLM protobuf and optionally an Incident protobuf if the response triggers an alert.

    • Each message is sent asynchronously with a key {request_id}:{chunk_idx} for partitioning and ordering.

    • A message_type header ("vision_llm" or "incident") identifies the payload type for consumers.

    • VisionLLM and Incident messages are published independently; an incident is sent only when the VLM output indicates an anomaly.

  4. Requirement: Set KAFKA_ENABLED=true and configure KAFKA_BOOTSTRAP_SERVERS for Kafka publishing to occur.
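
The message key and header described in the publishing flow could be constructed as follows (a producer-side sketch mirroring the description above; the function names are illustrative):

```python
def kafka_key(request_id: str, chunk_idx: int) -> bytes:
    """Message key '{request_id}:{chunk_idx}' used for partitioning
    and per-request ordering."""
    return f"{request_id}:{chunk_idx}".encode("utf-8")

def kafka_headers(incident: bool) -> list:
    """The message_type header consumers use to distinguish payload types."""
    return [("message_type", b"incident" if incident else b"vision_llm")]

print(kafka_key("req_1234567890", 5))  # → b'req_1234567890:5'
print(kafka_headers(False))            # → [('message_type', b'vision_llm')]
```

Consumers can dispatch on the message_type header before attempting protobuf deserialization.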

Message Formats#

Incident Messages#

Incident messages are serialized as Protocol Buffer (protobuf) messages using the Incident message type from the protobuf schema.

Message Header:

  • message_type: "incident"

Message Structure:

message Incident {
  string sensorId = 1;
  google.protobuf.Timestamp timestamp = 2;
  google.protobuf.Timestamp end = 3;
  repeated string objectIds = 4;
  repeated string frameIds = 5;
  Place place = 6;
  AnalyticsModule analyticsModule = 7;
  string category = 8;
  repeated Embedding embeddings = 9;
  bool isAnomaly = 10;
  LLM llm = 12;
  map<string, string> info = 11;
}

Key Fields:

  • sensorId: Identifier of the sensor/stream

  • timestamp: Start timestamp of the incident

  • end: End timestamp of the incident

  • objectIds: Array of object IDs involved in the incident

  • category: Category of the incident (e.g., "safety_non_compliance")

  • isAnomaly: Boolean indicating if the incident is an anomaly

  • llm: LLM query and response information

  • info: Additional metadata map containing fields like:

    • request_id: Request ID associated with the incident

    • chunk_idx: Chunk index where the incident was detected

    • incident_detected: Alert flag

    • priority: Priority level (e.g., "high")

Example Incident JSON (for reference):

{
  "sensorId": "camera-entrance-east-01",
  "timestamp": "2025-11-19T06:22:20Z",
  "end": "2025-11-19T06:22:32Z",
  "objectIds": [],
  "frameIds": ["frame-10512", "frame-10518"],
  "place": {
    "id": "dock-entrance-east",
    "name": "Dock Entrance - East",
    "type": "warehouse-bay"
  },
  "analyticsModule": {
    "id": "inc-activity-detector",
    "description": "Forklift safety compliance detector",
    "source": "VLM",
    "version": "2.0.0"
  },
  "category": "safety_non_compliance",
  "isAnomaly": true,
  "info": {
    "priority": "high",
    "request_id": "req_1234567890",
    "chunk_idx": "5"
  },
  "llm": {
    "queries": [{
      "response": "Operator entered the high-risk loading area without a high-visibility vest while a forklift was active."
    }]
  }
}
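
On the wire the payload is protobuf, but the JSON form above is convenient for illustrating consumer-side handling. A hedged sketch that summarizes an incident from the JSON representation, using an abbreviated copy of the example above:

```python
import json

incident_json = """{
  "sensorId": "camera-entrance-east-01",
  "timestamp": "2025-11-19T06:22:20Z",
  "category": "safety_non_compliance",
  "isAnomaly": true,
  "info": {"priority": "high", "request_id": "req_1234567890", "chunk_idx": "5"},
  "llm": {"queries": [{"response": "Operator entered the loading area without a vest."}]}
}"""

def summarize_incident(payload: str) -> str:
    """Render a one-line summary from an Incident message in JSON form."""
    inc = json.loads(payload)
    resp = inc["llm"]["queries"][0]["response"]
    return (f"[{inc['timestamp']}] {inc['sensorId']} {inc['category']} "
            f"(priority={inc['info'].get('priority', 'n/a')}): {resp}")

print(summarize_incident(incident_json))
```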

VisionLLM Messages#

VisionLLM messages contain VLM caption results and are serialized as Protocol Buffer messages using the VisionLLM message type.

Message Header:

  • message_type: "vision_llm" (default, if not specified)

Message Structure:

See the protobuf schema documentation for complete VisionLLM message structure. Key fields include:

  • version: Message version

  • timestamp: Start timestamp

  • end: End timestamp

  • startFrameId: Start frame identifier

  • endFrameId: End frame identifier

  • sensor: Sensor information

  • llm: LLM queries, responses, and embeddings

  • info: Additional metadata map

Redis Error Messages#

By default, error messages are published to Kafka on the topic named by ERROR_MESSAGE_TOPIC. To route error messages to Redis instead, set the following environment variables in your .env file:

ENABLE_REDIS_ERROR_MESSAGES=true
REDIS_HOST=redis.example.com
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=your_password  # Optional, only if Redis requires authentication
ERROR_MESSAGE_TOPIC=vision-llm-errors  # Redis channel name for error messages

Error messages will be published to the Redis channel specified in ERROR_MESSAGE_TOPIC. The message format remains JSON with the following fields: streamId, timestamp, type, source, event.
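
A subscriber can validate that received payloads carry the documented fields before processing them. The validation helper below is a sketch; the redis-py subscription is shown only as commented code since it assumes a reachable Redis server:

```python
import json

REQUIRED_FIELDS = ("streamId", "timestamp", "type", "source", "event")

def is_valid_error_message(msg: dict) -> bool:
    """Check an error payload carries the fields listed above."""
    return all(field in msg for field in REQUIRED_FIELDS)

# Consuming (sketch, assuming redis-py is installed and Redis is reachable):
# import redis
# r = redis.Redis(host="redis.example.com", port=6379, db=0)
# p = r.pubsub()
# p.subscribe("vision-llm-errors")
# for m in p.listen():
#     if m["type"] == "message":
#         payload = json.loads(m["data"])
#         if is_valid_error_message(payload):
#             print(payload["streamId"], payload["event"])
```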

Using Remote Endpoints#

The RTVI VLM Microservice supports using remote endpoints with NVIDIA NIM or OpenAI-compatible models:

NVIDIA NIM:

VLM_MODEL_TO_USE=openai-compat
OPENAI_API_KEY=nvapi-XXXXXXX
VIA_VLM_ENDPOINT="https://integrate.api.nvidia.com/v1"
VIA_VLM_OPENAI_MODEL_DEPLOYMENT_NAME="nvidia/nemotron-nano-12b-v2-vl"

For local deployments, update VIA_VLM_ENDPOINT to point to your local deployment.

GPT-4o:

OPENAI_API_KEY=<openai key>
VLM_MODEL_TO_USE=openai-compat
VIA_VLM_OPENAI_MODEL_DEPLOYMENT_NAME="gpt-4o"

Hugging Face Models Locally#

For models downloaded from Hugging Face and served locally via vLLM:

Qwen3-VL:

If HF_TOKEN is already set:

VLM_MODEL_TO_USE=vllm-compatible
MODEL_PATH=git:https://huggingface.co/Qwen/Qwen3-VL-30B-A3B-Instruct

If HF_TOKEN is not set, embed the token directly in the URL:

VLM_MODEL_TO_USE=vllm-compatible
MODEL_PATH=git:https://user:<huggingface_token>@huggingface.co/Qwen/Qwen3-VL-30B-A3B-Instruct

Environment Variables Reference#

Core Configuration#

Variable

Description

Default

BACKEND_PORT

Port for REST API server

8000

LOG_LEVEL

Logging verbosity

INFO

Model Configuration#

Variable

Description

Default

VLM_BATCH_SIZE

Inference batch size

Auto-calculated

NUM_VLM_PROCS

Number of inference processes

10

NUM_GPUS

Number of GPUs to use

Auto-detected

NVIDIA_VISIBLE_DEVICES

GPU device IDs

all

MODEL_PATH

Path of the model

ngc:nim/nvidia/cosmos-reason2-8b:1208-fp8-static-kv8

VLM_MODEL_TO_USE

Name of the model

cosmos-reason2

VIA_VLM_OPENAI_MODEL_DEPLOYMENT_NAME

Name of OpenAI compatible model

VIA_VLM_ENDPOINT

Link of the endpoint for OpenAI compatible model

OPENAI_API_KEY

Key to access OpenAI model

VLM_INPUT_WIDTH

Input video frame width for VLM

Auto-calculated

VLM_INPUT_HEIGHT

Input video frame height for VLM

Auto-calculated

VLM_SYSTEM_PROMPT

Custom system prompt for the model

VLM_MAX_MODEL_LEN

Maximum model context length (tokens)

128000

VLM_DEFAULT_NUM_FRAMES_PER_SECOND_OR_FIXED_FRAMES_CHUNK

Frame sampling rate (FPS) or fixed frame count per chunk

Auto-calculated

VSS_NUM_GPUS_PER_VLM_PROC

Number of GPUs per VLM inference process

Storage and Caching#

Variable

Description

Default

ASSET_STORAGE_DIR

Host path for uploaded files

MAX_ASSET_STORAGE_SIZE_GB

Max storage size (GB)

RTVI_LOG_DIR

Log output directory

Feature Toggles#

Variable

Description

Default

ENABLE_NSYS_PROFILER

Enable NSYS profiling

false

INSTALL_PROPRIETARY_CODECS

Install additional codecs

false

FORCE_SW_AV1_DECODER

Force software AV1 decode

false

VSS_SKIP_INPUT_MEDIA_VERIFICATION

Skip input media verification (faster stream addition; use for benchmarking)

RTSP Streaming#

Variable

Description

Default

RTVI_RTSP_LATENCY

RTSP latency (ms)

2000

RTVI_RTSP_TIMEOUT

RTSP timeout (ms)

2000

RTVI_RTSP_RECONNECTION_INTERVAL

Time to detect stream interruption and wait for reconnection (seconds)

5.0

RTVI_RTSP_RECONNECTION_WINDOW

Duration to attempt reconnection after interruption (seconds)

60.0

RTVI_RTSP_RECONNECTION_MAX_ATTEMPTS

Max reconnection attempts

10

RTVI_ADD_TIMESTAMP_TO_VLM_PROMPT

Add timestamp information to VLM prompts

true

VLLM Runtime Configuration#

Variable

Description

Default

VLLM_GPU_MEMORY_UTILIZATION

GPU memory utilization fraction for vLLM (e.g., 0.9 for 90%)

0.7

VLLM_MAX_NUM_SEQS

Maximum number of sequences in vLLM batch

1024

VLLM_IGNORE_EOS

Ignore end-of-sequence tokens (useful for token generation benchmarks)

false

VLLM_ENABLE_PREFIX_CACHING

Enable vLLM prefix caching for improved performance

true

VLLM_DISABLE_MM_PREPROCESSOR_CACHE

Disable multimodal preprocessor cache

false

OpenTelemetry / Monitoring#

Variable

Description

Default

ENABLE_OTEL_MONITORING

Enable OpenTelemetry monitoring

false

OTEL_SERVICE_NAME

Service name for traces

rtvi

OTEL_EXPORTER_OTLP_ENDPOINT

OTLP endpoint

http://otel-collector:4318

OTEL_TRACES_EXPORTER

Traces exporter type

otlp

OTEL_METRIC_EXPORT_INTERVAL

Metrics export interval (ms)

60000

ENABLE_VIA_HEALTH_EVAL

Enable health evaluation

false

ENABLE_REQUEST_PROFILING

Enable per-request profiling

false

Kafka Configuration#

Variable

Description

Default

KAFKA_ENABLED

Enable Kafka integration

true

KAFKA_BOOTSTRAP_SERVERS

Kafka broker address

localhost:9092

KAFKA_TOPIC

Topic for VisionLLM/embedding messages

vision-llm-messages

KAFKA_INCIDENT_TOPIC

Topic for incident messages

vision-llm-events-incidents

ERROR_MESSAGE_TOPIC

Topic/channel for error messages

vision-llm-errors

Redis Error Messages Configuration#

Variable

Description

Default

ENABLE_REDIS_ERROR_MESSAGES

Enable Redis for error messages instead of Kafka

false

REDIS_HOST

Redis server hostname

redis

REDIS_PORT

Redis server port

6379

REDIS_DB

Redis database number

0

REDIS_PASSWORD

Redis authentication password

Frame Selection Modes#

RTVI VLM supports two frame selection modes for sampling frames from video chunks:

FPS-based Selection:

  • Enable --use-fps-for-chunking flag

  • Set --num-frames-per-second-or-fixed-frames-chunk to the desired frames per second (e.g., 0.05 for 0.05 FPS)

  • The system will sample frames at the specified rate based on chunk duration

Fixed Frame Selection (default):

  • Do not set --use-fps-for-chunking flag (disabled by default)

  • Set --num-frames-per-second-or-fixed-frames-chunk to the desired number of frames per chunk (e.g., 8 for 8 frames)

  • The system will sample a fixed number of equally-spaced frames from each chunk
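
The two modes can be sketched as follows. This illustrates the described behavior (frames sampled at the midpoint of equal sub-intervals), not the service's exact sampling code:

```python
def sampled_frame_times(chunk_duration: float, value: float,
                        use_fps: bool) -> list:
    """Frame sample times (seconds, relative to chunk start).
    In FPS mode `value` is frames/second; in fixed mode it is the
    number of equally spaced frames per chunk."""
    n = max(1, int(chunk_duration * value)) if use_fps else max(1, int(value))
    step = chunk_duration / n
    # Sample at the center of each equal sub-interval
    return [round(i * step + step / 2, 3) for i in range(n)]

print(sampled_frame_times(60, 0.05, use_fps=True))  # → [10.0, 30.0, 50.0]
```

So 0.05 FPS over a 60-second chunk yields 3 frames, while fixed mode with 8 frames always yields 8 regardless of chunk duration.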

Enabling Incidents#

To enable incident detection, set an appropriate --prompt or --system-prompt that expects a clear Yes or No answer. Incidents will be pushed to the incident Kafka topic.

Example prompt:

--prompt "You are a warehouse monitoring system focused on safety and
efficiency. Analyze the situation to detect any anomalies such as workers
not wearing safety gear, leaving items unattended, or wasting time.
Respond in the following structured format:
Anomaly Detected: Yes/No
Reason: [Brief explanation]"
--system-prompt "Answer the user's question correctly in yes or no"

Troubleshooting#

Common Issues#

Container fails to start

  • Check docker logs <container_name> for error messages

  • Verify GPU access: Ensure NVIDIA Container Toolkit is installed and nvidia-smi works

Out of Memory error

  • Set NVIDIA_VISIBLE_DEVICES=<gpuid> to a free GPU

  • Reduce batch size: Try VLM_BATCH_SIZE=32 (default is auto-calculated)

  • Increase GPU memory utilization for GPUs with less than 50 GB of memory: Try VLLM_GPU_MEMORY_UTILIZATION=0.85 (default is 0.7)

  • Reduce max sequences: Lower VLLM_MAX_NUM_SEQS

  • Reduce concurrent processes: Lower NUM_VLM_PROCS

  • Reduce max model length: Set VLM_MAX_MODEL_LEN=64000

  • On Jetson Thor or DGX systems, ensure that the cache cleaner script is running.

Port conflicts

  • Change BACKEND_PORT if port 8000 is already in use

Kafka Connection Issues

  • Use kafka:9092 as bootstrap server when connecting from within Docker network

  • Verify Kafka is running: docker ps | grep kafka

Health Check Failures

  • Check logs with docker compose logs rtvi-vlm

Known Issues#

High concurrency with 8K vision tokens on Jetson Thor / DGX Spark

When running more than 2 concurrent live streams with a high vision-token budget (~8K vision tokens, i.e., 448×448 at 80 frames with Cosmos Reason2), the device may reboot or the microservice may crash due to memory pressure on resource-constrained edge platforms.

  • Recommendation: Limit concurrent live streams to 2 or fewer when using 8K vision tokens on Jetson Thor or DGX Spark.

  • Alternatively, reduce the vision-token budget by lowering the input resolution from 448×448 to 372×372 at 30 frames (~2K vision tokens) using VLM_INPUT_WIDTH=372, VLM_INPUT_HEIGHT=372, VLM_DEFAULT_NUM_FRAMES_PER_SECOND_OR_FIXED_FRAMES_CHUNK=30 to support higher concurrency.

Stream deletion latency under high concurrent load

When multiple live streams are being processed concurrently and the VLM inference latency exceeds the chunk duration (e.g., processing takes longer than the configured chunk_duration), deleting a stream via DELETE /v1/streams/delete/{id} may take longer than expected. The delete operation waits for the in-flight VLM inference request to complete before the stream resources are released.

  • This is expected behavior — the server ensures the current inference cycle completes cleanly before tearing down the stream.

  • The delay is proportional to the VLM inference time at the current load. Reducing concurrency or increasing chunk_duration will reduce delete latency.

Version Information#

The API version is v1. Check the service version using the health check endpoints or by examining the OpenAPI schema at /docs.
