/**Copyright (c) 2009-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.**/
"use strict";const MessageBroker=require("./MessageBroker"),{Kafka:KafkaClient}=require("kafkajs");
/**
* Class containing Kafka Utils
* @memberof mdxWebApiCore.Utils
* @extends MessageBroker
* */
class Kafka extends MessageBroker{
/**
* Constructor
* @param {Object} connectionObject
* @param {Map} configs
*/
constructor(t,s){super({name:"Kafka",client:new KafkaClient(t),configs:s})}static#t=this.#s();static#s(){let t=new Map;return t.set("notification","mdx-notification"),t.set("rtls","mdx-rtls"),t.set("amr","mdx-amr"),t}
/**
* Used to return topic
* @public
* @static
* @param {string} topicType
* @returns {string|undefined} Returns topic
* @example
* const mdx = require("@nvidia-mdx/web-api-core");
* let topicType = "notification";
* let topic = mdx.Utils.Kafka.getTopic(topicType);
*/static getTopic(t){return this.#t.get(t)}
/**
* returns a success message once the input messages are produced
* @public
* @static
* @async
* @param {Object} client
* @param {string} topic
* @param {Array<Object>} messages - Each message will have value and may contain key and headers.
* @returns {Promise<Object>} A success message is returned
* @example
* const mdx = require("@nvidia-mdx/web-api-core");
* const kafka = new mdx.Utils.Kafka({brokers: ["kafka-broker-url"]}, kafkaConfigMap);
* let result = await kafka.produceMessages(kafka.getClient(), topic, messages);
*/static async produceMessages(t,s,e){const a=t.producer();return await a.connect(),await a.send({topic:s,messages:e}),await a.disconnect(),{success:!0}}
/**
* returns a kafka consumer
* @public
* @static
* @async
* @param {Object} client
* @param {string} topic
* @param {string} consumerGroup
* @returns {Promise<Object>} Kafka consumer is returned
* @example
* const mdx = require("@nvidia-mdx/web-api-core");
* const kafka = new mdx.Utils.Kafka({brokers: ["kafka-broker-url"]}, kafkaConfigMap);
* let consumer = await kafka.getConsumer(kafka.getClient(), topic, consumerGroup);
*/static async getConsumer(t,s,e){const a=t.consumer({groupId:e,maxWaitTimeInMs:100});return await a.connect(),await a.subscribe({topic:s}),a}}module.exports=Kafka;