NVIDIA DeepStream SDK API Reference

7.0 Release
kafka_client.h
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: Copyright (c) 2018-2020 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3  * SPDX-License-Identifier: LicenseRef-NvidiaProprietary
4  *
5  * NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
6  * property and proprietary rights in and to this material, related
7  * documentation and any modifications thereto. Any use, reproduction,
8  * disclosure or distribution of this material and related documentation
9  * without an express license agreement from NVIDIA CORPORATION or
10  * its affiliates is strictly prohibited.
11  */
12 
13 #include <iostream>
14 #include <vector>
15 #include <string>
16 #include <sstream>
17 using namespace std;
18 
19 #include "rdkafka.h"
20 #include "nvds_msgapi.h"
21 
22 #define MAX_FIELD_LEN 1024
23 #define MAX_TOPIC_LEN 255 //maximum topic length supported by kafka is 255
24 #define NVDS_KAFKA_LOG_CAT "DSLOG:NVDS_KAFKA_PROTO"
25 
26 class NvDsKafkaSendCompl {
27  public:
28  virtual void sendcomplete(NvDsMsgApiErrorType);
29  NvDsMsgApiErrorType get_err();
30  virtual ~NvDsKafkaSendCompl() = default;
31 };
32 
33 class NvDsKafkaSyncSendCompl : public NvDsKafkaSendCompl {
34  private:
35  uint8_t *compl_flag;
37 
38  public:
39  NvDsKafkaSyncSendCompl(uint8_t *);
40  void sendcomplete(NvDsMsgApiErrorType);
41  NvDsMsgApiErrorType get_err();
42 };
43 
44 class NvDsKafkaAsyncSendCompl : public NvDsKafkaSendCompl {
45  private:
46  void *user_ptr;
47  nvds_msgapi_send_cb_t async_send_cb;
48 
49  public:
51  void sendcomplete(NvDsMsgApiErrorType);
52 };
53 
54 typedef struct {
55  pthread_t consumer_tid; /* Thread which waits on incoming msg from cloud*/
56  nvds_msgapi_subscribe_request_cb_t subscribe_req_cb;
57  void *user_ptr; /* User context pointer */
58 } consumer_thread_info;
59 
60 typedef struct {
61  rd_kafka_t *consumer; /* Consumer instance handle */
62  char consumer_grp_id[MAX_FIELD_LEN]; /* Consumer group id */
63  consumer_thread_info cinfo; /* Consumer thread info*/
64  bool disconnect; /* variable to notify consume thread to quit */
65  string config; /* config options for consumer instance */
66 } consumer_instance_t;
67 
68 typedef struct {
69  char partition_key_field[MAX_FIELD_LEN]; /* partition key for messages */
70  rd_kafka_t *producer; /* Producer instance handle */
71 } producer_instance_t;
72 
73 typedef struct {
74  char brokers[MAX_FIELD_LEN]; /* Broker string - comma separated host:port */
75  producer_instance_t p_instance; /* Producer instance details */
76  consumer_instance_t c_instance; /* consumer instance details */
77 } NvDsKafkaClientHandle;
78 
79 void *nvds_kafka_client_init(NvDsKafkaClientHandle *kh);
80 NvDsMsgApiErrorType nvds_kafka_producer_launch(void *kh, rd_kafka_conf_t *conf);
81 NvDsMsgApiErrorType nvds_kafka_client_send(void *kh, const uint8_t *payload, int len, char *topic, int sync, void *ctx, nvds_msgapi_send_cb_t cb, char *key, int keylen);
82 NvDsMsgApiErrorType nvds_kafka_client_setconf(rd_kafka_conf_t *conf, char *key, char *val);
83 void nvds_kafka_client_poll(void *kv);
84 void nvds_kafka_client_finish(void *kv);
nvds_kafka_client_finish
void nvds_kafka_client_finish(void *kv)
nvds_kafka_producer_launch
NvDsMsgApiErrorType nvds_kafka_producer_launch(void *kh, rd_kafka_conf_t *conf)
nvds_msgapi.h
NvDsKafkaSyncSendCompl
Definition: kafka_client.h:33
NvDsKafkaClientHandle::p_instance
producer_instance_t p_instance
Definition: kafka_client.h:75
NvDsMsgApiErrorType
NvDsMsgApiErrorType
Defines completion codes for operations in the messaging API.
Definition: nvds_msgapi.h:57
nvds_kafka_client_setconf
NvDsMsgApiErrorType nvds_kafka_client_setconf(rd_kafka_conf_t *conf, char *key, char *val)
consumer_instance_t::consumer
rd_kafka_t * consumer
Definition: kafka_client.h:61
consumer_thread_info::consumer_tid
pthread_t consumer_tid
Definition: kafka_client.h:55
consumer_thread_info::subscribe_req_cb
nvds_msgapi_subscribe_request_cb_t subscribe_req_cb
Definition: kafka_client.h:56
MAX_FIELD_LEN
#define MAX_FIELD_LEN
Definition: kafka_client.h:22
nvds_kafka_client_poll
void nvds_kafka_client_poll(void *kv)
producer_instance_t
Definition: kafka_client.h:68
consumer_instance_t::config
string config
Definition: kafka_client.h:65
producer_instance_t::producer
rd_kafka_t * producer
Definition: kafka_client.h:70
consumer_thread_info
Definition: kafka_client.h:54
NvDsKafkaClientHandle::c_instance
consumer_instance_t c_instance
Definition: kafka_client.h:76
NvDsKafkaSendCompl
Definition: kafka_client.h:26
consumer_instance_t::cinfo
consumer_thread_info cinfo
Definition: kafka_client.h:63
NvDsKafkaClientHandle
Definition: kafka_client.h:73
nvds_msgapi_send_cb_t
void(* nvds_msgapi_send_cb_t)(void *user_ptr, NvDsMsgApiErrorType completion_flag)
Type definition for a "send" callback.
Definition: nvds_msgapi.h:71
nvds_kafka_client_init
void * nvds_kafka_client_init(NvDsKafkaClientHandle *kh)
consumer_instance_t
Definition: kafka_client.h:60
consumer_thread_info::user_ptr
void * user_ptr
Definition: kafka_client.h:57
nvds_kafka_client_send
NvDsMsgApiErrorType nvds_kafka_client_send(void *kh, const uint8_t *payload, int len, char *topic, int sync, void *ctx, nvds_msgapi_send_cb_t cb, char *key, int keylen)
nvds_msgapi_subscribe_request_cb_t
void(* nvds_msgapi_subscribe_request_cb_t)(NvDsMsgApiErrorType flag, void *msg, int msg_len, char *topic, void *user_ptr)
Type definition for callback registered during subscribe.
Definition: nvds_msgapi.h:87
consumer_instance_t::disconnect
bool disconnect
Definition: kafka_client.h:64
NvDsKafkaAsyncSendCompl
Definition: kafka_client.h:44