Introduction to Flow APIs#

Flow APIs abstract away the underlying pipeline details, allowing developers to focus solely on the goals of their specific tasks in a Pythonic style. These high-level APIs emphasize “what to do” rather than “how to do it,” enabling developers to express their intentions in a more intuitive and concise manner. This abstraction simplifies the development process and improves code readability and maintainability.

Besides the explicitly declared arguments, Flow APIs also allow experienced users to override standard DeepStream element properties through kwargs; the only trick is that every ‘-’ in a property name must be replaced with ‘_’.

For instance, users can specify the GPU they want to use when they create a flow for capturing:

# Create a capture flow on gpu 1
flow = Flow(Pipeline('capture')).capture([video_file], gpu_id=1)
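
For example, an element property with a hyphenated name such as ‘drop-frame-interval’ (chosen here purely for illustration) would be passed like this:

# 'drop-frame-interval' contains '-', so it is passed as drop_frame_interval
flow = Flow(Pipeline('capture')).capture([video_file], drop_frame_interval=2)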

Within Service Maker, a wide range of common operations from the multimedia and deep learning fields are supported by Flow.

Capture#

The ‘capture’ method appends a capture operation to an empty flow. This operation takes a list of URIs as input, from which multiple streams carrying decoded data are created within the flow. Alternatively, a SourceManager object can be provided to add/remove sources at runtime.

argument name     description
inputs            list of input URIs
source_manager    SourceManager object for adding/removing sources at runtime
kwargs            optional keyword arguments for overriding standard DeepStream element properties

from pyservicemaker import Pipeline, Flow

# Flow for playing back an mp4 video
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("playback")).capture([video_file]).render()()

Inject#

The ‘inject’ method appends an injection operation to an empty flow. The operation takes a list of BufferProvider objects, from each of which a stream is created.

In addition to overriding the ‘generate’ method to create buffers, BufferProvider objects must carry the following members to instruct the injection process:

member name    description
width          width of the video frames
height         height of the video frames
format         format: RGB/I420
framerate      framerate of the video
use_gpu        boolean value to indicate if the data is in GPU memory

from pyservicemaker import Pipeline, Flow, BufferProvider, Buffer

class MyBufferProvider(BufferProvider):

    def __init__(self, width, height, device='cpu', framerate=30, format="RGB"):
        super().__init__()
        self.width = width
        self.height = height
        self.format = format
        self.framerate = framerate
        self.device = device
        self.count = 0
        self.expected = 255

    def generate(self, size):
        # produce a solid-color RGB frame whose pixel value increments
        # with every buffer; an empty Buffer signals the end of the stream
        data = [self.count]*(self.width*self.height*3)
        if self.count < self.expected:
            self.count += 1
        return Buffer() if self.count == self.expected else Buffer(data)

p = MyBufferProvider(320, 240)
# render the injected raw RGB frames
Flow(Pipeline("playback")).inject([p]).render()()

Retrieve#

The ‘retrieve’ method appends a data retriever to the current flow. The operation takes an instance of BufferRetriever, which implements the ‘consume’ method to access buffers. Invocation of this method results in the end of a flow, and no further operations can be appended.

# Read the decoded video buffers from a sample mp4 file
from pyservicemaker import Pipeline, Flow, BufferRetriever

class MyBufferRetriever(BufferRetriever):
    def __init__(self):
        super().__init__()
        self.frames = 0

    def consume(self, buffer):
        # extract the decoded frame data from the buffer as a tensor
        tensor = buffer.extract(0)
        assert len(tensor.shape) == 3
        self.frames += 1
        return 1

video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("retrieve")).capture([video_file]).retrieve(MyBufferRetriever())()

Decode#

The ‘decode’ method appends a decoding operation to the current flow. The operation adds a decoder to each upstream. This is particularly useful when the data is injected via buffer providers.

from pyservicemaker import Pipeline, Flow, BufferProvider, Buffer

class JpegBufferProvider(BufferProvider):

    def __init__(self, file_path:str):
        super().__init__()
        self._file_path = file_path
        self.format = "JPEG"
        self.width = 1280
        self.height = 720
        self.framerate = 0
        self.count = 0
        self.expected = 255

    def generate(self, size):
        # read the whole JPEG file as a list of byte values;
        # an empty Buffer signals the end of the stream
        with open(self._file_path, "rb") as f:
            data = [int(b) for b in f.read()]
        if self.count < self.expected:
            self.count += 1
        return Buffer() if self.count == self.expected else Buffer(data)

# decode jpeg from a binary buffer
jpeg_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.jpg"
Flow(Pipeline("test")).inject([JpegBufferProvider(jpeg_file)]).decode().render()()

Batch#

The ‘batch’ method appends a batching operation to the current flow to combine all the streams into a single batched one. This operation takes standard keyword arguments including ‘batch_size’, ‘width’, and ‘height’ as parameters. If these parameters are not given, the operation sets the batch size to the number of streams and the resolution to 1920 x 1080 by default.

uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# play back all the sources in a tiled display
Flow(Pipeline("playback")).capture(uri_list).batch().render()()

Batched Capture#

The ‘batch_capture’ method appends a capture operation to an empty flow and batches the inputs. The operation takes either a list of URIs or a source config file as input, forming a batched stream.

argument name          description
input                  list of input URIs or a source config file
smart_record_config    optional configuration file for enabling smart record
kwargs                 optional keyword arguments for overriding standard DeepStream element properties

uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# play back 4 mp4 videos at the same time
Flow(Pipeline("playback")).batch_capture(uri_list).render()()

Render#

The ‘render’ method appends a renderer to the current flow to display or discard the video. The operation takes a render mode that decides how the data is rendered; DISPLAY (default), DISCARD, and STREAM are supported. Moreover, optional named arguments cover all the standard sink control parameters. Invocation of this method results in the end of a flow, and no further operations can be appended.

argument name       description
mode                render mode: DISPLAY (default), DISCARD, or STREAM
enable_osd          boolean value to enable OSD, True by default
rtsp_mount_point    mount point for the RTSP stream, ‘ds-test’ by default; only valid when mode is STREAM
rtsp_port           port number for the RTSP stream, 8554 by default; only valid when mode is STREAM
kwargs              optional keyword arguments for overriding standard DeepStream element properties

from pyservicemaker import Pipeline, Flow, RenderMode

# discard the video frames
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("playback")).capture([video_file]).render(mode=RenderMode.DISCARD)()

Encode#

The ‘encode’ method appends an encoder to the current flow to encode the video data into a file or an RTSP stream. The operation takes a destination URI prefixed with ‘file://’ or ‘rtsp://’. If the prefix is missing, ‘file://’ is implied. In the case of an RTSP stream, a port number must appear in the URI. Moreover, optional parameters for encoding control are supported.

name             description
dest             destination URI prefixed with ‘file://’ or ‘rtsp://’
use_sw_codec     boolean value to indicate if a software codec is used, False by default
profile          encoding profile: 0 for baseline (default), 1 for constrained baseline, 2 for main, 4 for high
iframeinterval   intra frame occurrence frequency, default 10
bitrate          encoding bitrate, default 2000000

# encode the captured video into a local mp4 file
pipeline = Pipeline("test")
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).capture([video_file]).encode("output.mp4", sync=True)()
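
For an RTSP destination, the port number is mandatory in the URI, as noted above; a sketch:

# serve the encoded stream over RTSP (port number required in the URI)
Flow(Pipeline("rtsp-encode")).capture([video_file]).encode("rtsp://localhost:8554")()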

Infer#

The ‘infer’ method enables inference in the current flow. The operation takes a ‘config’ parameter specifying the model configuration file. Optional standard nvinfer parameters can be added to override the values in the configuration file.

argument name    description
config           configuration file for nvinfer
with_triton      boolean value to indicate if the inference goes through Triton, False by default
kwargs           optional keyword arguments for overriding standard DeepStream element properties

pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
# object detection using resnet18 for 4 streams
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
Flow(Pipeline("infer")).batch_capture(uri_list).infer(pgie_config).render()()

Track#

The ‘track’ method appends a tracker to the current flow for tracking the detected objects. The operation must come after ‘infer’, since it consumes detection data. Standard nvtracker parameters must be appropriately set for the tracker to work correctly.

pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# object detection and tracking using nvmultiobjecttracker for 4 streams
Flow(Pipeline("tracker")).batch_capture(uri_list).infer(pgie_config).track(
    ll_config_file="/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml",
    ll_lib_file="/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so"
).render()()

Publish#

The ‘publish’ method appends a procedure to the current flow for publishing events to a remote server. The operation takes the following parameters to set up the communication between the pipeline and the remote server.

argument name           description
msg_broker_proto_lib    low-level library used by the message broker
msg_broker_conn_str     connection string for the server
topic                   topic name
msg_conv_config         message converter config for source information

# publish the object data to a kafka server
Flow(Pipeline("publish")).batch_capture(
    "/opt/nvidia/deepstream/deepstream/service-maker/sources/apps/deepstream_test5_app/source_list_dynamic.yaml"
).infer(
    "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
).attach(
    what="add_message_meta_probe",
    name="message_generator"
).publish(
    msg_broker_proto_lib="/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so",
    msg_broker_conn_str="qvs-ds-kafka.nvidia.com;9092",
    topic="test4app",
)()

Invocation of this method results in the end of a flow, and no further operations can be appended.

Preprocess#

The ‘preprocess’ method appends a preprocess operation to the current flow. The operation takes a configuration file for preprocessing. An optional tensor generator callback can be provided to generate custom tensors alongside the preprocessed images.

argument name       description
config_file         preprocess configuration file in YAML format
tensor_generator    callback for generating custom tensors; takes an integer (the number of frames in the batch) as input and returns a dictionary of tensors

preprocess_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_preprocess.yml"
Flow(Pipeline("preprocess")).batch_capture(
    ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]
).preprocess(preprocess_config).infer(
    "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
).render()()
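
A minimal sketch of a tensor generator matching the signature described above; the tensor construction itself is model-specific and omitted here:

def my_tensor_generator(num_frames):
    # receives the number of frames in the batch and must return a
    # dictionary mapping tensor names to custom tensors (omitted here)
    return {}

Flow(Pipeline("preprocess")).batch_capture(
    ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]
).preprocess(preprocess_config, tensor_generator=my_tensor_generator).infer(
    "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
).render()()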

Attach#

The ‘attach’ method attaches a Probe to the current flow. Two parameters are required:

argument name    description
what             a Probe object or the name of a probe in a shared library
name             the instance name if the probe is from a shared library
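
A minimal sketch of attaching a custom probe, assuming a BatchMetadataOperator subclass as used in the DeepStream sample applications (the counting logic here is illustrative):

from pyservicemaker import Pipeline, Flow, Probe, BatchMetadataOperator

class ObjectCounterMarker(BatchMetadataOperator):
    def handle_metadata(self, batch_meta):
        # iterate the frames in the batch and count the detected objects
        for frame_meta in batch_meta.frame_items:
            count = sum(1 for _ in frame_meta.object_items)
            print(f"{count} objects detected")

pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("attach")).batch_capture([video_file]).infer(pgie_config).attach(
    what=Probe("counter", ObjectCounterMarker())
).render()()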

Fork#

The ‘fork’ method forks the current flow so that more than one flow can be appended.

# Initiate a pipeline to read an mp4 file,
# transcode the video both to a local file and over RTSP,
# and play it back at the same time
pipeline = Pipeline("test")
dest = "rtsp://localhost:8554"
flow = Flow(pipeline).capture(["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h265.mp4"]).batch().fork()
flow.encode(dest, sync=True)
flow.encode("/tmp/sample.mp4")
flow.render(sync=True)
flow()

The output RTSP stream can be received from “rtsp://localhost:8554/ds-test”.

Analyze#

The ‘analyze’ method appends an analytics operation to the current flow, which outputs frame level and tracked object level information on Region of Interest (RoI) Filtering, Overcrowding Detection, Direction Detection, and Line Crossing. The operation must come after ‘infer’ and ‘track’. The operation takes a ‘config’ parameter for the NvDsAnalytics configuration file.

from pyservicemaker import Pipeline, Flow, Probe

pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
pipeline = Pipeline("analytics")
flow = Flow(pipeline).batch_capture(uri_list).infer(pgie_config)
flow = flow.track(ll_config_file="/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml", ll_lib_file="/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so")
# Analyze the video frames and detected objects
flow = flow.analyze("/opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-nvdsanalytics-test/config_nvdsanalytics.txt")
# Attach a probe to retrieve analytics info and render
# (ObjectCounterMarker is the BatchMetadataOperator subclass from the Attach example)
flow.attach(what=Probe("counter", ObjectCounterMarker())).render()
flow()

Flow API Sample Applications Reference Table#

Each entry lists the reference test application, its path inside the service-maker/sources directory, a description, and the supported platforms.

Sample test application 1 (x86/aarch64)
Path: apps/python/flow_api/deepstream_test1_app
Sample of how to use Flow API methods for a single H.264 stream inference: batch_capture -> infer -> render. This app uses resnet18_trafficcamnet_pruned.onnx for detection.

Sample test application 2 (x86/aarch64)
Path: apps/python/flow_api/deepstream_test2_app
Sample of how to use Flow API methods for a single H.264 stream cascaded inference: batch_capture -> infer (primary detector) -> track -> infer (secondary classifier) -> render. This app uses resnet18_trafficcamnet_pruned.onnx for detection and 2 classifier models (i.e., resnet18_vehiclemakenet_pruned.onnx, resnet18_vehicletypenet_pruned.onnx).

Sample test application 3 (x86/aarch64)
Path: apps/python/flow_api/deepstream_test3_app
Builds on flow_api/deepstream_test1 (sample test application 1) to demonstrate how to:

  • Use multiple sources in the pipeline for inference.

  • Extract the stream metadata, which contains useful information about the frames in the batched buffer.

This app uses resnet18_trafficcamnet_pruned.onnx for detection.

Sample test application 4 (x86/aarch64)
Path: apps/python/flow_api/deepstream_test4_app
Builds on flow_api/deepstream_test1 for a single H.264 stream inference to demonstrate how to use the publish method to publish messages to a remote server and the fork method to simultaneously render the output. This app uses resnet18_trafficcamnet_pruned.onnx for detection.

Sample test application 5 (x86/aarch64)
Path: apps/python/flow_api/deepstream_test5_app
Comprehensive DeepStream pipeline demonstrating Flow API capabilities with multi-stage inference and analytics. Features:

  • Primary object detection using resnet18_trafficcamnet_pruned.onnx

  • Object tracking with NvDCF tracker

  • Vehicle type classification (car, truck, bus, etc.) using resnet18_vehicletypenet_pruned.onnx

  • Vehicle make/model classification using resnet18_vehiclemakenet_pruned.onnx

  • Smart recording with event-based video capture and Kafka integration

  • Message publishing to Kafka with custom metadata

  • Performance monitoring probes (FPS, latency, OSD object counting)

  • KITTI format data export

  • RTSP streaming support

  • Multi-stream batch processing with configurable batch sizes

This app showcases advanced pipeline orchestration with parallel processing (fork) for simultaneous rendering and message publishing.

3D Action Recognition Example (x86/aarch64)
Path: apps/python/flow_api/deepstream_3d_action_recognition_app
Demonstrates 3D action recognition using the Flow API with custom preprocessing. Features:

  • 3D and 2D RGB-based action recognition using TAO models

  • Custom sequence preprocessing with libnvds_custom_sequence_preprocess.so

  • Support for action classes: push, fall_floor, walk, run, ride_bike

  • Batch capture, preprocessing, inference, and rendering pipeline

  • Custom metadata handling for label display with OSD

This app uses resnet18_3d_rgb_hmdb5_32.onnx and resnet18_2d_rgb_hmdb5_32.onnx models from NGC.

Smart Record Example (x86/aarch64)
Path: apps/python/flow_api/deepstream_sr_test_app
Builds on flow_api/deepstream_test1 to demonstrate event-driven smart recording with Kafka integration. Features multi-stream batch processing, inference, and configurable recording parameters. Supports RTSP streams with a Kafka message broker for automated recording based on external triggers and custom events.

Inject and Retrieve Example (x86)
Path: apps/python/flow_api/deepstream_appsrc_test_app
Demonstrates how to create a BufferRetriever for the retrieve method. The retrieve method with a customized BufferRetriever can be used to extract buffer data from the pipeline.