Introduction to Flow APIs#
Flow APIs abstract away the underlying pipeline details, allowing developers to focus solely on the goals of their specific tasks in a Pythonic style. These high-level APIs emphasize “what to do” rather than “how to do it,” enabling developers to express their intentions in a more intuitive and concise manner. This abstraction simplifies the development process and improves code readability and maintainability.
Besides the explicitly declared arguments, Flow APIs also allow experienced users to override standard DeepStream element properties through kwargs; the only trick is that every ‘-’ in a property name must be replaced with ‘_’.
For instance, users can specify the GPU they want to use when they create a flow for capturing:
# Create a capture flow on GPU 1 by overriding the 'gpu-id' property
from pyservicemaker import Pipeline, Flow
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
flow = Flow(Pipeline('capture')).capture([video_file], gpu_id=1)
Within Service Maker, Flow supports a wide range of common operations in the multimedia and deep learning fields.
Capture#
The ‘capture’ method appends a capture operation to an empty flow. This operation takes a list of URIs as input, from which multiple streams carrying decoded data will be created within the flow. Alternatively, a SourceManager object can be provided to add/remove sources during runtime.
| argument name | description |
|---|---|
| inputs | list of input URIs |
| source_manager | SourceManager object for adding/removing sources during runtime |
| kwargs | optional keyword arguments for overriding the standard DeepStream element properties |
# Flow for playing back an mp4 video
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("playback")).capture([video_file]).render()()
Inject#
The ‘inject’ method appends an injection operation to an empty flow. The operation takes a list of BufferProvider objects and creates multiple streams thereafter.
In addition to overriding the ‘generate’ method to create buffers, BufferProvider objects must carry the following members to instruct the injection process:
| member name | description |
|---|---|
| width | width of the video frames |
| height | height of the video frames |
| format | format of the video frames: RGB or I420 |
| framerate | framerate of the video |
| use_gpu | boolean value to indicate if the data is in GPU memory |
from pyservicemaker import Pipeline, Flow, BufferProvider, Buffer

class MyBufferProvider(BufferProvider):
    def __init__(self, width, height, device='cpu', framerate=30, format="RGB"):
        super().__init__()
        self.width = width
        self.height = height
        self.format = format
        self.framerate = framerate
        self.device = device
        self.count = 0
        self.expected = 255

    def generate(self, size):
        # fill one RGB frame with the current counter value as the pixel data
        data = [self.count] * (self.width * self.height * 3)
        if self.count < self.expected:
            self.count += 1
        # an empty Buffer signals the end of the stream
        return Buffer() if self.count == self.expected else Buffer(data)

p = MyBufferProvider(320, 240)
# play back the injected frames
Flow(Pipeline("playback")).inject([p]).render()()
Retrieve#
The ‘retrieve’ method appends a data retriever to the current flow. The operation takes an instance of BufferRetriever, which implements the ‘consume’ method to access buffers. Invocation of this method ends the flow; no further operations can be appended.
# Read the decoded video buffers from a sample mp4 file
from pyservicemaker import Pipeline, Flow, BufferRetriever

class MyBufferRetriever(BufferRetriever):
    def __init__(self):
        super().__init__()
        self.frames = 0

    def consume(self, buffer):
        # extract the first frame of the batch as a tensor
        tensor = buffer.extract(0)
        assert len(tensor.shape) == 3
        self.frames += 1
        return 1

video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("retrieve")).capture([video_file]).retrieve(MyBufferRetriever())()
Decode#
The ‘decode’ method appends a decoding operation to the current flow. The operation adds a decoder to each upstream stream. This is particularly useful when the data is injected via buffer providers.
from pyservicemaker import Pipeline, Flow, BufferProvider, Buffer

class JpegBufferProvider(BufferProvider):
    def __init__(self, file_path: str):
        super().__init__()
        self._file_path = file_path
        self.format = "JPEG"
        self.width = 1280
        self.height = 720
        self.framerate = 0
        self.count = 0
        self.expected = 255

    def generate(self, size):
        # read the jpeg file as a list of byte values
        with open(self._file_path, "rb") as f:
            data = [int(b) for b in f.read()]
        if self.count < self.expected:
            self.count += 1
        # an empty Buffer signals the end of the stream
        return Buffer() if self.count == self.expected else Buffer(data)

# decode jpeg from a binary buffer
jpeg_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.jpg"
Flow(Pipeline("test")).inject([JpegBufferProvider(jpeg_file)]).decode().render()()
Batch#
The ‘batch’ method appends a batching operation to the current flow to combine all the streams into a single batched stream. This operation takes standard keyword arguments including ‘batch_size’, ‘width’, and ‘height’ as parameters. If these parameters are not given, the operation sets the batch size to the number of streams and the resolution to 1920 x 1080 by default.
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# play back all the sources in a tiled display
Flow(Pipeline("playback")).capture(uri_list).batch().render()()
Batched Capture#
The ‘batch_capture’ method appends a capture operation to an empty flow and batches the inputs. The operation takes either a list of URIs or a source config file as input, forming a single batched stream thereafter.
| argument name | description |
|---|---|
| input | list of input URIs or a source configuration file |
| smart_record_config | optional configuration file for enabling smart record |
| kwargs | optional keyword arguments for overriding the standard DeepStream element properties |
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# play back 4 mp4 videos at the same time
Flow(Pipeline("playback")).batch_capture(uri_list).render()()
Render#
The ‘render’ method appends a renderer to the current flow to display, discard, or stream the video. The operation takes a render mode that decides how to render the data; DISPLAY (default), DISCARD, and STREAM are supported so far. Moreover, optional named arguments cover all the standard sink control parameters. Invocation of this method ends the flow; no further operations can be appended.
| argument name | description |
|---|---|
| mode | render mode: DISPLAY (default), DISCARD, or STREAM |
| enable_osd | boolean value to enable OSD, True by default |
| rtsp_mount_point | the mount point for the rtsp stream, default is ‘ds-test’; only valid when mode is STREAM |
| rtsp_port | the port number for the rtsp stream, default is 8554; only valid when mode is STREAM |
| kwargs | optional keyword arguments for overriding the standard DeepStream element properties |
# discard the video frames
from pyservicemaker import Pipeline, Flow, RenderMode
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(Pipeline("playback")).capture([video_file]).render(mode=RenderMode.DISCARD)()
Encode#
The ‘encode’ method appends an encoder to the current flow to encode the video data into a file or an RTSP stream. The operation takes a destination URI prefixed with ‘file://’ or ‘rtsp://’; if the prefix is missing, ‘file://’ is implied. In the case of an RTSP stream, a port number must appear in the URI. Optional parameters for encoding control are supported as well.
| argument name | description |
|---|---|
| dest | destination URI, prefixed with ‘file://’ or ‘rtsp://’ |
| use_sw_codec | boolean value to indicate if the software codec is used, False by default |
| profile | encoding profile: 0 for baseline (default), 1 for constrained baseline, 2 for main, 4 for high |
| iframeinterval | encoding intra frame occurrence frequency, default 10 |
| bitrate | encoding bitrate in bits per second, default 2000000 |
# encode the captured video into a local mp4 file
pipeline = Pipeline("test")
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
Flow(pipeline).capture([video_file]).encode("output.mp4", sync=True)()
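To stream over RTSP instead, the destination is an ‘rtsp://’ URI carrying an explicit port number, as in the fork example later in this section:
# serve the encoded video via rtsp, received at rtsp://localhost:8554/ds-test
Flow(Pipeline("rtsp")).capture([video_file]).encode("rtsp://localhost:8554")()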
Infer#
The ‘infer’ method enables inference in the current flow. The operation takes a ‘config’ parameter for the model configuration file. Optional standard nvinfer parameters can be added to override the values in the configuration file.
| argument name | description |
|---|---|
| config | configuration file for nvinfer |
| with_triton | boolean value to indicate if the inference is through Triton, False by default |
| kwargs | optional keyword arguments for overriding the standard DeepStream element properties |
pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
# object detection using resnet18 for 4 streams
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
Flow(Pipeline("infer")).batch_capture(uri_list).infer(pgie_config).render()()
Track#
The ‘track’ method appends a tracker to the current flow for tracking detected objects. The operation must come after ‘infer’, which supplies the detection data. Standard nvtracker parameters must be appropriately set for the tracker to work correctly.
pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
# object detection and tracking using nvmultiobjecttracker for 4 streams
Flow(Pipeline("tracker")).batch_capture(uri_list).infer(pgie_config).track(
ll_config_file="/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml",
ll_lib_file="/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so"
).render()()
Publish#
The ‘publish’ method appends a procedure to the current flow for publishing events to a remote server. The operation takes the following parameters to set up the communication between the pipeline and the remote server.
| argument name | description |
|---|---|
| msg_broker_proto_lib | the low-level protocol library used by the message broker |
| msg_broker_conn_str | the connection string for the server |
| topic | topic name |
| msg_conv_config | the message converter configuration for source information |
# publish the object data to a kafka server
Flow(Pipeline("publish")).batch_capture(
"/opt/nvidia/deepstream/deepstream/service-maker/sources/apps/deepstream_test5_app/source_list_dynamic.yaml"
).infer(
"/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
).attach(
what="add_message_meta_probe",
name="message_generator"
).publish(
msg_broker_proto_lib="/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so",
msg_broker_conn_str="qvs-ds-kafka.nvidia.com;9092",
topic="test4app",
)()
Invocation of this method ends the flow; no further operations can be appended.
Preprocess#
The ‘preprocess’ method appends a preprocess operation to the current flow. The operation takes a ‘config_file’ parameter for the preprocess configuration file. An optional tensor generator callback can be provided to generate custom tensors alongside the preprocessed images.
| argument name | description |
|---|---|
| config_file | the preprocess configuration file in YAML format |
| tensor_generator | the callback for generating custom tensors, which takes an integer (number of frames in the batch) as input and returns a dictionary of tensors |
preprocess_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_preprocess.yml"
Flow(Pipeline("preprocess")).batch_capture(
["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]
).preprocess(preprocess_config).infer(
"/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
).render()()
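The tensor generator itself is just a callable returning a dictionary of tensors; a minimal sketch, in which the tensor name ‘custom_tensor’ is hypothetical and torch tensors are assumed as the returned tensor type (the exact type expected is defined by the preprocess operation):
import torch

def my_tensor_generator(num_frames):
    # hypothetical callback: one illustrative custom tensor per batch,
    # keyed by whatever tensor name the model expects
    return {"custom_tensor": torch.zeros(num_frames, 1)}

# passed alongside the configuration file:
# flow.preprocess(preprocess_config, tensor_generator=my_tensor_generator)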
Attach#
The ‘attach’ method attaches a Probe to the current flow. Two parameters are required:
| argument name | description |
|---|---|
| what | a Probe object or the name of a probe in a shared library |
| name | the instance name if the probe is from a shared library |
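For instance, a probe shipped in a shared library can be attached by its name; a minimal sketch reusing the message-metadata probe from the ‘publish’ example above:
# attach a probe from a shared library and give the instance a name
video_file = "/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"
pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
Flow(Pipeline("attach")).batch_capture([video_file]).infer(pgie_config).attach(
    what="add_message_meta_probe",
    name="message_generator"
).render()()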
Fork#
The ‘fork’ method forks the current flow so that more than one flow can be appended.
# Initiate a pipeline to read an mp4 file,
# transcode the video to both a local file and an rtsp stream,
# and play it back at the same time
pipeline = Pipeline("test")
dest = "rtsp://localhost:8554"
flow = Flow(pipeline).capture(["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h265.mp4"]).batch().fork()
flow.encode(dest, sync=True)
flow.encode("/tmp/sample.mp4")
flow.render(sync=True)
flow()
The output RTSP stream can be received at “rtsp://localhost:8554/ds-test”.
Analyze#
The ‘analyze’ method appends an analytics operation to the current flow, which outputs frame-level and tracked-object-level information on Region of Interest (RoI) filtering, overcrowding detection, direction detection, and line crossing. The operation must come after ‘infer’ and ‘track’. The operation takes a ‘config’ parameter for the NvDsAnalytics configuration file.
from pyservicemaker import Pipeline, Flow, Probe

pgie_config = "/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_infer_primary.yml"
uri_list = ["/opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"]*4
pipeline = Pipeline("analytics")
flow = Flow(pipeline).batch_capture(uri_list).infer(pgie_config)
flow = flow.track(
    ll_config_file="/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml",
    ll_lib_file="/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so"
)
# Analyze the video frames and detected objects
flow = flow.analyze("/opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-nvdsanalytics-test/config_nvdsanalytics.txt")
# Attach a probe to retrieve analytics info, then render
# (ObjectCounterMarker is a user-defined metadata operator, not shown here)
flow.attach(what=Probe("counter", ObjectCounterMarker())).render()
flow()
Flow API Sample Applications Reference Table#
| Reference test application | Path inside service-maker/sources directory | Description | Platform |
|---|---|---|---|
| Sample test application 1 | apps/python/flow_api/deepstream_test1_app | Sample of how to use Flow API methods for a single H.264 stream inference: batch_capture -> infer -> render. This app uses resnet18_trafficcamnet_pruned.onnx for detection. | x86/aarch64 |
| Sample test application 2 | apps/python/flow_api/deepstream_test2_app | Sample of how to use Flow API methods for a single H.264 stream cascaded inference: batch_capture -> infer (primary detector) -> track -> infer (secondary classifier) -> render. This app uses resnet18_trafficcamnet_pruned.onnx for detection and two classifier models (resnet18_vehiclemakenet_pruned.onnx and resnet18_vehicletypenet_pruned.onnx). | x86/aarch64 |
| Sample test application 3 | apps/python/flow_api/deepstream_test3_app | Builds on flow_api/deepstream_test1 (sample test application 1) to demonstrate how to use multiple sources in the pipeline for inference, and how to extract the stream metadata, which contains useful information about the frames in the batched buffer. This app uses resnet18_trafficcamnet_pruned.onnx for detection. | x86/aarch64 |
| Sample test application 4 | apps/python/flow_api/deepstream_test4_app | Builds on flow_api/deepstream_test1 for a single H.264 stream inference to demonstrate how to use the publish method to publish messages to a remote server and the fork method to simultaneously render the output. This app uses resnet18_trafficcamnet_pruned.onnx for detection. | x86/aarch64 |
| Sample test application 5 | apps/python/flow_api/deepstream_test5_app | Comprehensive DeepStream pipeline demonstrating Flow API capabilities with multi-stage inference and analytics: primary object detection using resnet18_trafficcamnet_pruned.onnx; object tracking with the NvDCF tracker; vehicle type classification (car, truck, bus, etc.) using resnet18_vehicletypenet_pruned.onnx; vehicle make/model classification using resnet18_vehiclemakenet_pruned.onnx; smart recording with event-based video capture and Kafka integration; message publishing to Kafka with custom metadata; performance monitoring probes (FPS, latency, OSD object counting); KITTI format data export; RTSP streaming support; and multi-stream batch processing with configurable batch sizes. This app showcases advanced pipeline orchestration with parallel processing (fork) for simultaneous rendering and message publishing. | x86/aarch64 |
| 3D Action Recognition Example | apps/python/flow_api/deepstream_3d_action_recognition_app | Demonstrates 3D action recognition using the Flow API with custom preprocessing: 3D and 2D RGB-based action recognition using TAO models; custom sequence preprocessing with libnvds_custom_sequence_preprocess.so; support for the action classes push, fall_floor, walk, run, and ride_bike; a batch capture, preprocessing, inference, and rendering pipeline; and custom metadata handling for label display with OSD. This app uses the resnet18_3d_rgb_hmdb5_32.onnx and resnet18_2d_rgb_hmdb5_32.onnx models from NGC. | x86/aarch64 |
| Smart Record Example | apps/python/flow_api/deepstream_sr_test_app | Builds on flow_api/deepstream_test1 to demonstrate event-driven smart recording with Kafka integration. Features multi-stream batch processing, inference, and configurable recording parameters. Supports RTSP streams with a Kafka message broker for automated recording based on external triggers and custom events. | x86/aarch64 |
| Inject and Retrieve Example | apps/python/flow_api/deepstream_appsrc_test_app | Demonstrates how to create a BufferRetriever for the retrieve method. The retrieve method with a customized BufferRetriever can be used to extract buffer data from the pipeline. | x86 |