// Metrics/LastProcessedTimestamp.js

/**Copyright (c) 2009-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.**/
"use strict";const deepcopy=require("deepcopy"),Database=require("../Utils/Database"),filterTemplate=require("../queryTemplates/filter.json"),Elasticsearch=require("../Utils/Elasticsearch"),Validator=require("../Utils/Validator"),InternalServerError=require("../Errors/InternalServerError"),BadRequestError=require("../Errors/BadRequestError"),Calibration=require("../Services/Calibration"),Utils=require("../Utils/Utils"),NodeCache=require("node-cache");let cache=new NodeCache;
/**
 * Looks up the most recent processed timestamp, either for one sensor or for a
 * place (with a per-sensor and/or per-sub-place breakdown).
 * @memberof mdxWebApiCore.Metrics
 */
class LastProcessedTimestamp {
    /**
     * Rejects any database whose name is not "Elasticsearch".
     * @param {Database} documentDb - Database object.
     * @throws {InternalServerError} When `documentDb.getName()` is not "Elasticsearch".
     */
    #assertElasticsearch(documentDb) {
        if (documentDb.getName() !== "Elasticsearch") {
            throw new InternalServerError(`Invalid database: ${documentDb.getName()}.`);
        }
    }

    /**
     * Builds a fresh sub-aggregation that keeps, per bucket, the single hit with
     * the greatest `end` value (only the `end` field is requested).
     * @returns {Object} Aggregation definition keyed by `lastProcessedTimestamp`.
     */
    #latestEndPerBucket() {
        return {
            lastProcessedTimestamp: {
                top_hits: {
                    size: 1,
                    sort: [{ end: { order: "desc" } }],
                    _source: { includes: ["end"] },
                },
            },
        };
    }

    /**
     * Copies the `end` value of every bucket's top hit into `timestampByKey`,
     * keyed by the bucket key.
     * @param {Array<Object>} buckets - Aggregation buckets carrying `lastProcessedTimestamp`.
     * @param {Map} timestampByKey - Map that receives one entry per bucket.
     */
    #collectBucketTimestamps(buckets, timestampByKey) {
        for (const bucket of buckets) {
            const bucketKey = bucket.key;
            const bucketHits = Elasticsearch.searchResultFormatter(bucket.lastProcessedTimestamp);
            timestampByKey.set(bucketKey, bucketHits[0].end);
        }
    }

    /**
     * Searches the index configured as `rawIndex` for the newest document
     * (sorted by `timestamp` descending) of the given sensor.
     * @param {Database} documentDb - Database object.
     * @param {Object} input - Input object.
     * @param {string} input.sensorId - Sensor to filter on.
     * @returns {Promise<Object>} Whatever `Elasticsearch.getSearchResults` resolves to.
     */
    async #searchLatestRawBySensor(documentDb, { sensorId }) {
        const index = documentDb.getConfigs().get("rawIndex");
        const body = deepcopy(filterTemplate);
        body.query.bool.must.push({ term: { "sensorId.keyword": sensorId } });
        const request = {
            index,
            body,
            size: 1,
            sort: "timestamp:desc",
            _sourceIncludes: ["timestamp"],
        };
        return await Elasticsearch.getSearchResults(documentDb.getClient(), request, false);
    }

    /**
     * Resolves the latest `timestamp` stored for a sensor.
     * @param {Database} documentDb - Database object.
     * @param {Object} input - Input object holding `sensorId`.
     * @returns {Promise<*>} The timestamp of the first hit, or null when the index
     * is absent or there are no hits.
     * @throws {InternalServerError} When the database is not Elasticsearch.
     */
    async #findSensorTimestamp(documentDb, input) {
        this.#assertElasticsearch(documentDb);
        const searchResult = await this.#searchLatestRawBySensor(documentDb, input);
        if (searchResult.indexAbsent) {
            return null;
        }
        const hits = Elasticsearch.searchResultFormatter(searchResult.body);
        return hits.length > 0 ? hits[0].timestamp : null;
    }

    /**
     * Searches the behavior index for documents whose place name equals `place`,
     * returning the newest one (by `end`) plus a per-sensor aggregation.
     * @param {Database} documentDb - Database object.
     * @param {Object} input - Input object.
     * @param {string} input.place - Exact place name to filter on.
     * @returns {Promise<Object>} Whatever `Elasticsearch.getSearchResults` resolves to.
     */
    async #searchBehaviorBySensor(documentDb, { place }) {
        const index = `${documentDb.getConfigs().get("indexPrefix")}${Elasticsearch.getIndex("behavior")}`;
        const body = deepcopy(filterTemplate);
        body.query.bool.must.push({ term: { "place.name.keyword": place } });
        body.aggs = {
            sensorIds: {
                terms: { field: "sensor.id.keyword", size: 10000 },
                aggs: this.#latestEndPerBucket(),
            },
        };
        const request = {
            index,
            body,
            size: 1,
            sort: "end:desc",
            _sourceIncludes: ["sensor.id", "end"],
        };
        return await Elasticsearch.getSearchResults(documentDb.getClient(), request, false);
    }

    /**
     * Summarises the sensors attached directly to a place.
     * @param {Database} documentDb - Database object.
     * @param {Object} input - Input object holding `place`.
     * @returns {Promise<{latestTimestamp: ?Object, sensorTimestampMap: Map}>}
     * `latestTimestamp` is `{sensorId, timestamp}` of the newest hit (null when the
     * index is absent or empty); the map holds the latest `end` per sensor id.
     * @throws {InternalServerError} When the database is not Elasticsearch.
     */
    async #summarizeSensorsOfPlace(documentDb, input) {
        const summary = { latestTimestamp: null, sensorTimestampMap: new Map() };
        this.#assertElasticsearch(documentDb);

        const searchResult = await this.#searchBehaviorBySensor(documentDb, input);
        if (searchResult.indexAbsent) {
            return summary;
        }

        const responseBody = searchResult.body;
        const hits = Elasticsearch.searchResultFormatter(responseBody);
        if (hits.length === 0) {
            return summary;
        }

        summary.latestTimestamp = { sensorId: hits[0].sensor.id, timestamp: hits[0].end };
        this.#collectBucketTimestamps(
            responseBody.aggregations.sensorIds.buckets,
            summary.sensorTimestampMap,
        );
        return summary;
    }

    /**
     * Searches the behavior index for documents whose place name starts with
     * `place`, returning the newest one (by `end`) plus an aggregation that groups
     * documents by their place name truncated to one segment deeper than `place`.
     * @param {Database} documentDb - Database object.
     * @param {Object} input - Input object.
     * @param {string} input.place - "/"-separated place name used as prefix.
     * @returns {Promise<Object>} Whatever `Elasticsearch.getSearchResults` resolves to.
     */
    async #searchBehaviorBySubPlace(documentDb, { place }) {
        const index = `${documentDb.getConfigs().get("indexPrefix")}${Elasticsearch.getIndex("behavior")}`;
        const placeDepth = place.split("/").length;
        const body = deepcopy(filterTemplate);
        body.query.bool.must.push({ prefix: { "place.name.keyword": place } });

        // The script text (including its padding and the line break right after
        // `for `) is reproduced exactly as the original request sent it.
        const pad32 = " ".repeat(32);
        const pad36 = " ".repeat(36);
        const pad40 = " ".repeat(40);
        const scriptSource = [
            "String place_prefix = ''; ",
            `${pad32}int i=0; `,
            `${pad32}for `,
            "(item in doc['place.name.keyword'].value.splitOnToken('/')) { ",
            `${pad36}i+=1;`,
            `${pad36}if(i!=1){`,
            `${pad40}place_prefix +='/';`,
            `${pad36}}`,
            `${pad36}place_prefix += item; `,
            `${pad36}if (i==${placeDepth + 1}){`,
            `${pad40}break;`,
            `${pad36}}`,
            `${pad32}} `,
            `${pad32}return place_prefix;`,
        ].join("\n");

        body.aggs = {
            placeSuccessor: {
                terms: {
                    script: { lang: "painless", source: scriptSource },
                    size: 10000,
                },
                aggs: this.#latestEndPerBucket(),
            },
        };
        const request = {
            index,
            body,
            size: 1,
            sort: "end:desc",
            _sourceIncludes: ["place.name", "end"],
        };
        return await Elasticsearch.getSearchResults(documentDb.getClient(), request, false);
    }

    /**
     * Summarises the places found under a place prefix.
     * @param {Database} documentDb - Database object.
     * @param {Object} input - Input object holding `place`.
     * @returns {Promise<{latestTimestamp: ?Object, placeTimestampMap: Map}>}
     * `latestTimestamp` is `{place, timestamp}` of the newest hit, with `place`
     * truncated to one segment deeper than the requested place (null when the
     * index is absent or empty); the map holds the latest `end` per bucket key.
     * @throws {InternalServerError} When the database is not Elasticsearch.
     */
    async #summarizeSubPlaces(documentDb, input) {
        const summary = { latestTimestamp: null, placeTimestampMap: new Map() };
        this.#assertElasticsearch(documentDb);

        const searchResult = await this.#searchBehaviorBySubPlace(documentDb, input);
        if (searchResult.indexAbsent) {
            return summary;
        }

        const responseBody = searchResult.body;
        const hits = Elasticsearch.searchResultFormatter(responseBody);
        if (hits.length === 0) {
            return summary;
        }

        const requestedDepth = input.place.split("/").length;
        const successorPlace = hits[0].place.name
            .split("/")
            .slice(0, requestedDepth + 1)
            .join("/");
        summary.latestTimestamp = { place: successorPlace, timestamp: hits[0].end };
        this.#collectBucketTimestamps(
            responseBody.aggregations.placeSuccessor.buckets,
            summary.placeTimestampMap,
        );
        return summary;
    }

    /**
     * Fetches the calibration and, when nothing is cached yet or the calibration
     * timestamp is newer than the cached one, stores fresh lookup maps in the
     * module-level cache. Returns the maps read back from that cache.
     * @param {Database} documentDb - Database object.
     * @returns {Promise<{sensorPlaceMap: *, placeHierarchyMap: *}>} Cached maps.
     */
    async #loadCalibrationMaps(documentDb) {
        const calibrationService = new Calibration();
        const { timestamp, calibration } = await calibrationService.getCalibration(documentDb);
        const cachedTimestamp = cache.get("calibration-timestamp");

        if (cachedTimestamp == null || Utils.tsCompare(timestamp, ">", cachedTimestamp)) {
            const maps = calibrationService.getCalibrationMaps(calibration);
            cache.set("placeHierarchyMap", maps.placeHierarchyMap);
            cache.set("sensorPlaceMap", maps.sensorPlaceMap);
            cache.set("calibration-timestamp", timestamp);
        }

        const sensorPlaceMap = cache.get("sensorPlaceMap");
        const placeHierarchyMap = cache.get("placeHierarchyMap");
        return { sensorPlaceMap, placeHierarchyMap };
    }

    /**
     * Starts a response from a sensor summary: tags the latest entry with the
     * place of its sensor and lists every sensor with its place and timestamp.
     * Note that the summary's `latestTimestamp` object itself receives the
     * `place` property.
     * @param {Object} sensorSummary - Result of the per-sensor summary (non-null `latestTimestamp`).
     * @param {*} sensorPlaceMap - Lookup from sensor id to place (must expose `get`).
     * @returns {{latestTimestamp: Object, details: Array<Object>}} New response object.
     */
    #buildSensorResponse(sensorSummary, sensorPlaceMap) {
        const response = { latestTimestamp: sensorSummary.latestTimestamp, details: [] };
        response.latestTimestamp.place = sensorPlaceMap.get(response.latestTimestamp.sensorId);
        for (const [sensorId, timestamp] of sensorSummary.sensorTimestampMap) {
            response.details.push({ place: sensorPlaceMap.get(sensorId), sensorId, timestamp });
        }
        return response;
    }

    /**
     * Appends one `{place, timestamp}` entry per map entry to `details`.
     * @param {Array<Object>} details - Array to extend.
     * @param {Map} placeTimestampMap - Latest timestamp keyed by place.
     */
    #appendPlaceDetails(details, placeTimestampMap) {
        for (const [place, timestamp] of placeTimestampMap) {
            details.push({ place, timestamp });
        }
    }

    /**
     * Returns the last processed timestamp for a sensor or for a place.
     *
     * With `sensorId`, the result is `{latestTimestamp}` where `latestTimestamp`
     * is `{sensorId, timestamp}` or null. With `place`, the result is
     * `{latestTimestamp: null}` when the place is not in the cached place
     * hierarchy; otherwise `{latestTimestamp, details}`, where `details` lists
     * sensors (for a place without sub-places), sub-places (for a place without
     * sensors) or both, and both fields are null when nothing was found.
     * @public
     * @async
     * @param {Database} documentDb - Database Object
     * @param {Object} input - Input object.
     * @param {string} [input.sensorId] - Either sensorId or place should be present.
     * @param {string} [input.place] - Either sensorId or place should be present.
     * @returns {Promise<Object>} Last Processed Timestamp is returned
     * @throws {BadRequestError} When `input` fails schema validation.
     * @throws {InternalServerError} When the database is not Elasticsearch.
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const elastic = new mdx.Utils.Elasticsearch({node: "elasticsearch-url"},databaseConfigMap);
     * let input = {sensorId: "abc"};
     * let lastProcessedTimestampObject = new mdx.Metrics.LastProcessedTimestamp();
     * let result = await lastProcessedTimestampObject.getLastProcessedTimestamp(elastic,input);
     */
    async getLastProcessedTimestamp(documentDb, input) {
        const inputSchema = {
            type: "object",
            additionalProperties: {
                not: true,
                errorMessage: "Invalid additional Input ${0#}.",
            },
            properties: {
                sensorId: {
                    type: "string",
                    minLength: 1,
                    errorMessage: { minLength: "sensorId should have atleast 1 character." },
                },
                place: {
                    type: "string",
                    minLength: 1,
                    errorMessage: { minLength: "place should have atleast 1 character." },
                },
            },
            oneOf: [{ required: ["sensorId"] }, { required: ["place"] }],
            errorMessage: { oneOf: "Input should have either 'sensorId' or 'place'." },
        };
        const validation = Validator.validateJsonSchema(input, inputSchema);
        if (!validation.valid) {
            throw new BadRequestError(validation.reason);
        }

        // Sensor lookup: a single timestamp, no breakdown.
        if ("sensorId" in input) {
            const sensorResponse = { latestTimestamp: null };
            const sensorTimestamp = await this.#findSensorTimestamp(documentDb, input);
            if (sensorTimestamp != null) {
                sensorResponse.latestTimestamp = {
                    sensorId: input.sensorId,
                    timestamp: sensorTimestamp,
                };
            }
            return sensorResponse;
        }

        // Place lookup: the calibration maps tell us what hangs off the place.
        const { sensorPlaceMap, placeHierarchyMap } = await this.#loadCalibrationMaps(documentDb);
        if (!placeHierarchyMap.has(input.place)) {
            return { latestTimestamp: null };
        }
        const placeNode = placeHierarchyMap.get(input.place);
        const isLeafPlace = placeNode.places == null;
        const hasSensors = placeNode.sensors != null;

        let response = { latestTimestamp: null, details: null };

        // No sub-places: only sensors can contribute.
        if (isLeafPlace) {
            const sensorSummary = await this.#summarizeSensorsOfPlace(documentDb, input);
            if (sensorSummary.latestTimestamp != null) {
                response = this.#buildSensorResponse(sensorSummary, sensorPlaceMap);
            }
            return response;
        }

        // Sub-places but no sensors of its own: only sub-places can contribute.
        if (!hasSensors) {
            const placeSummary = await this.#summarizeSubPlaces(documentDb, input);
            if (placeSummary.latestTimestamp != null) {
                response = { latestTimestamp: placeSummary.latestTimestamp, details: [] };
                this.#appendPlaceDetails(response.details, placeSummary.placeTimestampMap);
            }
            return response;
        }

        // Both sensors and sub-places: query both and keep the newer overall entry.
        const [sensorSummary, placeSummary] = await Promise.all([
            this.#summarizeSensorsOfPlace(documentDb, input),
            this.#summarizeSubPlaces(documentDb, input),
        ]);
        if (sensorSummary.latestTimestamp != null) {
            response = this.#buildSensorResponse(sensorSummary, sensorPlaceMap);
        }
        if (placeSummary.latestTimestamp != null) {
            if (response.latestTimestamp == null) {
                response = { latestTimestamp: placeSummary.latestTimestamp, details: [] };
            } else if (
                Utils.tsCompare(
                    placeSummary.latestTimestamp.timestamp,
                    ">",
                    response.latestTimestamp.timestamp,
                )
            ) {
                response.latestTimestamp = placeSummary.latestTimestamp;
            }
            this.#appendPlaceDetails(response.details, placeSummary.placeTimestampMap);
        }
        return response;
    }
}

module.exports = LastProcessedTimestamp;