Utils/Kafka.js

/**Copyright (c) 2009-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.**/
"use strict";const MessageBroker=require("./MessageBroker"),{Kafka:KafkaClient}=require("kafkajs");
/**
 * Kafka utilities built on top of the kafkajs client.
 * @memberof mdxWebApiCore.Utils
 * @extends MessageBroker
 */
class Kafka extends MessageBroker {
  /**
   * Supported topic types mapped to the Kafka topic name each one resolves to.
   * @type {Map<string, string>}
   */
  static #topics = new Map([
    ["notification", "mdx-notification"],
    ["rtls", "mdx-rtls"],
    ["amr", "mdx-amr"],
  ]);

  /**
   * Constructor
   * @param {Object} connectionObject - Passed unchanged to the kafkajs `Kafka` constructor.
   * @param {Map} configs - Forwarded to the MessageBroker constructor as `configs`.
   */
  constructor(connectionObject, configs) {
    super({
      name: "Kafka",
      client: new KafkaClient(connectionObject),
      configs,
    });
  }

  /**
   * Used to return topic
   * @public
   * @static
   * @param {string} topicType - One of "notification", "rtls" or "amr".
   * @returns {string|undefined} Returns topic, or undefined for an unknown topic type
   * @example
   * const mdx = require("@nvidia-mdx/web-api-core");
   * let topicType = "notification";
   * let topic = mdx.Utils.Kafka.getTopic(topicType);
   */
  static getTopic(topicType) {
    // Read through the class name rather than `this`: private static fields
    // are not inherited, so `this.#topics` throws when called on a subclass.
    return Kafka.#topics.get(topicType);
  }

  /**
   * Produces the input messages to a topic and returns a success message.
   * A producer is created for the call and is disconnected again even when
   * sending fails.
   * @public
   * @static
   * @async
   * @param {Object} client - kafkajs client used to create the producer.
   * @param {string} topic
   * @param {Array<Object>} messages - Each message will have value and may contain key and headers.
   * @returns {Promise<Object>} A success message (`{success: true}`) is returned
   * @throws Rejects with the error raised by connect, send or disconnect.
   * @example
   * const mdx = require("@nvidia-mdx/web-api-core");
   * const kafka = new mdx.Utils.Kafka({brokers: ["kafka-broker-url"]}, kafkaConfigMap);
   * let result = await mdx.Utils.Kafka.produceMessages(kafka.getClient(), topic, messages);
   */
  static async produceMessages(client, topic, messages) {
    const producer = client.producer();
    await producer.connect();
    try {
      await producer.send({ topic, messages });
    } finally {
      // Always release the connection, otherwise a failed send leaks it.
      await producer.disconnect();
    }
    return { success: true };
  }

  /**
   * Returns a kafka consumer that is connected and subscribed to the topic.
   * If subscribing fails the consumer is disconnected before the error is
   * rethrown, because the caller never receives a handle to close it.
   * @public
   * @static
   * @async
   * @param {Object} client - kafkajs client used to create the consumer.
   * @param {string} topic
   * @param {string} consumerGroup - Used as the consumer's `groupId`.
   * @returns {Promise<Object>} Kafka consumer is returned
   * @throws Rejects with the error raised by connect or subscribe.
   * @example
   * const mdx = require("@nvidia-mdx/web-api-core");
   * const kafka = new mdx.Utils.Kafka({brokers: ["kafka-broker-url"]}, kafkaConfigMap);
   * let consumer = await mdx.Utils.Kafka.getConsumer(kafka.getClient(), topic, consumerGroup);
   */
  static async getConsumer(client, topic, consumerGroup) {
    const consumer = client.consumer({
      groupId: consumerGroup,
      maxWaitTimeInMs: 100,
    });
    await consumer.connect();
    try {
      await consumer.subscribe({ topic });
    } catch (error) {
      // Release the connection the caller will never see, then surface the failure.
      await consumer.disconnect();
      throw error;
    }
    return consumer;
  }
}

module.exports = Kafka;