Gst-nvmsgbroker

This plugin sends payload messages to the server using a specified communication protocol. It accepts any buffer that has NvDsPayload metadata attached and uses the nvds_msgapi_* interface to send the messages to the server. You must implement the nvds_msgapi_* interface for the protocol to be used and specify the implementing library in the proto-lib property.


Inputs and Outputs

  • Inputs

    • Gst Buffer with NvDsPayload

  • Control parameters

    • config

    • conn-str

    • proto-lib

    • comp-id

    • topic

    • new-api

  • Output

    • None, as this is a sink component

Features

The following table summarizes the features of the Gst-nvmsgbroker plugin.

Gst-nvmsgbroker plugin features

| Feature | Description | Release |
|---|---|---|
| Payload in JSON format | Accepts message payload in JSON format | DS 3.0 |
| Kafka protocol support | Kafka protocol adapter implementation | DS 3.0 |
| Azure IOT support | Integration with Azure IOT framework | DS 4.0 |
| AMQP support | AMQP 0-9-1 protocol adapter implementation | DS 4.0 |
| REDIS support | Redis protocol adapter using Redis Streams | DS 5.1 |
| Custom protocol support | Provision to support custom protocol through a custom implementation of the adapter interface | DS 3.0 |
| Configurable parameters | Protocol specific options through configuration file | DS 3.0 |

Gst Properties

The following table describes the Gst properties of the Gst-nvmsgbroker plugin.

Gst-nvmsgbroker plugin gst properties

| Property | Meaning | Type and Range | Example Notes | Platforms |
|---|---|---|---|---|
| config | Absolute pathname of configuration file required by nvds_msgapi_* interface | String | config=<msgapi_config.txt> | dGPU, Jetson |
| conn-str | Connection string as end point for communication with server | String. Format must be <name>;<port>;<specifier> | conn-str=foo.bar.com;80;user-id | dGPU, Jetson |
| proto-lib | Absolute pathname of library that contains the protocol adapter as an implementation of nvds_msgapi_* | String | proto-lib=<libnvds_kafka_proto.so> | dGPU, Jetson |
| comp-id | ID of component from which metadata should be processed | Integer, 0 to 4,294,967,295 | comp-id=3. Default: plugin processes metadata from any component | dGPU, Jetson |
| topic | Message topic name | String | topic=dsapp1 | dGPU, Jetson |
| subscribe-topic-list | Topic names to subscribe for consuming messages | String | subscribe-topic-list=topic1; topic2; topic3 | dGPU, Jetson |
| new-api | Whether to use the protocol adapter library APIs directly or the new msgbroker library wrapper APIs | Integer. 0: Use adapter APIs directly. 1: Use msgbroker library wrapper APIs (refer to nv_msgbroker: Message Broker interface) | new-api = 0 | dGPU, Jetson |

nvds_msgapi: Protocol Adapter Interface

You can use the DeepStream messaging interface, nvds_msgapi, to implement a custom protocol message handler and integrate it with DeepStream applications. Such a message handler, known as a protocol adapter, enables you to integrate DeepStream applications with backend data sources, such as data stored in the cloud.

Gst-nvmsgbroker-nvds_msgapi

The Gst-nvmsgbroker plugin calls the functions in your protocol adapter as shown in the figure above. These functions support:

  • Creating a connection

  • Sending messages by synchronous or asynchronous means

  • Terminating the connection

  • Coordinating the client’s and protocol adapter’s use of CPU resources and threads

  • Getting the protocol adapter’s version number

The nvds_msgapi interface is defined in the header file source/includes/nvds_msgapi.h. This header file defines a set of function pointers which provide an interface analogous to an interface in C++. The following sections describe the methods defined by the nvds_msgapi interface.

nvds_msgapi_connect(): Create a Connection

NvDsMsgApiHandle nvds_msgapi_connect(char *connection_str,
nvds_msgapi_connect_cb_t connect_cb, char *config_path
);

The function accepts a connection string and configures a connection. The adapter implementation can choose whether the function makes a connection to accommodate connectionless protocols such as HTTP.

Parameters

  • connection_str: A pointer to a string that specifies connection parameters in the general format <url>;<port>;<specifier>.

    • <url> and <port> specify the network address of the remote entity.

    • <specifier> specifies information specific to a protocol. Its content depends on the protocol’s implementation. It may be, for example, a client identifier for making the connection.

    Note that this connection string format is not binding, and an adapter may omit some fields (e.g.: specifier) from its format, provided the omission is described in its documentation. A special case of such connection string adaptation is where the adapter expects all connection parameters to be specified as fields in the configuration file (see config path below), in which case the connection string is passed as NULL.

  • connect_cb: A callback function for events associated with the connection.

  • config_path: The pathname of a configuration file that defines protocol parameters used by the adapter.

Return Value

A handle for use in subsequent interface calls if successful, or NULL otherwise.
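As an illustration of the general <url>;<port>;<specifier> format described above, the following is a minimal sketch of how an adapter might split a connection string into its fields. The ConnParams struct and the parse_conn_str() function are hypothetical names introduced for this example and are not part of nvds_msgapi.h. The sketch treats the specifier as optional, since the format is not binding.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical holder for the parsed fields; not part of nvds_msgapi.h. */
typedef struct {
    char host[256];
    int  port;
    char specifier[256];   /* empty string when the field is omitted */
} ConnParams;

/* Splits "<url>;<port>[;<specifier>]" into its fields.
 * Returns 0 on success, -1 on malformed input. */
static int parse_conn_str(const char *conn_str, ConnParams *out)
{
    if (conn_str == NULL || out == NULL)
        return -1;

    const char *sep1 = strchr(conn_str, ';');
    if (sep1 == NULL || sep1 == conn_str)
        return -1;                       /* no port field, or empty host */

    size_t host_len = (size_t)(sep1 - conn_str);
    if (host_len >= sizeof(out->host))
        return -1;
    memcpy(out->host, conn_str, host_len);
    out->host[host_len] = '\0';

    char *end = NULL;
    long port = strtol(sep1 + 1, &end, 10);
    if (end == sep1 + 1 || port < 1 || port > 65535)
        return -1;                       /* port missing or out of range */

    out->specifier[0] = '\0';
    if (*end == ';') {
        if (strlen(end + 1) >= sizeof(out->specifier))
            return -1;
        strcpy(out->specifier, end + 1); /* length checked just above */
    } else if (*end != '\0') {
        return -1;                       /* trailing junk after the port */
    }
    out->port = (int)port;
    return 0;
}
```

An adapter that reads all of its connection parameters from the configuration file would skip this step, because in that case the connection string is passed as NULL.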

nvds_msgapi_send() and nvds_msgapi_send_async(): Send an event

NvDsMsgApiErrorType nvds_msgapi_send(NvDsMsgApiHandle h_ptr,
char *topic, uint8_t *payload, size_t nbuf
);
NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr,
char *topic, const uint8_t *payload, size_t nbuf,
nvds_msgapi_send_cb_t send_callback, void *user_ptr
);

Both functions send data to the endpoint of a connection. They accept a message topic and a message payload. The nvds_msgapi_send() function is synchronous. The nvds_msgapi_send_async() function is asynchronous; it accepts a callback function that is called when the “send” operation is completed. Both functions allow the API client to control execution of the adapter logic by calling nvds_msgapi_do_work(). See the description of the nvds_msgapi_do_work() function.

Parameters

  • h_ptr: A handle for the connection, obtained by a call to nvds_msgapi_connect().

  • topic: A pointer to a string that specifies a topic for the message; may be NULL if topic is not meaningful for the semantics of the protocol adapter.

  • payload: A pointer to a byte array that contains the payload for the message.

  • nbuf: Number of bytes to be sent.

  • send_callback: A pointer to a callback function that the asynchronous function calls when the “send” operation is complete. The signature of the callback function is of type nvds_msgapi_send_cb_t, defined as:

    typedef void (*nvds_msgapi_send_cb_t)(void *user_ptr,
    NvDsMsgApiErrorType completion_flag
    );
    

where the callback’s parameters are:

  • user_ptr: The user pointer (user_ptr) from the call to nvds_msgapi_send_async() that initiated the “send” operation. Enables the callback function to identify the initiating call.

  • completion_flag: A code that indicates the completion status of the asynchronous send operation.

nvds_msgapi_subscribe(): Consume data by subscribing to topics

NvDsMsgApiErrorType nvds_msgapi_subscribe (NvDsMsgApiHandle h_ptr, char ** topics, int num_topics, nvds_msgapi_subscribe_request_cb_t  cb, void *user_ctx);

This API subscribes to one or more topics and consumes messages from the external entity. The API is asynchronous and must be called with a valid, already created Kafka connection handle as a parameter. The caller must also provide a pointer to a callback function that receives the consumed messages from the connection endpoint, and may provide an optional user_ctx pointer that specifies user context.

Parameters

  • h_ptr: A handle for the connection, obtained by a call to nvds_msgapi_connect().

  • topics: A pointer to an array of strings (char **) that holds the topic names.

  • num_topics: The number of topics to subscribe to.

  • cb: A pointer to a callback function that is notified of messages consumed on the subscribed topic(s).

  • user_ctx: A user pointer that is passed to the callback for context.

The callback function pointer passed to the subscribe API is of type nvds_msgapi_subscribe_request_cb_t, defined as:

    typedef void (*nvds_msgapi_subscribe_request_cb_t)(NvDsMsgApiErrorType flag, void *msg, int msg_len, char *topic, void *user_ptr);

where the callback’s parameters are:

  • flag: The error status of the consumed message.

  • msg: The consumed message (payload).

  • msg_len: The length of the message in bytes.

  • topic: The name of the topic on which the message was received.

  • user_ptr: The pointer passed during the subscribe call for context.
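The callback described above can be sketched as follows. To keep the example self-contained, the enum and the typedef are redeclared locally; real client code would include nvds_msgapi.h instead, and the enumerator names used here are assumptions. The SubscriberCtx struct and the on_message() function are hypothetical names introduced for illustration.

```c
#include <stdio.h>
#include <string.h>

/* Local stand-ins so the sketch compiles on its own; real code would
 * include nvds_msgapi.h. The enumerator names are assumed. */
typedef enum { NVDS_MSGAPI_OK, NVDS_MSGAPI_ERR } NvDsMsgApiErrorType;
typedef void (*nvds_msgapi_subscribe_request_cb_t)(NvDsMsgApiErrorType flag,
    void *msg, int msg_len, char *topic, void *user_ptr);

/* Hypothetical user context: remembers the most recent message. */
typedef struct {
    char last_msg[128];
    char last_topic[64];
    int  received;
    int  errors;
} SubscriberCtx;

/* Callback matching nvds_msgapi_subscribe_request_cb_t. The payload is
 * copied using msg_len because this sketch does not assume that it is
 * NUL-terminated. */
static void on_message(NvDsMsgApiErrorType flag, void *msg, int msg_len,
                       char *topic, void *user_ptr)
{
    SubscriberCtx *ctx = (SubscriberCtx *)user_ptr;
    if (ctx == NULL)
        return;
    if (flag != NVDS_MSGAPI_OK || msg == NULL || msg_len < 0) {
        ctx->errors++;
        return;
    }
    size_t n = (size_t)msg_len;
    if (n >= sizeof(ctx->last_msg))
        n = sizeof(ctx->last_msg) - 1;   /* truncate oversized payloads */
    memcpy(ctx->last_msg, msg, n);
    ctx->last_msg[n] = '\0';

    snprintf(ctx->last_topic, sizeof(ctx->last_topic), "%s",
             topic ? topic : "");
    ctx->received++;
}
```

A client would pass on_message as the cb argument and a pointer to a SubscriberCtx as user_ctx when calling nvds_msgapi_subscribe().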

nvds_msgapi_do_work(): Incremental Execution of Adapter Logic

void nvds_msgapi_do_work();

The protocol adapter must periodically surrender control to the client during processing of nvds_msgapi_send() and nvds_msgapi_send_async() calls. The client must periodically call nvds_msgapi_do_work() to let the protocol adapter resume execution. This ensures that the protocol adapter receives enough CPU resources. The client can use this convention to control the protocol adapter’s use of multi-threading and thread scheduling. The protocol adapter can use it to support heartbeat functionality, if the underlying protocol requires that.

The nvds_msgapi_do_work() convention is needed when the protocol adapter executes in the client thread. Alternatively, the protocol adapter may execute time-consuming operations in its own thread. In this case the protocol adapter need not surrender control to the client, the client need not call nvds_msgapi_do_work(), and the implementation of nvds_msgapi_do_work() may be a no-op. The protocol adapter’s documentation must specify whether the client must call nvds_msgapi_do_work(), and if so, how often.
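The do_work convention can be illustrated with a small mock of an adapter that runs entirely in the client thread. This is a sketch under stated assumptions, not the implementation of any shipped adapter: MockAdapter, mock_send_async(), mock_do_work(), and count_done() are hypothetical names, the typedefs are local stand-ins for those in nvds_msgapi.h, and no network I/O takes place. The asynchronous send only queues work; each call to the do_work function completes at most one pending send and then returns control to the client.

```c
#include <stddef.h>
#include <stdint.h>

/* Local stand-ins mirroring the typedefs quoted in this document. */
typedef enum { NVDS_MSGAPI_OK, NVDS_MSGAPI_ERR } NvDsMsgApiErrorType;
typedef void (*nvds_msgapi_send_cb_t)(void *user_ptr,
                                      NvDsMsgApiErrorType completion_flag);

#define MOCK_QUEUE_CAP 8

/* Hypothetical single-threaded adapter state: a ring of pending sends. */
typedef struct {
    nvds_msgapi_send_cb_t cb;
    void *user_ptr;
} MockPending;

typedef struct {
    MockPending q[MOCK_QUEUE_CAP];
    size_t head, count;
} MockAdapter;

/* Queues a message and returns at once; no I/O happens here. */
static NvDsMsgApiErrorType mock_send_async(MockAdapter *a,
                                           const uint8_t *payload, size_t nbuf,
                                           nvds_msgapi_send_cb_t cb,
                                           void *user_ptr)
{
    (void)payload; (void)nbuf;   /* a real adapter would copy the payload */
    if (a->count == MOCK_QUEUE_CAP)
        return NVDS_MSGAPI_ERR;  /* queue full: client must call do_work */
    size_t tail = (a->head + a->count) % MOCK_QUEUE_CAP;
    a->q[tail].cb = cb;
    a->q[tail].user_ptr = user_ptr;
    a->count++;
    return NVDS_MSGAPI_OK;
}

/* One slice of adapter work: completes at most one pending send, then
 * hands control back to the caller. */
static void mock_do_work(MockAdapter *a)
{
    if (a->count == 0)
        return;
    MockPending p = a->q[a->head];
    a->head = (a->head + 1) % MOCK_QUEUE_CAP;
    a->count--;
    if (p.cb)
        p.cb(p.user_ptr, NVDS_MSGAPI_OK);
}

/* Example completion callback: counts completed sends. */
static void count_done(void *user_ptr, NvDsMsgApiErrorType flag)
{
    if (flag == NVDS_MSGAPI_OK)
        ++*(int *)user_ptr;
}
```

In this model the completion callback fires only from inside mock_do_work(), so the rate at which the client calls it bounds the rate at which queued messages complete.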

nvds_msgapi_disconnect(): Terminate a Connection

NvDsMsgApiErrorType nvds_msgapi_disconnect(NvDsMsgApiHandle h_ptr);

The function terminates the connection, if the underlying protocol requires it, and frees resources associated with h_ptr.

Parameters

  • h_ptr: A handle for the connection, obtained by a call to nvds_msgapi_connect().

nvds_msgapi_getversion(): Get Version Number

char *nvds_msgapi_getversion();

This function returns a string that identifies the nvds_msgapi version supported by this protocol adapter implementation. The string must use the format <major>.<minor>, where <major> is a major version number and <minor> is a minor version number. A change in the major version number indicates an API change that may cause incompatibility. When the major version number changes, the minor version number is reset to 1.
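A client could use the <major>.<minor> string to decide whether an adapter is usable. The following is a minimal sketch, assuming a policy in which the client accepts an adapter only when the major version matches the one it was built against; that policy and the function names parse_msgapi_version() and msgapi_version_compatible() are assumptions made for this example.

```c
#include <stdio.h>

/* Parses "<major>.<minor>"; returns 0 on success, -1 on malformed input. */
static int parse_msgapi_version(const char *s, int *major, int *minor)
{
    char trailing;
    if (s == NULL)
        return -1;
    /* The third conversion matches only if junk follows the minor number,
     * so exactly two successful conversions means a clean "<int>.<int>". */
    if (sscanf(s, "%d.%d%c", major, minor, &trailing) != 2)
        return -1;
    if (*major < 0 || *minor < 0)
        return -1;
    return 0;
}

/* Returns 1 when the adapter's major version equals the expected one,
 * 0 otherwise (including when the version string is malformed). */
static int msgapi_version_compatible(const char *adapter_version,
                                     int expected_major)
{
    int major, minor;
    if (parse_msgapi_version(adapter_version, &major, &minor) != 0)
        return 0;
    return major == expected_major;
}
```

The string passed in would be the value returned by nvds_msgapi_getversion().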

nvds_msgapi_get_protocol_name(): Get name of the protocol

char *nvds_msgapi_get_protocol_name(void);

This function returns a string that identifies the underlying protocol used in the adapter library, for example, “KAFKA”, “AMQP”, “AZURE_DEVICE_CLIENT”, or “AZURE_MODULE_CLIENT”.

nvds_msgapi_connection_signature(): Get Connection signature

NvDsMsgApiErrorType nvds_msgapi_connection_signature(char *broker_str, char *cfg, char *output_str, int max_len);

This function returns a string that uniquely identifies the signature of the connection parameters passed to the adapter library for making a connection. On success, a connection signature string (generated by SHA256) is returned. Upon error or invalid connection parameters, the empty string “” is returned.

Parameters

  • broker_str: Broker connection string used to create the connection.

  • cfg: Path to the config file.

  • output_str: Output connection signature.

  • max_len: Maximum length of output_str.

nvds_kafka_proto: Kafka Protocol Adapter

The DeepStream 5.0 release includes a protocol adapter that supports Apache Kafka. The adapter provides out-of-the-box capability for DeepStream applications to publish messages to Kafka brokers.

Installing Dependencies

The Kafka adapter uses librdkafka for the underlying protocol implementation. This library must be installed prior to use. To install librdkafka, enter these commands:

git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
git reset --hard 063a9ae7a65cebdf1cc128da9815c05f91a2a996
./configure
make
sudo make install
sudo cp /usr/local/lib/librdkafka* /opt/nvidia/deepstream/deepstream-5.0/lib

Install additional dependencies:

sudo apt-get install libglib2.0 libglib2.0-dev
sudo apt-get install libjansson4 libjansson-dev

Using the Adapter

You can use the Kafka adapter in an application by setting the Gst-nvmsgbroker plugin’s proto-lib property to the pathname of the adapter’s shared library, libnvds_kafka_proto.so. The plugin’s conn-str property must be set to a string with format:

<kafka broker address>;<port>

This instantiates the Gst-nvmsgbroker plugin and makes it use the Kafka protocol adapter to publish messages that the application sends to the broker at the specified broker address and topic.

Configuring Protocol Settings

You can define configuration settings for the Kafka protocol adapter as described by the documentation at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. You can set these options in the Gst-nvmsgbroker configuration file. Like the rest of DeepStream, the configuration file uses the gkey (GLib key file) format. The Kafka settings must be in a group named [message-broker] and must be specified as part of a key named proto-cfg. The settings can be a series of key-value pairs separated by semicolons. For example:

[message-broker]
proto-cfg="message.timeout.ms=2000;retries=5"
consumer-group-id = groupid
partition-key = keyid
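The proto-cfg value is a semicolon-separated list of key=value pairs. The following sketch shows one way such a string could be split into pairs before each pair is handed to the protocol library. It is not taken from the Kafka adapter's source: split_proto_cfg(), proto_cfg_pair_cb, and capture_retries() are hypothetical names, and the sketch assumes the surrounding quotes have already been removed from the value.

```c
#include <string.h>

/* Callback invoked once per key/value pair (hypothetical signature). */
typedef void (*proto_cfg_pair_cb)(const char *key, const char *value,
                                  void *user);

/* Splits "k1=v1;k2=v2" and reports each pair through cb.
 * Returns the number of pairs, or -1 if a segment has no key, has no '=',
 * or is too long for the scratch buffer. Empty segments are skipped. */
static int split_proto_cfg(const char *cfg, proto_cfg_pair_cb cb, void *user)
{
    char seg[256];
    int pairs = 0;

    if (cfg == NULL)
        return -1;
    while (*cfg != '\0') {
        size_t len = strcspn(cfg, ";");   /* length up to next ';' or end */
        if (len > 0) {
            if (len >= sizeof(seg))
                return -1;
            memcpy(seg, cfg, len);
            seg[len] = '\0';
            char *eq = strchr(seg, '=');
            if (eq == NULL || eq == seg)
                return -1;
            *eq = '\0';                   /* seg is now the key */
            if (cb)
                cb(seg, eq + 1, user);
            pairs++;
        }
        cfg += len;
        if (*cfg == ';')
            cfg++;
    }
    return pairs;
}

/* Example callback: records the value of the "retries" key, if present.
 * The user pointer must reference a buffer of at least 16 bytes. */
static void capture_retries(const char *key, const char *value, void *user)
{
    if (strcmp(key, "retries") == 0) {
        strncpy((char *)user, value, 15);
        ((char *)user)[15] = '\0';
    }
}
```

For the example value shown above, the callback would be invoked twice: once for message.timeout.ms and once for retries.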

The Kafka adapter lets you specify the name of the field in messages that is to be used to define the partition key. For each message, the specified message field is extracted and sent to the topic partitioner along with the message. The partitioner uses it to identify the partition in the Kafka cluster that handles the message. The partition key information must be specified in the Gst-nvmsgbroker configuration file’s [message-broker] group, using an entry named partition-key. Fields embedded in a hierarchy of JSON objects in the message are specified using dotted notation. For example, for the sample JSON message shown below, the id field in the sensor object is identified as sensor.id.

{
 "sensor": {
 "id": "cam1"
 }
}

Additionally, the Kafka adapter lets you specify the consumer group id. The consumer group is a string that uniquely identifies the group of consumer processes to which this Kafka consumer belongs. The key name consumer-group-id can be specified in the Gst-nvmsgbroker configuration file’s [message-broker] group. If this field is unspecified, the default consumer group name “test-consumer-group” will be used.

Note

For the DeepStream reference application and the 360 D application, both distributed with the DeepStream SDK, you can add the proto-cfg setting to the [message-broker] group of the top-level configuration file passed to the application.

Programmatic Integration

You can integrate the Kafka adapter into custom user code by using the nvds_msgapi interface to call its functions. Note the following points regarding the functions defined by the interface:

  • The connection string passed to nvds_msgapi_connect() has the format <kafka broker address>;<port>.

  • For both “send” functions, the topic name must be passed as a parameter to nvds_msgapi_send() or nvds_msgapi_send_async().

  • For the subscribe API, a pointer to an array of topic-name strings (char **) must be passed as a parameter. Also, a pointer to a user callback function must be provided. When there is a new message from the remote entity, the Kafka consumer forwards the message to the application by calling the user callback function.

  • The application must call nvds_msgapi_do_work() at least once a second, and preferably more often. The frequency of calls to nvds_msgapi_do_work() determines the rate at which messages waiting to be sent are processed.

  • It is safe for multiple application threads to share connection handles. The library librdkafka is thread-safe, so the Kafka protocol adapter does not need to implement separate locking mechanisms for functions that call directly into this library.

  • The Kafka protocol adapter expects the client to manage usage and retirement of the connection handle. The client must ensure that once a handle is disconnected, it is not used for either a “send” call or a call to nvds_msgapi_do_work(). While the library attempts to ensure graceful failure if the application calls these functions with retired handles, it does not do so in a thread-safe manner.

Security for Kafka

To learn more about security for Kafka, see the Secure Edge-to-Cloud Messaging section in the NVIDIA DeepStream SDK Developer Guide 6.0 Release.

Monitor Adapter Execution

The Kafka adapter generates log messages based on the nvds_logger framework to help you monitor execution. The adapter generates separate logs for the INFO, DEBUG, and ERROR severity levels, as described in nvds_logger: Logging Framework. You can limit the log messages generated by setting the level at which log messages are filtered as part of the logging setup script.

Note

If the severity level is set to DEBUG, the nvds_logger framework logs the entire contents of each message sent by the Kafka protocol adapter.

Azure MQTT Protocol Adapter Libraries

The DeepStream 5.0 release includes protocol adapters that support direct messaging from device to cloud (using the Azure device client adapter) and through Azure IoT Edge runtime (using the Azure module client adapter). The adapters provide out-of-the-box capability for DeepStream applications to publish messages to Azure IoT Hub using the MQTT protocol. The Azure IoT protocol adapters are encapsulated by their respective shared libraries found within the DeepStream package at /opt/nvidia/deepstream/deepstream-5.0/lib. The Azure device client adapter library is named libnvds_azure_proto.so. The Azure module client adapter library is named libnvds_azure_edge_proto.so.

Installing Dependencies

Azure adapters use libiothub_client.so from the Azure IoT C SDK (v1.2.8) for the underlying protocol implementation. After you install the DeepStream package you can find the precompiled library at: /opt/nvidia/deepstream/deepstream-5.0/lib/libiothub_client.so. You can also compile libiothub_client.so manually by entering these commands:

git clone -b 2018-07-11 --recursive https://github.com/Azure/azure-iot-sdk-c.git
cd azure-iot-sdk-c
mkdir cmake
cd cmake
cmake -Dbuild_as_dynamic:BOOL=ON -Duse_edge_modules:BOOL=ON ..
cmake --build .  # append '-- -j <n>' to run <n> jobs in parallel

To install some other required dependencies, enter one of these commands.

  • For an x86 computer using Ubuntu 18.04:

    sudo apt-get install -y libcurl3 libssl-dev uuid-dev libglib2.0 libglib2.0-dev
    
  • For other platforms or OS:

    sudo apt-get install -y libcurl4-openssl-dev libssl-dev uuid-dev libglib2.0 libglib2.0-dev
    

Setting Up Azure IoT

The Azure IoT adapter needs a functioning Azure IoT Hub instance to which it can publish messages. To set up an Azure IoT Hub instance if required, see the instructions at: https://docs.microsoft.com/en-us/azure/iot-hub/tutorial-connectivity. After you create the Azure IoT instance, create a device entry corresponding to the device that is running DeepStream.

To set up Azure IoT Edge runtime on the edge device, see the instructions at https://docs.microsoft.com/en-us/azure/iot-edge/how-to-install-iot-edge-linux.

Configuring Adapter Settings

Place Azure IoT specific information in a custom configuration file named, e.g., cfg_azure.txt. The entries in the configuration file vary slightly between the Azure device client and the module client.

  • For an Azure device client:

    [message-broker]
    connection_str = HostName=<my-hub>.azure-devices.net;DeviceId=<device_id>;
    custom_msg_properties =  <key1>=<value1>; <key2>=<value2>; <key3>=<value3>;
    
  • For an Azure module client:

    [message-broker]
    #custom_msg_properties =  <key1>=<value1>; <key2>=<value2>; <key3>=<value3>;
    

Here is useful information about some of the configuration file properties:

  • connection_str: You can obtain the Azure connection string from the Azure IoT Hub web interface. A connection string uniquely identifies each device associated with the IoT Hub instance. It is under the “Primary Connection String” entry in the “Device detail” section.

  • custom_msg_properties: Use this property to embed custom key/value pairs in the MQTT messages sent from the device to Azure IoT. You can embed multiple key values separated by semicolons, as in this example:

    custom_msg_properties = ex2: key1=value1;key2=value2;key3=value3;
    

Note

The connection_str and custom_msg_properties strings are each limited to 512 characters.

Using the Adapter

To use an Azure adapter in an application, set the Gst-nvmsgbroker plugin’s proto-lib property to the pathname of the adapter’s shared library: libnvds_azure_proto.so for the device client case, or libnvds_azure_edge_proto.so for the module client case. The next step in using the adapter is to specify the connection details. The procedure for specifying connection details is different for the Azure device client and module client cases, as described in the following sections.

Connection Details for the Device Client Adapter

Set the plugin’s conn-str property to the full Azure connection string in the format:

HostName=<my-hub>.azure-devices.net;DeviceId=<device_id>;SharedAccessKey=<my-policy-key>

Alternatively, you can specify the connection string details in the Azure configuration file:

[message-broker]
connection_str = HostName=<my-hub>.azure-devices.net;DeviceId=<device_id>;SharedAccessKey=<my-policy-key>

Connection Details for the Module Client Adapter

Leave the connection string empty, since the Azure IoT Edge library automatically fetches the connection string from the file /etc/iotedge/config.yaml.

Once the connection details have been configured, you can integrate the Azure device client and module client adapters into custom user code by using the nvds_msgapi interface to call its functions. Note the following points about the functions defined by the interface:

  • The connection string passed to nvds_msgapi_connect() may be NULL for both the Azure device client and the module client. For the device client the Azure configuration file has an option to specify a connection string. For the module client the connection string is always specified in /etc/iotedge/config.yaml.

  • Both “send” functions use the topic name specified in the Gst-nvmsgbroker plugin’s property “topic.” It may be null.

  • The application must call nvds_msgapi_do_work() after each call to nvds_msgapi_send_async(). The frequency of calls to nvds_msgapi_do_work() determines the rate at which messages waiting to be sent are processed.

  • It is safe for multiple application threads to share connection handles. The library libiothub_client is thread-safe, so Azure protocol adapters need not implement separate locking mechanisms for functions calling this library directly.

  • The Azure protocol adapters expect the client to manage usage and retirement of the connection handle. The client must ensure that once a handle is disconnected, it is not used for either a “send” call or a call to nvds_msgapi_do_work(). While the library attempts to ensure graceful failure if the application calls these functions with retired handles, it does not do so in a thread-safe manner.

Monitor Adapter Execution

The Azure device client and module client use different logging mechanisms.

Azure Device Client Library Log Messages

The Azure device client adapter uses the nvds_logger framework to generate log messages which can help you monitor execution. The adapter generates separate logs for the INFO, DEBUG, and ERROR severity levels, as described in nvds_logger: Logging Framework. You can limit the generated log messages by setting the level at which log messages are filtered in the logging setup script.

Note

If the severity level is set to DEBUG, the nvds_logger framework logs the entire contents of each message sent by the Azure device client protocol adapter.

Azure Module Client Library Log Messages

The log messages from the Azure module client adapter library are emitted to stdout, and the log output is captured in the docker/iotedge module logs.

Message Topics and Routes

You can specify a message topic in the GStreamer property topic. However, the Azure device client and module client use the topic property in different ways. The Azure device client does not support topics. Thus, the value of the topic property is ignored, and you cannot use it to filter messages on Azure IoT Hub. The Azure module client uses the topic property to determine the route of messages, i.e. how messages are passed within a system. For more information about message routes, see: https://docs.microsoft.com/en-us/azure/iot-edge/module-composition#declare-routes

AMQP Protocol Adapter

DeepStream release 5.0 includes an AMQP protocol adapter that DeepStream applications can use out of the box to publish messages using AMQP 0-9-1 message protocol. The AMQP protocol adapter shared library is in the deepstream package at: /opt/nvidia/deepstream/deepstream-5.0/lib/libnvds_amqp_proto.so

Installing Dependencies

The AMQP protocol adapter for DeepStream uses the librabbitmq.so library, built from rabbitmq-c (v0.8.0), for the underlying AMQP protocol implementation. To build the library, enter these commands:

git clone -b v0.8.0  --recursive https://github.com/alanxz/rabbitmq-c.git
cd rabbitmq-c
mkdir build && cd build
cmake ..
cmake --build .

To copy the built librabbitmq.so library to its final location, enter the following commands.

  • For x86:

    sudo cp ./librabbitmq/librabbitmq.so.4 /usr/lib/
    
  • For Jetson:

    sudo cp ./librabbitmq/librabbitmq.so.4 /usr/lib/aarch64-linux-gnu/
    

Install additional dependencies:

sudo apt-get install libglib2.0 libglib2.0-dev

AMQP broker

The AMQP protocol adapter communicates with an AMQP 0-9-1 compliant message broker. If you do not have a functioning broker already, you can deploy one by installing the rabbitmq-server package, available at: https://www.rabbitmq.com/install-debian.html. You can install this package on your local system or on the remote machine where you want the broker to be installed. To install the package, enter the command:

sudo apt-get install rabbitmq-server

To determine whether the rabbitmq service is running, enter the command:

sudo service rabbitmq-server status

If rabbitmq is not running, enter this command to start it:

sudo service rabbitmq-server start

Configure Adapter Settings

You can place AMQP protocol adapter specific information in a custom configuration named, for example, cfg_amqp.txt. Here is an example of configuration file entries for an AMQP broker installed on the local machine:

[message-broker]
hostname = localhost
username = guest
password = guest
port = 5672
exchange = amq.topic
topic = topicname

The properties in the configuration file are:

  • hostname: Hostname of the host on which the AMQP broker is installed

  • username: Username used to log in to the broker

  • password: Password used to log in to the broker

  • port: Port used to communicate with the AMQP broker

  • exchange: Name of the exchange on which to publish messages

  • topic: Message topic

Using the Adapter

To use the AMQP protocol client adapter in a DeepStream application, set the Gst-nvmsgbroker plugin’s proto-lib property to the pathname of the adapter’s shared library, libnvds_amqp_proto.so:

proto-lib = <path to libnvds_amqp_proto.so>

You can specify the AMQP connection details in the AMQP adapter specific configuration file (e.g., cfg_amqp.txt) as described above. This is the recommended method. The path to the AMQP configuration file is specified by the Gst property config:

config = <path to cfg_amqp.txt>

Alternatively, you can specify the AMQP protocol’s hostname, port number, and username in the Gst plugin’s conn-str property, and specify the password in the configuration file. In the Gst properties:

conn-str = hostname;5672;username
config   = <pathname of AMQP configuration file>

In the AMQP configuration file:

[message-broker]
password = <password>

You can set the Gst-nvmsgbroker plugin’s topic property to specify the message topic.

topic =  <topicname>

Alternatively, you can specify a topic in the AMQP configuration file (cfg_amqp.txt). In the Gst properties, set:

config    = <path to cfg_amqp.txt>

In the AMQP configuration file:

[message-broker]
topic = topicname

Programmatic Integration

Once you have configured the connection, you can integrate the AMQP protocol adapter into your application by using the nvds_msgapi interface to call its functions. Note the following points about the functions defined by the interface:

  • The connection string passed to nvds_msgapi_connect() has the format hostname;<port>;username.

  • For both “send” functions, the topic name is specified either by the Gst-nvmsgbroker plugin’s topic property or by the topic parameter in the AMQP configuration file.

  • The application must call nvds_msgapi_do_work() after each call to nvds_msgapi_send_async(). The frequency of calls to nvds_msgapi_do_work() determines the rate at which messages waiting to be sent are processed.

  • The AMQP protocol adapter expects the client to manage usage and retirement of the connection handle. The client must ensure that once a handle is disconnected, it is not used for either a “send” call or a call to nvds_msgapi_do_work(). While the library attempts to ensure graceful failure if the application calls these functions with retired handles, it does not do so in a thread-safe manner.

Note

As stated at https://github.com/alanxz/rabbitmq-c#threading, you cannot share a socket, an amqp_connection_state_t, or a channel between threads using the librabbitmq library. This library is designed for use by event-driven, single-threaded applications, and does not yet meet the requirements of threaded applications. To deal with this limitation, your application must open an AMQP connection (and an associated socket) per thread. If it needs to access a single AMQP connection or any of its channels from more than one thread, you must implement an appropriate locking mechanism. It is generally simpler to have a connection dedicated to each thread.

Monitor Adapter Execution

The AMQP protocol adapter uses the nvds_logger framework to generate log messages which can help you monitor execution. The adapter generates separate logs for the INFO, DEBUG, and ERROR severity levels, as described in nvds_logger: Logging Framework. You can limit the log messages being generated by setting the level at which log messages are filtered in the logging setup script.

Note

If the severity level is set to DEBUG, nvds_logger logs the entire contents of each message sent by the AMQP protocol adapter.

REDIS Protocol Adapter

DeepStream release 6.0 includes a REDIS protocol adapter that DeepStream applications can use out of the box to publish messages using REDIS streams (https://redis.io/topics/streams-intro). The REDIS protocol adapter shared library is in the deepstream package at: /opt/nvidia/deepstream/deepstream-5.0/lib/libnvds_redis_proto.so

Installing Dependencies

REDIS protocol adapter for DeepStream uses the libhiredis.so library, built from Hiredis v1.0.0.

Build dependencies with installation instructions:

  • libhiredis:

    git clone https://github.com/redis/hiredis.git
    cd hiredis
    git reset --hard d5b4c69b7113213c1da3a0ccbfd1ee1b40443c7a
    make USE_SSL=1
    sudo cp libhiredis* /opt/nvidia/deepstream/deepstream/lib/
    sudo ln -sf /opt/nvidia/deepstream/deepstream/lib/libhiredis.so /opt/nvidia/deepstream/deepstream/lib/libhiredis.so.1.0.1-dev
    sudo ln -sf /opt/nvidia/deepstream/deepstream/lib/libhiredis_ssl.so /opt/nvidia/deepstream/deepstream/lib/libhiredis.so.1.0.1-dev-ssl
    sudo ldconfig
    
  • glib 2.0:

    apt-get install libglib2.0 libglib2.0-dev
    

REDIS server

  • Install and set up redis-server on your machine. Follow the instructions at https://redis.io/download and download Redis version 6.0.8:

    wget http://download.redis.io/releases/redis-6.0.8.tar.gz
    tar xzf redis-6.0.8.tar.gz
    cd redis-6.0.8
    make
    
  • Run the server

    src/redis-server &
    

Configure Adapter Settings

You can place REDIS protocol adapter specific information in a custom configuration file, for example, cfg_redis.txt. Here is an example of configuration file entries for a REDIS server installed on the local machine:

[message-broker]
hostname=localhost
port=6379
payloadkey=metadata
consumergroup=mygroup
consumername=myname
streamsize=10000

The properties in the configuration file are:

  • hostname: Hostname of the host on which the REDIS server is installed

  • port: Port used to communicate with the REDIS server

  • payloadkey: REDIS stream key for the payload (for more information on stream keys, see https://redis.io/topics/streams-intro)

  • consumergroup: REDIS streams consumer group name (for more information on consumer groups, see https://redis.io/commands/xgroup)

  • consumername: REDIS streams consumer name

  • streamsize: Maximum size of the REDIS stream to which data is published

Using the Adapter

To use the REDIS protocol client adapter in a DeepStream application, set the Gst-nvmsgbroker plugin’s proto-lib property to the pathname of the adapter’s shared library, libnvds_redis_proto.so.

proto-lib = <path to libnvds_redis_proto.so>

You can specify the REDIS connection details in the REDIS adapter specific configuration file (e.g., cfg_redis.txt) as described above. This is the recommended method. The path to the REDIS configuration file is specified by the Gst property config:

config = <path to cfg_redis.txt>

Alternatively, you can specify the REDIS server’s hostname and port number in the Gst plugin’s conn-str property, and specify the password in the configuration file. In the Gst properties:

conn-str = hostname;6379
config   = <pathname of REDIS configuration file>

You can set the Gst-nvmsgbroker plugin’s topic property to specify the message topic.

topic =  <redis streamname>

Programmatic Integration

Once you have configured the connection, you can integrate the REDIS protocol adapter into your application by using the nvds_msgapi interface to call its functions. Note the following points about the functions defined by the interface:

  • The connection string passed to nvds_msgapi_connect() has the format <hostname>;<port>.

  • For both “send” functions, the topic name is specified by the Gst-nvmsgbroker plugin’s topic property.

  • The application must call nvds_msgapi_do_work() after each call to nvds_msgapi_send_async(). The frequency of calls to nvds_msgapi_do_work() determines the rate at which messages waiting to be sent are processed.

The REDIS protocol adapter expects the client to manage usage and retirement of the connection handle. The client must ensure that once a handle is disconnected, it is not used for either a “send” call or a call to nvds_msgapi_do_work(). The library attempts to fail gracefully if the application calls these functions with a retired handle, but it does not do so in a thread-safe manner.
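
The connection string format noted above can be validated before it is handed to nvds_msgapi_connect(). The following is a minimal sketch, not part of the adapter: parse_conn_str is a hypothetical helper name, and the range check on the port is an assumption of this example.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper (not part of nvds_msgapi): splits "<hostname>;<port>"
 * into its two fields. A further ";<specifier>" field, if present, is ignored.
 * Returns 0 on success, -1 if the string is malformed. */
int parse_conn_str(const char *conn_str, char *host, size_t host_len, int *port)
{
    if (!conn_str || !host || host_len == 0 || !port)
        return -1;

    const char *sep = strchr(conn_str, ';');
    if (!sep || sep == conn_str)            /* no separator, or empty hostname */
        return -1;

    size_t n = (size_t)(sep - conn_str);
    if (n >= host_len)                      /* hostname does not fit in buffer */
        return -1;

    char *end = NULL;
    long p = strtol(sep + 1, &end, 10);
    if (end == sep + 1 || (*end != '\0' && *end != ';'))
        return -1;                          /* port is missing or not numeric */
    if (p < 1 || p > 65535)                 /* assumed valid TCP port range */
        return -1;

    memcpy(host, conn_str, n);
    host[n] = '\0';
    *port = (int)p;
    return 0;
}
```

For example, parsing "localhost;6379" yields the hostname localhost and the port 6379, which match the defaults in the sample cfg_redis.txt.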

Monitor Adapter Execution

The REDIS protocol adapter uses the nvds_logger framework to generate log messages which can help you monitor execution. The adapter generates separate logs for the INFO, DEBUG, and ERROR severity levels, as described in nvds_logger: Logging Framework. You can limit the log messages being generated by setting the level at which log messages are filtered in the logging setup script.

Note

If the severity level is set to DEBUG, nvds_logger logs the entire contents of each message sent by the REDIS protocol adapter.

nv_msgbroker: Message Broker interface

DeepStream 5.0 features a new message broker library, nvmsgbroker, which can be used to make connections with multiple external brokers. This library acts as a wrapper around the message adapter libraries described in the sections above and provides its own APIs. The Gst-nvmsgbroker plugin can either call the adapter library APIs directly to connect with an external entity, or use the nvmsgbroker library interface to connect with multiple external entities at a time. Additionally, as of DeepStream 6.0, the nvmsgbroker library includes an autoreconnect feature that periodically attempts to re-establish the connection with the external entity.

As of DeepStream 6.2, a new configuration option, work-interval, is available. It lets the user set the wait time between executions of the adapter logic.

Gst-nvmsgbroker-nv_msgbroker

The Gst-nvmsgbroker plugin can call the APIs in the nvmsgbroker library as shown in the diagram above. The APIs support:

  • Creating a connection

  • Sending messages by asynchronous means

  • Terminating the connection

  • Fetching the nvmsgbroker library version number

The nvmsgbroker interface is defined in the header file sources/includes/nvmsgbroker.h. This header file defines a set of function pointers which provide an interface analogous to an interface in C++. The following sections describe the methods defined by the nvmsgbroker interface.

nv_msgbroker_connect(): Create a Connection

NvMsgBrokerClientHandle   nv_msgbroker_connect(char *broker_conn_str, char *broker_proto_lib, nv_msgbroker_connect_cb_t connect_cb, char *cfg);

The function accepts a connection string and configures a connection. To accommodate connectionless protocols such as HTTP, the protocol adapter library can choose whether the function actually makes a connection.

Parameters

  • broker_conn_str: A connection string in the format specific to the protocol adapter

  • broker_proto_lib: Full path to the message protocol adapter library

  • connect_cb: A pointer to a callback function for events associated with the connection

  • cfg: Pathname of a configuration file to be passed to the protocol adapter

    Connect callback

    typedef void (*nv_msgbroker_connect_cb_t)(NvMsgBrokerClientHandle h_ptr, NvMsgBrokerErrorType status);

    Where the callback’s parameters are:

    • h_ptr: The connection handle to identify the initiating call

    • status: A code that indicates the status of the connection
    

Return Value

A connection handle for use in subsequent interface calls if successful, or NULL otherwise.

nv_msgbroker_send_async(): Send an event asynchronously

NvMsgBrokerErrorType  nv_msgbroker_send_async (NvMsgBrokerClientHandle h_ptr, NvMsgBrokerClientMsg message, nv_msgbroker_send_cb_t cb, void *user_ctx);

The API sends data to the endpoint of a connection. It accepts a message topic and a message payload. Send is asynchronous; it accepts a callback function that is called when the “send” operation is completed.

Parameters

  • h_ptr: Connection handle to the message broker library

  • message: Message packet containing the message payload, payload length, and topic

  • cb: Callback invoked to report the status of the send operation

  • user_ctx: Pointer passed to the callback for context

    Message packet structure

    typedef struct {
        char *topic;
        void *payload;
        size_t payload_len;
    } NvMsgBrokerClientMsg;
    

    Send callback

    typedef void (*nv_msgbroker_send_cb_t)(void *user_ptr, NvMsgBrokerErrorType flag);

    Where the callback’s parameters are:

    • user_ptr: The user pointer (user_ctx) passed to the call to nv_msgbroker_send_async() that initiated the “send” operation. It enables the callback function to identify the initiating call.

    • flag: A code that indicates the completion status of the send operation
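
As an illustration of the message packet and send callback described above, here is a minimal sketch. The type definitions are local stand-ins so that the example compiles on its own; in an application they come from nvmsgbroker.h, and the enum value names are assumed to mirror that header. SendStats, make_json_msg, and on_send_done are hypothetical names introduced for this example.

```c
#include <stddef.h>
#include <string.h>

/* Local stand-ins; assumed to mirror the definitions in nvmsgbroker.h. */
typedef enum {
    NV_MSGBROKER_API_OK,
    NV_MSGBROKER_API_ERR,
    NV_MSGBROKER_API_NOT_SUPPORTED
} NvMsgBrokerErrorType;

typedef struct {
    char *topic;
    void *payload;
    size_t payload_len;
} NvMsgBrokerClientMsg;

/* Hypothetical per-application counters, handed to the callback via user_ctx. */
typedef struct {
    int sent_ok;
    int sent_failed;
} SendStats;

/* Wraps a NUL-terminated JSON string in a message packet. No copy is made,
 * so this sketch assumes both buffers stay valid until the send completes. */
NvMsgBrokerClientMsg make_json_msg(char *topic, char *json)
{
    NvMsgBrokerClientMsg msg;
    msg.topic = topic;
    msg.payload = json;
    msg.payload_len = strlen(json);
    return msg;
}

/* Has the shape of nv_msgbroker_send_cb_t: records one completion status. */
void on_send_done(void *user_ptr, NvMsgBrokerErrorType flag)
{
    SendStats *stats = (SendStats *)user_ptr;
    if (flag == NV_MSGBROKER_API_OK)
        stats->sent_ok++;
    else
        stats->sent_failed++;
}
```

With a valid handle, a call would then look like nv_msgbroker_send_async(handle, make_json_msg(topic, json), on_send_done, &stats), where stats is a SendStats instance owned by the application.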
    

nv_msgbroker_subscribe(): Consume data by subscribing to topics

NvMsgBrokerErrorType nv_msgbroker_subscribe(NvMsgBrokerClientHandle h_ptr, char ** topics, int num_topics,  nv_msgbroker_subscribe_cb_t cb, void *user_ctx);

This API subscribes to one or more topics and consumes messages from the external entity. The API is asynchronous and must be called with a valid, already created connection handle. The caller must also provide a pointer to a callback function that receives the consumed messages from the connection endpoint, and may provide an optional user_ctx pointer for user context.

Parameters

  • h_ptr: A handle for the connection, obtained by a call to nv_msgbroker_connect()

  • topics: A pointer to an array of strings containing the topic names

  • num_topics: Number of topics to subscribe to

  • cb: A pointer to a callback function that is notified of messages consumed on the subscribed topic(s)

  • user_ctx: User pointer to be passed to the callback for context

    The callback function pointer passed as a parameter to the subscribe API is of type nv_msgbroker_subscribe_cb_t, defined as:

    typedef void (*nv_msgbroker_subscribe_cb_t)(NvMsgBrokerErrorType flag, void *msg, int msglen, char *topic, void *user_ptr);
    

    Where the callback’s parameters are:

    • flag: Error status of the consumed message

    • msg: Consumed message (payload)

    • msglen: Length of the message in bytes

    • topic: Name of the topic on which the message was received

    • user_ptr: Pointer passed to nv_msgbroker_subscribe() as user_ctx, for context
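
A subscribe callback with the signature above might look like the following minimal sketch. The enum is a local stand-in assumed to mirror nvmsgbroker.h. SubscribeCtx and on_message are hypothetical names. The sketch also assumes the payload is not NUL-terminated, which is why it copies exactly msglen bytes.

```c
#include <stdio.h>
#include <string.h>

/* Local stand-in; assumed to mirror the definition in nvmsgbroker.h. */
typedef enum {
    NV_MSGBROKER_API_OK,
    NV_MSGBROKER_API_ERR,
    NV_MSGBROKER_API_NOT_SUPPORTED
} NvMsgBrokerErrorType;

/* Hypothetical context that the application passes as user_ctx. */
typedef struct {
    char last_topic[64];
    char last_msg[256];
    int received;
    int errors;
} SubscribeCtx;

/* Has the shape of nv_msgbroker_subscribe_cb_t. */
void on_message(NvMsgBrokerErrorType flag, void *msg, int msglen,
                char *topic, void *user_ptr)
{
    SubscribeCtx *ctx = (SubscribeCtx *)user_ptr;

    if (flag != NV_MSGBROKER_API_OK || msg == NULL || msglen < 0) {
        ctx->errors++;
        return;
    }

    /* Copy at most sizeof(last_msg) - 1 bytes and terminate the copy. */
    size_t n = (size_t)msglen;
    if (n > sizeof(ctx->last_msg) - 1)
        n = sizeof(ctx->last_msg) - 1;
    memcpy(ctx->last_msg, msg, n);
    ctx->last_msg[n] = '\0';

    snprintf(ctx->last_topic, sizeof(ctx->last_topic), "%s", topic ? topic : "");
    ctx->received++;
}
```

The callback copies the data it needs before returning. Whether the library keeps the msg buffer valid after the callback returns is not stated here, so copying is the conservative choice.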

nv_msgbroker_disconnect(): Terminate a Connection

NvMsgBrokerErrorType nv_msgbroker_disconnect(NvMsgBrokerClientHandle h_ptr);

The function terminates the connection, if the underlying protocol requires it, and frees resources associated with h_ptr.

Parameters

  • h_ptr: A handle for the connection, obtained by a call to nv_msgbroker_connect()

nv_msgbroker_version(): Get Version Number

char *nv_msgbroker_version();

This function returns a string that identifies the nv_msgbroker version supported by this protocol adapter implementation. The string must use the format <major>.<minor>, where <major> is a major version number and <minor> is a minor version number. A change in the major version number indicates an API change that may cause incompatibility. When the major version number changes, the minor version number is reset to 0.
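
Because a change in the major version number signals a possible incompatibility, an application may want to check the returned string at startup. The following is a minimal sketch of such a check. parse_version and is_compatible are hypothetical helper names, and matching on the major number alone is a policy assumed for this example.

```c
#include <stdio.h>

/* Parses "<major>.<minor>". Returns 0 on success, -1 on malformed input. */
int parse_version(const char *s, int *major, int *minor)
{
    char trailing;

    if (!s || !major || !minor)
        return -1;
    /* The extra %c conversion detects unexpected text after the minor number:
     * exactly two successful conversions means a clean "<major>.<minor>". */
    if (sscanf(s, "%d.%d%c", major, minor, &trailing) != 2)
        return -1;
    if (*major < 0 || *minor < 0)
        return -1;
    return 0;
}

/* Returns 1 when the library's major version equals the major version the
 * application was built against, 0 otherwise (including malformed strings). */
int is_compatible(const char *lib_version, int built_major)
{
    int major, minor;

    if (parse_version(lib_version, &major, &minor) != 0)
        return 0;
    return major == built_major;
}
```

An application could call is_compatible(nv_msgbroker_version(), 2) before connecting, where 2 stands for whichever major version it was built against.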

Autoreconnect feature

The nvmsgbroker library features an autoreconnect capability: after the network connection with the endpoint goes down, the library periodically attempts to reconnect with the external entity. The configuration options that enable this feature in the nvmsgbroker library are listed in cfg_nvmsgbroker.txt:

#(0): disable
#(1): enable
auto-reconnect=1

#connection retry interval in seconds
retry-interval=<value>

#connection max retry limit in seconds
max-retry-limit=<value>

#interval at which to perform work, in microseconds
work-interval=10000

Work-interval configuration

The user can set the interval, in microseconds, at which protocol adapter work is performed by changing the work-interval option in cfg_nvmsgbroker.txt. The default is 10000 microseconds (10 ms).

...
#interval at which to perform work, in microseconds
work-interval=10000
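
To make the units concrete, the following minimal sketch converts a work-interval value into a struct timespec and into the resulting number of work cycles per second. Both helper names are hypothetical, and the sketch does not describe how the library itself schedules its work.

```c
#include <time.h>

/* Converts a work-interval value, given in microseconds, to a timespec. */
struct timespec work_interval_to_timespec(unsigned long usec)
{
    struct timespec ts;
    ts.tv_sec = (time_t)(usec / 1000000UL);
    ts.tv_nsec = (long)(usec % 1000000UL) * 1000L;
    return ts;
}

/* Upper bound on work cycles per second for a given interval, ignoring the
 * time spent in the work itself. Returns 0 for a zero interval. */
unsigned long work_cycles_per_second(unsigned long usec)
{
    return usec ? 1000000UL / usec : 0UL;
}
```

With the default of 10000 microseconds, the timespec holds 0 seconds and 10,000,000 nanoseconds, which allows at most 100 work cycles per second. A loop that waits between cycles could pass this timespec to nanosleep().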

nvds_logger: Logging Framework

DeepStream provides a logging framework named nvds_logger. The protocol adapters described above, such as the Kafka, AMQP, and REDIS adapters, use this framework to generate a run time log. nvds_logger is based on syslog, and offers many related features, including:

  • Choice of priorities (log levels)

  • Log filtering and redirection

  • Shared logging across different DeepStream instances running concurrently

  • Log retirement and management using logrotate

  • Cross-platform support

Enabling Logging

To enable logging, run the setup_nvds_logger.sh script. Note that this script must be run with sudo. You may have to modify the permissions associated with this script to make it executable. The script accepts an optional parameter specifying the pathname of the log file to be written. By default, the pathname is /tmp/nvds/ds.log. Once logging is enabled, you can access the generated log messages by reading the log file. By default, you must have sudo permissions to read the log file. Standard techniques for syslog-based logging configuration can eliminate this requirement.

Filtering Logs

nvds_logger allows logs to be associated with a severity level similar to that which syslog offers. You can filter log messages based on severity level by modifying the setup script. By default, the script enables logging for messages at the INFO level (level 6) and at more severe levels (lower numbers). You can modify this as outlined in the comments in the script:

# Modify log severity level as required and rerun this script
#              0       Emergency: system is unusable
#              1       Alert: action must be taken immediately
#              2       Critical: critical conditions
#              3       Error: error conditions
#              4       Warning: warning conditions
#              5       Notice: normal but significant condition
#              6       Informational: informational messages
#              7       Debug: debug-level messages
# refer https://tools.ietf.org/html/rfc5424.html for more information

echo "if (\$msg contains 'DSLOG') and (\$syslogseverity <= 6) then $nvdslogfilepath" >> 11-nvds.conf
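
The comparison in the rule above can be read as follows: lower severity numbers are more severe, so a message is written when its severity value does not exceed the threshold. The following minimal sketch expresses the same test in C using the standard syslog.h constants. passes_filter is a hypothetical name and is not part of nvds_logger.

```c
#include <syslog.h>

/* Mirrors the rule "$syslogseverity <= threshold": returns 1 when a message
 * of the given severity would be written to the log file, 0 otherwise. */
int passes_filter(int severity, int threshold)
{
    return severity <= threshold;
}
```

With the default threshold of 6 (LOG_INFO), error and informational messages pass while debug messages (LOG_DEBUG, level 7) are dropped. Changing the 6 in the rule to 7 and rerunning the script would also capture debug messages.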

Retiring and Managing Logs

It is recommended that you limit the size of log files by retiring them periodically. logrotate is a popular utility for this purpose. You can use it in cron jobs so that the log files are automatically archived periodically and are discarded after a desired interval.

Generating Logs

You can implement modules that use the logger by including sources/includes/nvds_logger.h in the source code and linking to the libnvds_logger.so library. Generating logs programmatically involves three steps:

  1. Call nvds_log_open() before you write any log messages.

  2. Call nvds_log() to write log messages.

  3. Call nvds_log_close() upon completion to flush and close the logs.

Note that nvds_logger is a process-based logging mechanism, so the recommended procedure is to call nvds_log_open() from the main application routine rather than from the individual plugins. Similarly, call nvds_log_close() from the main application when the application shuts down, before it exits.
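
The open, write, and close lifecycle described above can be sketched with plain syslog(3) calls, on which nvds_logger is based. This is only an illustration of where the three steps belong in a process. The syslog calls stand in for nvds_log_open(), nvds_log(), and nvds_log_close(), whose exact signatures are not shown here. The message layout is an assumption, apart from the DSLOG marker that the filter rule matches on. run_with_logging and demo_app are hypothetical names.

```c
#include <syslog.h>

/* Placeholder for the real application body; returns an exit code. */
int demo_app(void)
{
    return 0;
}

/* Opens the process-wide log once, runs the application, then closes the log.
 * Plugins and adapters running inside app_main would only write messages. */
int run_with_logging(int (*app_main)(void))
{
    /* Step 1: open the log from the main routine (cf. nvds_log_open()). */
    openlog("deepstream-app", LOG_PID, LOG_USER);

    /* Step 2: write messages (cf. nvds_log()). Layout is an assumption. */
    syslog(LOG_INFO, "DSLOG:%s: %s", "APP", "application starting");
    int rc = app_main();
    syslog(rc == 0 ? LOG_INFO : LOG_ERR, "DSLOG:%s: exit code %d", "APP", rc);

    /* Step 3: close the log on shutdown (cf. nvds_log_close()). */
    closelog();
    return rc;
}
```

Keeping the open and close calls in one wrapper around the application body ensures that each happens exactly once per process, which is the point of the recommendation above.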