NVIDIA DeepStream SDK API Reference

9.0 Release
9.0/sources/includes/nvds_mqtt_proto.h
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3  * SPDX-License-Identifier: LicenseRef-NvidiaProprietary
4  *
5  * NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
6  * property and proprietary rights in and to this material, related
7  * documentation and any modifications thereto. Any use, reproduction,
8  * disclosure or distribution of this material and related documentation
9  * without an express license agreement from NVIDIA CORPORATION or
10  * its affiliates is strictly prohibited.
11  */
12 
13 #include "mqtt_protocol.h"
14 #include "mosquitto.h"
15 
16 #include <mutex>
17 #include <unordered_map>
18 #include <errno.h>
19 #include <syslog.h>
20 #include "glib.h"
21 #include <string.h>
22 #include <string>
23 #include "nvds_msgapi.h"
24 #include "nvds_utils.h"
25 
26 using namespace std;
27 
28 #define MAX_FIELD_LEN 1024
29 
30 // Structure to hold extracted data from protobuf payload
31 struct ExtractedData {
32  std::string sensor_id; // Extracted from metadata
33  std::string timestamp; // Not used - timestamp is always current system time
34 };
35 
36 // Structure to hold protobuf wrapper message
37 struct ProtobufWrapper {
38  std::string payload; // Raw protobuf binary data (no encoding needed)
39  std::string key; // sensorId extracted from metadata
40  std::string timestamp; // current system timestamp when data was sent
41 };
42 #define DEFAULT_LOOP_TIMEOUT 2000
43 #define DEFAULT_KEEP_ALIVE 60
44 /* Message details:
45  * send_callback = user callback func
46  * user_ptr = user pointer passed by async send
47  */
48 struct send_msg_info_t {
49  nvds_msgapi_send_cb_t send_callback;
50  void *user_ptr;
51 };
52 
53 /* Details of mqtt connection handle:
54  * mosq : mosquitto client object
55  * sub_callback : user subscription callback func
56  * connect_cb : user connection callback func
57  * user_ctx : user pointer passed by sub
58  * username: username for login to server
59  * password: password for login to server
60  * client_id: name of MQTT client
61  * loop_timeout : time in ms for the call to loop to wait for network activity
62  * keep_alive : number of seconds after which broker should send PING if no messages have been exchanged
63  * subscription_on : Flag to check if subscription is ON
64  * send_msg_info_map : map message info to id assigned by mosquitto broker
65  * map_lock : mutex lock for accessing above map
66  * enable_tls : flag to check if TLS encryption is enabled by the broker
67  * cafile : path to a TLS certificate authority file
68  * capath : path to a directory containing TLS CA files
69  * certfile : path to the client TLS certificate file
70  * keyfile : path to the client TLS key file
71  * disconnect : bool for checking if disconnect has been called
72  * set_threaded : bool for setting mosquitto_threaded_set in proto adaptor
73  */
74 typedef struct {
75  struct mosquitto *mosq = NULL;
77  nvds_msgapi_connect_cb_t connect_cb;
78  void* user_ctx;
79  char connection_str[MAX_FIELD_LEN] = {0};
80  char username[MAX_FIELD_LEN] = {0};
81  char password[MAX_FIELD_LEN] = {0};
82  char client_id[MAX_FIELD_LEN] = {0};
83  int loop_timeout = DEFAULT_LOOP_TIMEOUT;
84  int keep_alive = DEFAULT_KEEP_ALIVE;
85  bool subscription_on = false;
86  std::unordered_map<int , send_msg_info_t> send_msg_info_map;
87  std::mutex map_lock;
88  bool enable_tls = false;
89  char cafile[MAX_FIELD_LEN] = {0};
90  char capath[MAX_FIELD_LEN] = {0};
91  char certfile[MAX_FIELD_LEN] = {0};
92  char keyfile[MAX_FIELD_LEN] = {0};
93  bool disconnect = false;
94  bool set_threaded = true;
96 
98 void mosq_mqtt_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str);
99 void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties);
100 void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags, const mosquitto_property *properties);
101 void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties);
102 bool is_valid_mqtt_connection_str(char *connection_str, std::string &burl, std::string &bport);
103 
104 // Function declarations for protobuf wrapper
105 ExtractedData extract_data_from_payload(const uint8_t *payload, size_t nbuf);
106 std::string create_protobuf_wrapper(const uint8_t *payload, size_t nbuf);
107 std::string create_enhanced_payload(const uint8_t *payload, size_t nbuf);
108 std::string serialize_protobuf_wrapper(const ProtobufWrapper& wrapper);
send_msg_info_t
Definition: sources/includes/nvds_mqtt_proto.h:48
nvds_msgapi_connect_cb_t
void(* nvds_msgapi_connect_cb_t)(NvDsMsgApiHandle h_ptr, NvDsMsgApiEventType ds_evt)
Type definition for a "handle" callback.
Definition: sources/includes/nvds_msgapi.h:98
DEFAULT_KEEP_ALIVE
#define DEFAULT_KEEP_ALIVE
Definition: 9.0/sources/includes/nvds_mqtt_proto.h:43
mosq_mqtt_log_callback
void mosq_mqtt_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str)
NvDsMsgApiErrorType
NvDsMsgApiErrorType
Defines completion codes for operations in the messaging API.
Definition: sources/includes/nvds_msgapi.h:57
nvds_mqtt_read_config
NvDsMsgApiErrorType nvds_mqtt_read_config(NvDsMqttClientHandle *mh, char *config_path)
nvds_utils.h
my_disconnect_callback
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int rc, const mosquitto_property *properties)
ProtobufWrapper
Definition: sources/includes/nvds_mqtt_proto.h:37
ExtractedData
Definition: sources/includes/nvds_mqtt_proto.h:31
DEFAULT_LOOP_TIMEOUT
#define DEFAULT_LOOP_TIMEOUT
Definition: 9.0/sources/includes/nvds_mqtt_proto.h:42
MAX_FIELD_LEN
#define MAX_FIELD_LEN
Definition: 9.0/sources/includes/nvds_mqtt_proto.h:28
my_connect_callback
void my_connect_callback(struct mosquitto *mosq, void *obj, int result, int flags, const mosquitto_property *properties)
NvDsMqttClientHandle
Definition: sources/includes/nvds_mqtt_proto.h:74
create_enhanced_payload
std::string create_enhanced_payload(const uint8_t *payload, size_t nbuf)
extract_data_from_payload
ExtractedData extract_data_from_payload(const uint8_t *payload, size_t nbuf)
nvds_msgapi_send_cb_t
void(* nvds_msgapi_send_cb_t)(void *user_ptr, NvDsMsgApiErrorType completion_flag)
Type definition for a "send" callback.
Definition: sources/includes/nvds_msgapi.h:71
is_valid_mqtt_connection_str
bool is_valid_mqtt_connection_str(char *connection_str, std::string &burl, std::string &bport)
create_protobuf_wrapper
std::string create_protobuf_wrapper(const uint8_t *payload, size_t nbuf)
my_publish_callback
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid, int reason_code, const mosquitto_property *properties)
nvds_msgapi.h
nvds_msgapi_subscribe_request_cb_t
void(* nvds_msgapi_subscribe_request_cb_t)(NvDsMsgApiErrorType flag, void *msg, int msg_len, char *topic, void *user_ptr)
Type definition for callback registered during subscribe.
Definition: sources/includes/nvds_msgapi.h:87
serialize_protobuf_wrapper
std::string serialize_protobuf_wrapper(const ProtobufWrapper &wrapper)