NVIDIA DeepStream SDK API Reference

6.4 Release
kafka_client.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018-2020 NVIDIA Corporation. All rights reserved.
3  *
4  * NVIDIA Corporation and its licensors retain all intellectual property
5  * and proprietary rights in and to this software, related documentation
6  * and any modifications thereto. Any use, reproduction, disclosure or
7  * distribution of this software and related documentation without an express
8  * license agreement from NVIDIA Corporation is strictly prohibited.
9  *
10  */
11 
12 #include <iostream>
13 #include <vector>
14 #include <string>
15 #include <sstream>
16 using namespace std;
17 
18 #include "rdkafka.h"
19 #include "nvds_msgapi.h"
20 
21 #define MAX_FIELD_LEN 1024
22 #define MAX_TOPIC_LEN 255 //maximum topic length supported by kafka is 255
23 #define NVDS_KAFKA_LOG_CAT "DSLOG:NVDS_KAFKA_PROTO"
24 
26  public:
27  virtual void sendcomplete(NvDsMsgApiErrorType);
28  NvDsMsgApiErrorType get_err();
29  virtual ~NvDsKafkaSendCompl() = default;
30 };
31 
33  private:
34  uint8_t *compl_flag;
36 
37  public:
38  NvDsKafkaSyncSendCompl(uint8_t *);
39  void sendcomplete(NvDsMsgApiErrorType);
40  NvDsMsgApiErrorType get_err();
41 };
42 
44  private:
45  void *user_ptr;
46  nvds_msgapi_send_cb_t async_send_cb;
47 
48  public:
50  void sendcomplete(NvDsMsgApiErrorType);
51 };
52 
53 typedef struct {
54  pthread_t consumer_tid; /* Thread which waits on incoming msg from cloud*/
56  void *user_ptr; /* User context pointer */
58 
59 typedef struct {
60  rd_kafka_t *consumer; /* Consumer instance handle */
61  char consumer_grp_id[MAX_FIELD_LEN]; /* Consumer group id */
62  consumer_thread_info cinfo; /* Consumer thread info*/
63  bool disconnect; /* variable to notify consume thread to quit */
64  string config; /* config options for consumer instance */
66 
67 typedef struct {
68  char partition_key_field[MAX_FIELD_LEN]; /* partition key for messages */
69  rd_kafka_t *producer; /* Producer instance handle */
71 
72 typedef struct {
73  char brokers[MAX_FIELD_LEN]; /* Broker string - comma separated host:port */
74  producer_instance_t p_instance; /* Producer instance details */
75  consumer_instance_t c_instance; /* consumer instance details */
77 
79 NvDsMsgApiErrorType nvds_kafka_producer_launch(void *kh, rd_kafka_conf_t *conf);
80 NvDsMsgApiErrorType nvds_kafka_client_send(void *kh, const uint8_t *payload, int len, char *topic, int sync, void *ctx, nvds_msgapi_send_cb_t cb, char *key, int keylen);
81 NvDsMsgApiErrorType nvds_kafka_client_setconf(rd_kafka_conf_t *conf, char *key, char *val);
82 void nvds_kafka_client_poll(void *kv);
83 void nvds_kafka_client_finish(void *kv);
nvds_kafka_client_finish
void nvds_kafka_client_finish(void *kv)
nvds_kafka_producer_launch
NvDsMsgApiErrorType nvds_kafka_producer_launch(void *kh, rd_kafka_conf_t *conf)
nvds_msgapi.h
NvDsKafkaSyncSendCompl
Definition: kafka_client.h:32
NvDsKafkaClientHandle::p_instance
producer_instance_t p_instance
Definition: kafka_client.h:74
NvDsMsgApiErrorType
NvDsMsgApiErrorType
Defines completion codes for operations in the messaging API.
Definition: nvds_msgapi.h:56
nvds_kafka_client_setconf
NvDsMsgApiErrorType nvds_kafka_client_setconf(rd_kafka_conf_t *conf, char *key, char *val)
consumer_instance_t::consumer
rd_kafka_t * consumer
Definition: kafka_client.h:60
consumer_thread_info::consumer_tid
pthread_t consumer_tid
Definition: kafka_client.h:54
consumer_thread_info::subscribe_req_cb
nvds_msgapi_subscribe_request_cb_t subscribe_req_cb
Definition: kafka_client.h:55
MAX_FIELD_LEN
#define MAX_FIELD_LEN
Definition: kafka_client.h:21
nvds_kafka_client_poll
void nvds_kafka_client_poll(void *kv)
producer_instance_t
Definition: kafka_client.h:67
consumer_instance_t::config
string config
Definition: kafka_client.h:64
producer_instance_t::producer
rd_kafka_t * producer
Definition: kafka_client.h:69
consumer_thread_info
Definition: kafka_client.h:53
NvDsKafkaClientHandle::c_instance
consumer_instance_t c_instance
Definition: kafka_client.h:75
NvDsKafkaSendCompl
Definition: kafka_client.h:25
consumer_instance_t::cinfo
consumer_thread_info cinfo
Definition: kafka_client.h:62
NvDsKafkaClientHandle
Definition: kafka_client.h:72
nvds_msgapi_send_cb_t
void(* nvds_msgapi_send_cb_t)(void *user_ptr, NvDsMsgApiErrorType completion_flag)
Type definition for a "send" callback.
Definition: nvds_msgapi.h:70
nvds_kafka_client_init
void * nvds_kafka_client_init(NvDsKafkaClientHandle *kh)
consumer_instance_t
Definition: kafka_client.h:59
consumer_thread_info::user_ptr
void * user_ptr
Definition: kafka_client.h:56
nvds_kafka_client_send
NvDsMsgApiErrorType nvds_kafka_client_send(void *kh, const uint8_t *payload, int len, char *topic, int sync, void *ctx, nvds_msgapi_send_cb_t cb, char *key, int keylen)
nvds_msgapi_subscribe_request_cb_t
void(* nvds_msgapi_subscribe_request_cb_t)(NvDsMsgApiErrorType flag, void *msg, int msg_len, char *topic, void *user_ptr)
Type definition for callback registered during subscribe.
Definition: nvds_msgapi.h:86
consumer_instance_t::disconnect
bool disconnect
Definition: kafka_client.h:63
NvDsKafkaAsyncSendCompl
Definition: kafka_client.h:43