Services/NotificationManager.js

/**Copyright (c) 2009-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.**/
"use strict";const BadRequestError=require("../Errors/BadRequestError"),InvalidInputError=require("../Errors/InvalidInputError"),InternalServerError=require("../Errors/InternalServerError"),calibrationSchema=require("../ajv-schemas/calibration.json"),Kafka=require("../Utils/Kafka"),Validator=require("../Utils/Validator"),Database=require("../Utils/Database"),MessageBroker=require("../Utils/MessageBroker"),mtmcAnalyticsConfigSchema=require("../ajv-schemas/mtmcAnalyticsConfig.json"),winston=require("winston"),logger=winston.createLogger({transports:[new winston.transports.Console({timestamp:!0})],exitOnError:!1});
/** 
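 * Service class that produces calibration and config notifications to a message broker and consumes incoming notification requests from it (only Kafka is currently supported).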
 * @memberof mdxWebApiCore.Services
 * */
class NotificationManager {
    /**
     * Publishes one message to the topic returned by `Kafka.getTopic("notification")`.
     * The event type and timestamp travel as UTF-8 encoded message headers.
     * @private
     * @async
     * @param {MessageBroker} messageBroker - Broker whose client is used to produce the message.
     * @param {Object} message - Message description.
     * @param {string} message.messageKey - Kafka message key.
     * @param {string} message.messageValue - Kafka message value.
     * @param {string} message.timestamp - Timestamp written to the `timestamp` header.
     * @param {string} message.eventType - Event type written to the `event.type` header.
     * @returns {Promise<Object>} Whatever `Kafka.produceMessages` resolves with.
     */
    async #produceNotification(messageBroker, { messageKey, messageValue, timestamp, eventType }) {
        const client = messageBroker.getClient();
        const topic = Kafka.getTopic("notification");
        const message = {
            key: messageKey,
            value: messageValue,
            headers: {
                "event.type": Buffer.from(eventType, "utf8"),
                timestamp: Buffer.from(timestamp, "utf8"),
            },
        };
        return await Kafka.produceMessages(client, topic, [message]);
    }

    /**
     * Validates the calibration input and sends it to the message broker.
     * @public
     * @async
     * @param {MessageBroker} messageBroker - MessageBroker Object
     * @param {Object} input - Input object.
     * @param {string} input.timestamp - ISO timestamp.
     * @param {("upsert-all"|"upsert"|"delete")} input.eventType
     * @param {Object} input.calibration - Calibration object
     * @returns {Promise<Object>} Resolves with the result of producing the message (a success message).
     * @throws {InvalidInputError} When the message broker is missing or the timestamp is not a valid ISO timestamp.
     * @throws {BadRequestError} When the input or the calibration does not follow its schema.
     * @throws {InternalServerError} When the message broker is not Kafka.
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const kafka = new mdx.Utils.Kafka({brokers: ["kafka-broker-url"]}, kafkaConfigMap);
     * let input = {timestamp: "2023-01-12T14:20:10.000Z", eventType: "upsert-all", calibration};
     * let notificationManagerObject = new mdx.Services.NotificationManager();
     * let result = await notificationManagerObject.produceCalibrationNotification(kafka,input);
     */
    async produceCalibrationNotification(messageBroker, input) {
        if (messageBroker == null) {
            throw new InvalidInputError("A message broker like 'kafka' is required to produce calibration notification.");
        }

        const inputSchema = {
            type: "object",
            additionalProperties: { not: true, errorMessage: "Invalid additional Input ${0#}." },
            properties: {
                timestamp: { type: "string" },
                eventType: {
                    type: "string",
                    enum: ["upsert-all", "upsert", "delete"],
                    errorMessage: { enum: "eventType must be one of the following values: 'upsert-all', 'upsert' or 'delete'." },
                },
                calibration: { type: "object" },
            },
            required: ["timestamp", "eventType", "calibration"],
            errorMessage: { required: "Input should have required properties 'timestamp', 'eventType' and 'calibration'." },
        };
        const inputValidation = Validator.validateJsonSchema(input, inputSchema);
        if (!inputValidation.valid) {
            throw new BadRequestError(inputValidation.reason);
        }
        if (!Validator.isValidISOTimestamp(input.timestamp)) {
            throw new InvalidInputError("Invalid timestamp.");
        }

        const calibrationValidation = Validator.validateJsonSchema(input.calibration, calibrationSchema, false);
        if (!calibrationValidation.valid) {
            // The only tolerated schema failure is the calibrationType enum error, and
            // only for an "empty" calibration: calibrationType "" with no sensors.
            const calibrationTypeEnumReason = "Invalid input. Error 1: calibrationType must be one of the following values: 'geo', 'cartesian' or 'image'.";
            if (calibrationValidation.reason !== calibrationTypeEnumReason) {
                throw new BadRequestError("Calibration doesn't follow schema.");
            }
            const { calibrationType, sensors } = input.calibration;
            if (calibrationType !== "" || sensors.length > 0) {
                throw new BadRequestError("Calibration doesn't follow schema.");
            }
        }

        if (messageBroker.getName() === "Kafka") {
            const result = await this.#produceNotification(messageBroker, {
                timestamp: input.timestamp,
                eventType: input.eventType,
                messageKey: "calibration",
                messageValue: JSON.stringify(input.calibration),
            });
            logger.info("[Kafka Message] Produced calibration message.");
            return result;
        }
        throw new InternalServerError(`Invalid message broker: ${messageBroker.getName()}.`);
    }

    /**
     * Subscribes to the notification topic with the "mdx-notification-web-api" consumer
     * and dispatches each keyed message:
     *  - "request-calibration": publishes the current calibration (eventType "upsert-all").
     *  - "request-mdx-mtmc-analytics-config": publishes the current mdx-mtmc-analytics config.
     *  - "init-mdx-mtmc-analytics-config": passes the message value to `configManagerObject.initConfig`.
     * Messages without a key, or with any other key, are ignored.
     * @private
     * @async
     * @param {MessageBroker} messageBroker - Broker to consume from (and reply through).
     * @param {Database} documentDb - Database handed to the calibration/config managers.
     * @param {Object} configManagerObject - Object exposing `getConfig` and `initConfig`.
     * @param {Object} calibrationObject - Object exposing `getCalibration`.
     */
    async #consumeNotifications(messageBroker, documentDb, configManagerObject, calibrationObject) {
        const client = messageBroker.getClient();
        const topic = Kafka.getTopic("notification");
        const consumer = await Kafka.getConsumer(client, topic, "mdx-notification-web-api");

        await consumer.run({
            autoCommit: false,
            eachMessage: async ({ message }) => {
                if (message.key == null) {
                    return;
                }
                const messageKey = message.key.toString();

                if (messageKey === "request-calibration") {
                    logger.info("[Kafka Message] Received calibration request.");
                    const { calibration, timestamp } = await calibrationObject.getCalibration(documentDb);
                    await this.#produceNotification(messageBroker, {
                        messageKey: "calibration",
                        messageValue: JSON.stringify(calibration),
                        timestamp,
                        eventType: "upsert-all",
                    });
                    logger.info("[Kafka Message] Produced calibration message.");
                } else if (messageKey === "request-mdx-mtmc-analytics-config") {
                    logger.info("[Kafka Message] Received mdx mtmc analytics config request.");
                    const docType = "mdx-mtmc-analytics";
                    const { config, timestamp } = await configManagerObject.getConfig(documentDb, docType);
                    await this.produceConfigNotification(messageBroker, {
                        docType,
                        timestamp,
                        eventType: "upsert-all",
                        config,
                    });
                    logger.info("[Kafka Message] Produced mdx mtmc analytics config.");
                } else if (messageKey === "init-mdx-mtmc-analytics-config") {
                    logger.info("[Kafka Message] Received mdx mtmc analytics init config.");
                    // A message without a value carries no config to initialise from; calling
                    // toString() on it would throw a TypeError inside the consumer callback.
                    if (message.value == null) {
                        logger.warn("[Kafka Message] Ignored mdx mtmc analytics init config without a value.");
                        return;
                    }
                    const docType = "mdx-mtmc-analytics";
                    await configManagerObject.initConfig(documentDb, docType, message.value.toString());
                }
            },
        });
    }

    /**
     * Consumes and processes incoming notification messages.
     * @public
     * @async
     * @param {MessageBroker} messageBroker - MessageBroker Object
     * @param {Database} documentDb - Database Object
     * @param {Object} configManagerObject - configManager Object
     * @param {Object} calibrationObject - calibration Object
     * @throws {InvalidInputError} When the message broker is missing.
     * @throws {InternalServerError} When the message broker is not Kafka.
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const kafka = new mdx.Utils.Kafka({brokers: ["kafka-broker-url"]}, kafkaConfigMap);
     * const elastic = new mdx.Utils.Elasticsearch({node: "elasticsearch-url"},databaseConfigMap);
     * let notificationManagerObject = new mdx.Services.NotificationManager();
     * let configManagerObject = new mdx.Services.ConfigManager();
     * let calibrationObject = new mdx.Services.Calibration();
     * await notificationManagerObject.consumeAndProcessNotification(kafka,elastic,configManagerObject,calibrationObject);
     */
    async consumeAndProcessNotification(messageBroker, documentDb, configManagerObject, calibrationObject) {
        if (messageBroker == null) {
            throw new InvalidInputError("A message broker like 'kafka' is required to consume and process notification.");
        }
        if (messageBroker.getName() !== "Kafka") {
            throw new InternalServerError(`Invalid message broker: ${messageBroker.getName()}.`);
        }
        await this.#consumeNotifications(messageBroker, documentDb, configManagerObject, calibrationObject);
    }

    /**
     * Validates the config input and sends it to the message broker.
     * A null config is sent as an empty message value and is only accepted with eventType "upsert-all".
     * @public
     * @async
     * @param {MessageBroker} messageBroker - MessageBroker Object
     * @param {Object} input - Input object.
     * @param {("mdx-mtmc-analytics")} input.docType
     * @param {string} input.timestamp - ISO timestamp.
     * @param {("upsert-all"|"upsert")} input.eventType
     * @param {?string} input.config - JSON stringified config object
     * @returns {Promise<Object>} Resolves with the result of producing the message (a success message).
     * @throws {InvalidInputError} When the message broker is missing, the timestamp is invalid,
     *   or config is null with an eventType other than "upsert-all".
     * @throws {BadRequestError} When the input does not follow its schema, or the config is not
     *   valid JSON or does not follow the MTMC analytics config schema.
     * @throws {InternalServerError} When the message broker is not Kafka.
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const kafka = new mdx.Utils.Kafka({brokers: ["kafka-broker-url"]}, kafkaConfigMap);
     * let input = {docType: "mdx-mtmc-analytics", timestamp: "2023-01-12T14:20:10.000Z", eventType: "upsert", config};
     * let notificationManagerObject = new mdx.Services.NotificationManager();
     * let result = await notificationManagerObject.produceConfigNotification(kafka,input);
     */
    async produceConfigNotification(messageBroker, input) {
        if (messageBroker == null) {
            throw new InvalidInputError("A message broker like 'kafka' is required to produce config notification.");
        }

        const supportedDocTypes = ["mdx-mtmc-analytics"];
        const inputSchema = {
            type: "object",
            additionalProperties: { not: true, errorMessage: "Invalid additional Input ${0#}." },
            properties: {
                docType: {
                    type: "string",
                    enum: supportedDocTypes,
                    errorMessage: { enum: `docType must be one of the following values: ${supportedDocTypes.join(", ")}.` },
                },
                timestamp: { type: "string" },
                eventType: {
                    type: "string",
                    enum: ["upsert-all", "upsert"],
                    errorMessage: { enum: "eventType must be one of the following values: 'upsert-all' or 'upsert'." },
                },
                config: { type: ["string", "null"] },
            },
            required: ["docType", "timestamp", "eventType", "config"],
            errorMessage: { required: "Input should have required properties 'docType', 'timestamp', 'eventType' and 'config'." },
        };
        const inputValidation = Validator.validateJsonSchema(input, inputSchema);
        if (!inputValidation.valid) {
            throw new BadRequestError(inputValidation.reason);
        }
        if (!Validator.isValidISOTimestamp(input.timestamp)) {
            throw new InvalidInputError("Invalid timestamp.");
        }

        if (input.docType === "mdx-mtmc-analytics") {
            if (input.config != null) {
                let parsedConfig;
                try {
                    parsedConfig = JSON.parse(input.config);
                } catch (parseError) {
                    // Malformed JSON is a client error; report it like any other invalid
                    // config instead of letting a raw SyntaxError escape.
                    const badRequest = new BadRequestError("MTMC Analytics Configuration is not valid JSON.");
                    badRequest.cause = parseError;
                    throw badRequest;
                }
                if (!Validator.validateJsonSchema(parsedConfig, mtmcAnalyticsConfigSchema, false).valid) {
                    throw new BadRequestError("MTMC Analytics Configuration doesn't follow schema.");
                }
            } else if (input.eventType !== "upsert-all") {
                throw new InvalidInputError("'upsert-all' is the only supported eventType when config is null.");
            }
        }

        if (messageBroker.getName() === "Kafka") {
            const result = await this.#produceNotification(messageBroker, {
                timestamp: input.timestamp,
                eventType: input.eventType,
                messageKey: `${input.docType}-config`,
                // A null config is published as an empty message value.
                messageValue: input.config ?? "",
            });
            logger.info("[Kafka Message] Produced mdx mtmc analytics config.");
            return result;
        }
        throw new InternalServerError(`Invalid message broker: ${messageBroker.getName()}.`);
    }
}

module.exports = NotificationManager;