Metrics/TripwireEvent.js

/**Copyright (c) 2009-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.**/
"use strict";const deepcopy=require("deepcopy"),Database=require("../Utils/Database"),filterTemplate=require("../queryTemplates/filter.json"),Elasticsearch=require("../Utils/Elasticsearch"),Histogram=require("../Utils/Histogram"),Validator=require("../Utils/Validator"),Utils=require("../Utils/Utils"),InternalServerError=require("../Errors/InternalServerError"),InvalidInputError=require("../Errors/InvalidInputError"),BadRequestError=require("../Errors/BadRequestError"),winston=require("winston"),logger=winston.createLogger({transports:[new winston.transports.Console({timestamp:!0})],exitOnError:!1});
/** 
 * Class which defines TripwireEvent
 * @memberof mdxWebApiCore.Metrics
 * */
class TripwireEvent{async#e(e,{sensorId:t,fromTimestamp:r,toTimestamp:s,tripwireId:o,objectType:a}){const i=`${e.getConfigs().get("indexPrefix")}${Elasticsearch.getIndex("tripwire")}`;let n=deepcopy(filterTemplate);n.query.bool.must.push({term:{"sensor.id.keyword":t}}),n.query.bool.must.push({range:{timestamp:{lte:s}}}),n.query.bool.must.push({range:{end:{gte:r}}}),null!=o&&n.query.bool.must.push({term:{"event.id.keyword":o}}),n.query.bool.must.push({term:{"object.type.keyword":a}}),n.aggs={eventIds:{terms:{field:"event.id.keyword",size:50},aggs:{eventTypes:{terms:{field:"event.type.keyword"}}}}};let u={index:i,body:n,size:0};return await Elasticsearch.getSearchResults(e.getClient(),u,!1)}async#t(e,t){let r=new Map;if("Elasticsearch"===e.getName()){let s=await this.#e(e,t);if(!s.indexAbsent)for(let e of s.body.aggregations.eventIds.buckets){let t=e.key,s=new Map;for(let t of e.eventTypes.buckets)s.set(t.key,t.doc_count);s.has("IN")||s.set("IN",0),s.has("OUT")||s.set("OUT",0),r.set(t,s)}return r}throw new InternalServerError(`Invalid database: ${e.getName()}.`)}async#r(e,{sensorId:t,fromTimestamp:r,toTimestamp:s,tripwireId:o,objectType:a}){const i=`${e.getConfigs().get("indexPrefix")}${Elasticsearch.getIndex("tripwire")}`;let n=deepcopy(filterTemplate);n.query.bool.must.push({term:{"sensor.id.keyword":t}}),n.query.bool.must.push({range:{timestamp:{lte:s}}}),n.query.bool.must.push({range:{end:{gte:r}}}),null!=o&&n.query.bool.must.push({term:{"event.id.keyword":o}}),n.query.bool.must.push({term:{"object.type.keyword":a}}),n.aggs={groupedBuckets:{composite:{size:100,sources:[{eventId:{terms:{field:"event.id.keyword"}}},{objectId:{terms:{field:"id.keyword"}}}]},aggs:{eventTypes:{terms:{field:"event.type.keyword"}}}}};let u=new Array,l={index:i,body:n,size:0};for(;;){let t=await Elasticsearch.getSearchResults(e.getClient(),l,!1);if(t.indexAbsent)break;{u.push(...t.body.aggregations.groupedBuckets.buckets);const e=t.body.aggregations.groupedBuckets.after_key;if(!e)break;l.body.aggs.groupedBuckets.composite.after=e}}return u}async#s(e,t){let r=new Map;if("Elasticsearch"===e.getName()){let s=await this.#r(e,t);for(let e of s){let t=e.key.eventId,s=null;r.has(t)&&(s=r.get(t)),null==s&&(s=new Map([["IN",0],["OUT",0]]));let o=0,a=0;for(let t of e.eventTypes.buckets)"IN"===t.key?o=t.doc_count:"OUT"===t.key&&(a=t.doc_count);let i=o-a;if(i>0){let t=s.get("IN");s.set("IN",t+1),i>1&&logger.warn(`[DATA] object id: ${e.key.objectId} has IN count greater than OUT count by ${i}.`)}else if(i<0){let t=s.get("OUT");s.set("OUT",t+1),i<-1&&logger.warn(`[DATA] object id: ${e.key.objectId} has OUT count greater than IN count by ${Math.abs(i)}.`)}r.set(t,s)}return r}throw new InternalServerError(`Invalid database: ${e.getName()}.`)}
/** 
     * returns an object containing effective and actual tripwire counts.
     * @public
     * @async
     * @param {Database} documentDb - Database Object
     * @param {Object} input - Input object.
     * @param {string} input.sensorId
     * @param {?string} [input.tripwireId=null]
     * @param {string} input.fromTimestamp
     * @param {string} input.toTimestamp
     * @param {string} [input.objectType="Person"]
     * @returns {Promise<Object>} Effective and actual tripwire counts are returned
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const elastic = new mdx.Utils.Elasticsearch({node: "elasticsearch-url"},databaseConfigMap);
     * let input = {sensorId: "abc", fromTimestamp: "2023-01-12T11:20:10.000Z", toTimestamp: "2023-01-12T14:20:10.000Z"};
     * let tripwireMetricObject = new mdx.Metrics.TripwireEvent();
     * let tripwireCounts = await tripwireMetricObject.getTripwireCounts(elastic,input);
     */async getTripwireCounts(e,t){let r=Validator.validateJsonSchema(t,{type:"object",additionalProperties:{not:!0,errorMessage:"Invalid additional Input ${0#}."},properties:{sensorId:{type:"string",minLength:1,errorMessage:{minLength:"sensorId should have atleast 1 character."}},fromTimestamp:{type:"string"},toTimestamp:{type:"string"},tripwireId:{type:["string","null"],default:null,minLength:1,errorMessage:{minLength:"tripwireId should have atleast 1 character."}},objectType:{type:"string",minLength:1,default:"Person",errorMessage:{minLength:"objectType should have atleast 1 character."}}},required:["sensorId","fromTimestamp","toTimestamp"],errorMessage:{required:"Input should have required properties 'sensorId', 'fromTimestamp' and 'toTimestamp'."}});if(!r.valid)throw new BadRequestError(r.reason);let s=Validator.isValidTimeRange(t.fromTimestamp,t.toTimestamp);if(!s.valid)throw new InvalidInputError(s.reason);let[o,a]=await Promise.all([this.#t(e,t),this.#s(e,t)]),i=new Array,n=new Map,u={events:new Array};for(let[e,t]of o){let r=a.get(e),s=new Array;for(let[e,o]of t){let t=r.get(e);if(s.push({type:e,count:t,actualCount:o}),n.has(e)){let r=n.get(e);n.set(e,{count:r.count+t,actualCount:r.actualCount+o})}else n.set(e,{count:t,actualCount:o})}i.push({id:e,events:s})}for(let[e,t]of n)u.events.push({type:e,count:t.count,actualCount:t.actualCount});return{tripwireKpis:i,aggregatedKpis:u}}async#o(e,{sensorId:t,tripwireId:r,fromTimestamp:s,toTimestamp:o,bucketSizeInSec:a,objectType:i}){const n=`${e.getConfigs().get("indexPrefix")}${Elasticsearch.getIndex("tripwire")}`;let u=deepcopy(filterTemplate);u.query.bool.must.push({range:{end:{gte:s,lte:o}}}),u.query.bool.must.push({term:{"sensor.id.keyword":t}}),u.query.bool.must.push({term:{"object.type.keyword":i}}),null!=r&&u.query.bool.must.push({term:{"event.id.keyword":r}}),u.aggs={groupedBuckets:{composite:{size:100,sources:[{eventId:{terms:{field:"event.id.keyword"}}},{bucketStartTime:{date_histogram:{field:"end",fixed_interval:`${a}s`}}},{objectId:{terms:{field:"id.keyword"}}}]},aggs:{eventTypes:{terms:{field:"event.type.keyword"}}}}};let l=new Array,m={index:n,body:u,size:0};for(;;){let t=await Elasticsearch.getSearchResults(e.getClient(),m,!1);if(t.indexAbsent)break;{l.push(...t.body.aggregations.groupedBuckets.buckets);const e=t.body.aggregations.groupedBuckets.after_key;if(!e)break;m.body.aggs.groupedBuckets.composite.after=e}}return l}
/** 
     * returns an object containing histogram of actual and effective tripwire counts.
     * @public
     * @async
     * @param {Database} documentDb - Database Object
     * @param {Object} input - Input object.
     * @param {string} input.sensorId
     * @param {?string} [input.tripwireId=null]
     * @param {string} [input.fromTimestamp] - Either fromTimestamp and toTimestamp should be present together or minutesAgo should be present.
     * @param {string} [input.toTimestamp] - Either fromTimestamp and toTimestamp should be present together or minutesAgo should be present.
     * @param {number} [input.minutesAgo] - minutesAgo must be an integer. Either fromTimestamp and toTimestamp should be present together or minutesAgo should be present.
     * @param {number} [input.bucketCount=20] - bucketCount must be an integer.
     * @param {string} [input.objectType="Person"]
     * @returns {Promise<Object>} Histogram of actual and effective tripwire counts is returned
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const elastic = new mdx.Utils.Elasticsearch({node: "elasticsearch-url"},databaseConfigMap);
     * let input = {sensorId: "abc", fromTimestamp: "2023-01-12T11:20:10.000Z", toTimestamp: "2023-01-12T14:20:10.000Z",bucketCount:24};
     * let tripwireMetricObject = new mdx.Metrics.TripwireEvent();
     * let histogramResult = await tripwireMetricObject.getTripwireHistogram(elastic,input);
     */async getTripwireHistogram(e,t){const r={type:"object",additionalProperties:{not:!0,errorMessage:"Invalid additional Input ${0#}."},properties:{sensorId:{type:"string",minLength:1,errorMessage:{minLength:"sensorId should have atleast 1 character."}},tripwireId:{type:["string","null"],default:null,minLength:1,errorMessage:{minLength:"tripwireId should have atleast 1 character."}},fromTimestamp:{type:["string","null"],default:null},toTimestamp:{type:["string","null"],default:null},minutesAgo:{type:["integer","null"],minimum:1,default:null,errorMessage:{type:"minutesAgo is not an integer.",minimum:"minutesAgo can have a minimum value of 1."}},bucketCount:{type:"integer",minimum:1,default:Histogram.getDefaultHistogramBucketCount(),maximum:Histogram.getMaxHistogramBucketCount(),errorMessage:{type:"bucketCount is not an integer.",minimum:"bucketCount can have a minimum value of 1.",maximum:`bucketCount can have a maximum value of ${Histogram.getMaxHistogramBucketCount()}.`}},objectType:{type:"string",minLength:1,default:"Person",errorMessage:{minLength:"objectType should have atleast 1 character."}}},required:["sensorId"],oneOf:[{required:["minutesAgo"],not:{anyOf:[{required:["fromTimestamp","minutesAgo"]},{required:["toTimestamp","minutesAgo"]}]}},{required:["fromTimestamp","toTimestamp"],not:{required:["minutesAgo"]}}],errorMessage:{required:"Input should have required properties 'sensorId'.",oneOf:"One of the following combinations should be present in input: ('fromTimestamp', 'toTimestamp') or ('minutesAgo')."}};let s=Validator.validateJsonSchema(t,r);if(!s.valid)throw new BadRequestError(s.reason);if(null!=t.fromTimestamp&&null!=t.toTimestamp){let e=Validator.isValidTimeRange(t.fromTimestamp,t.toTimestamp);if(!e.valid)throw new InvalidInputError(e.reason)}else if(null!=t.minutesAgo){if(!Number.isFinite(t.minutesAgo))throw new InvalidInputError("minutesAgo is not a finite integer.");t.toTimestamp=(new Date).toISOString(),t.fromTimestamp=new Date(new Date(t.toTimestamp)-60*t.minutesAgo*1e3).toISOString()}if(!Number.isFinite(t.bucketCount))throw new InvalidInputError("bucketCount is not a finite integer.");t.bucketSizeInSec=Histogram.computeBucketSizeInSec({bucketCount:t.bucketCount,fromTimestamp:t.fromTimestamp,toTimestamp:t.toTimestamp});let o={bucketSizeInSec:t.bucketSizeInSec,tripwires:new Array};if("Elasticsearch"===e.getName()){let r=await this.#o(e,t),s=Histogram.getEmptyHistogram({bucketSizeInSec:t.bucketSizeInSec,fromTimestamp:t.fromTimestamp,toTimestamp:t.toTimestamp}),a=new Map;for(let e of r){let t=e.key.eventId;if(!a.has(t)){let e=new Map;for(let t of s){let r=deepcopy(t);r.countMaps={count:new Map([["IN",0],["OUT",0]]),actualCount:new Map([["IN",0],["OUT",0]])},e.set(new Date(t.start).valueOf(),r)}a.set(t,e)}let r=e.key.bucketStartTime,o=0,i=0;for(let t of e.eventTypes.buckets)"IN"===t.key?o=t.doc_count:"OUT"===t.key&&(i=t.doc_count);let n=a.get(t),u=n.get(r),l=u.countMaps.actualCount.get("IN"),m=u.countMaps.actualCount.get("OUT");u.countMaps.actualCount.set("IN",l+o),u.countMaps.actualCount.set("OUT",m+i);let p=o-i;if(p>0){let t=u.countMaps.count.get("IN");u.countMaps.count.set("IN",t+1),p>1&&logger.warn(`[DATA] id: ${e.key.objectId} has IN count greater than OUT count by ${p}.`)}else if(p<0){let t=u.countMaps.count.get("OUT");u.countMaps.count.set("OUT",t+1),p<-1&&logger.warn(`[DATA] id: ${e.key.objectId} has OUT count greater than IN count by ${Math.abs(p)}.`)}n.set(r,u),a.set(t,n)}for(let[e,r]of a){let s=new Array;for(let[e,t]of r){let e={start:t.start,end:t.end,events:new Array};for(let[r,s]of t.countMaps.count){let o=t.countMaps.actualCount.get(r);e.events.push({type:r,count:s,actualCount:o})}s.push(e)}s.length>0&&(Utils.tsCompare(s[0].start,"<",t.fromTimestamp)&&(s[0].start=t.fromTimestamp),Utils.tsCompare(s[s.length-1].end,">",t.toTimestamp)&&(s[s.length-1].end=t.toTimestamp)),o.tripwires.push({id:e,histogram:s})}return o}throw new InternalServerError(`Invalid database: ${e.getName()}.`)}}module.exports=TripwireEvent;