// Services/Clustering.js

/**Copyright (c) 2009-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.**/
"use strict";const deepcopy=require("deepcopy"),Database=require("../Utils/Database"),filterTemplate=require("../queryTemplates/filter.json"),randomlySampledClustersTemplate=require("../queryTemplates/randomlySampledClusters.json"),Elasticsearch=require("../Utils/Elasticsearch"),Validator=require("../Utils/Validator"),InternalServerError=require("../Errors/InternalServerError"),InvalidInputError=require("../Errors/InvalidInputError"),BadRequestError=require("../Errors/BadRequestError");
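/**
 * Service class for behavior clustering: looks up cluster labels, samples behaviors per cluster,
 * checks whether a cluster or a cluster label exists, and stores new cluster labels.
 * @memberof mdxWebApiCore.Services
 */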
class Clustering {
    /** Schema default for `input.minBehaviorDistance`. */
    static #defaultMinBehaviorDistance = 30;

    /** Schema default for `input.maxClusterSampleSize`. */
    static #defaultMaxClusterSampleSize = 100;

    /**
     * Builds a full index name: the configured "indexPrefix" followed by the name
     * `Elasticsearch.getIndex` returns for the given key.
     * @param {Database} documentDb - Database Object
     * @param {string} indexKey - Key passed to `Elasticsearch.getIndex` ("behavior" or "clusterLabels").
     * @returns {string}
     */
    static #indexName(documentDb, indexKey) {
        return `${documentDb.getConfigs().get("indexPrefix")}${Elasticsearch.getIndex(indexKey)}`;
    }

    /**
     * Normalises a label: trims it, turns every run of whitespace into a single "-" and lower-cases it.
     * Shared by doesClusterLabelExist and addClusterLabel so both compare and store the same form.
     * @param {string} label
     * @returns {string}
     */
    static #normalizeLabel(label) {
        return label.trim().replace(/\s+/g, "-").toLowerCase();
    }

    /**
     * Schema fragment for a required, non-empty string property.
     * @param {string} propertyName - Used only in the error message.
     * @returns {Object}
     */
    static #nonEmptyString(propertyName) {
        return {
            type: "string",
            minLength: 1,
            errorMessage: { minLength: `${propertyName} should have atleast 1 character.` },
        };
    }

    /**
     * Schema fragment for the optional `clusterIndex` property (string or null, default null).
     * @returns {Object}
     */
    static #optionalClusterIndex() {
        return {
            type: ["string", "null"],
            default: null,
            minLength: 1,
            errorMessage: { minLength: "clusterIndex should have atleast 1 character." },
        };
    }

    /**
     * Wraps property definitions into an object schema that rejects unknown properties.
     * A fresh object is built on every call, as the original inline literals were.
     * @param {Object} properties - Property schemas.
     * @param {Array<string>} required - Names of required properties.
     * @param {string} requiredMessage - Error message used when a required property is missing.
     * @returns {Object}
     */
    static #objectSchema(properties, required, requiredMessage) {
        return {
            type: "object",
            // Deliberately a plain string, not a template literal: "${0#}" is a placeholder
            // that is handed to the validator as-is.
            additionalProperties: { not: true, errorMessage: "Invalid additional Input ${0#}." },
            properties,
            required,
            errorMessage: { required: requiredMessage },
        };
    }

    /**
     * Searches the behavior index for the most recently ended clustered behavior
     * (cluster index other than "-1") of a sensor that overlaps the time range.
     * Only `info.cluster.modelVersion` is requested from the source.
     * @param {Database} documentDb - Database Object
     * @param {Object} input
     * @param {string} input.sensorId
     * @param {string} input.fromTimestamp
     * @param {string} input.toTimestamp
     * @returns {Promise<Object>} Result of `Elasticsearch.getSearchResults`.
     */
    async #searchLatestModelVersion(documentDb, { sensorId, fromTimestamp, toTimestamp }) {
        const index = Clustering.#indexName(documentDb, "behavior");
        const body = deepcopy(filterTemplate);
        body.query.bool.must.push(
            { range: { timestamp: { lte: toTimestamp } } },
            { range: { end: { gte: fromTimestamp } } },
            { term: { "sensor.id.keyword": sensorId } },
            { exists: { field: "info.cluster.index" } },
        );
        body.query.bool.must_not = [{ term: { "info.cluster.index.keyword": "-1" } }];
        const searchParams = {
            index,
            body,
            sort: "end:desc",
            size: 1,
            _sourceIncludes: ["info.cluster.modelVersion"],
        };
        // NOTE(review): the meaning of the trailing `false` is defined by Elasticsearch.getSearchResults.
        return Elasticsearch.getSearchResults(documentDb.getClient(), searchParams, false);
    }

    /**
     * Resolves the model version of the latest clustered behavior in the time range.
     * @param {Database} documentDb - Database Object
     * @param {Object} input - Object carrying sensorId, fromTimestamp and toTimestamp.
     * @returns {Promise<?string>} The model version, or null when the index is absent or nothing matched.
     * @throws {InternalServerError} When the database is not Elasticsearch.
     */
    async #getLatestModelVersion(documentDb, input) {
        if (documentDb.getName() !== "Elasticsearch") {
            throw new InternalServerError(`Invalid database: ${documentDb.getName()}.`);
        }
        const searchResult = await this.#searchLatestModelVersion(documentDb, input);
        if (searchResult.indexAbsent) {
            return null;
        }
        const behaviors = Elasticsearch.searchResultFormatter(searchResult.body);
        // The version is read from the flat key "cluster.modelVersion" inside `info`.
        return behaviors.length > 0 ? behaviors[0].info["cluster.modelVersion"] : null;
    }

    /**
     * Searches the clusterLabels index and aggregates, per cluster index (up to 1000 buckets),
     * the most recent label document by `timestamp`.
     * @param {Database} documentDb - Database Object
     * @param {Object} input
     * @param {string} input.sensorId
     * @param {string} input.modelVersion
     * @param {?string} [input.clusterIndex] - When not null/undefined, restricts the search to this cluster.
     * @returns {Promise<Object>} Result of `Elasticsearch.getSearchResults`.
     */
    async #searchClusterLabels(documentDb, { sensorId, modelVersion, clusterIndex }) {
        const index = Clustering.#indexName(documentDb, "clusterLabels");
        const body = deepcopy(filterTemplate);
        body.query.bool.must.push(
            { term: { "sensorId.keyword": sensorId } },
            { term: { "modelVersion.keyword": modelVersion } },
        );
        if (clusterIndex != null) {
            body.query.bool.must.push({ term: { "clusterIndex.keyword": clusterIndex } });
        }
        body.aggs = {
            clusterIndices: {
                terms: { field: "clusterIndex.keyword", size: 1000 },
                aggs: {
                    label: {
                        top_hits: {
                            size: 1,
                            sort: [{ timestamp: { order: "desc" } }],
                            _source: { includes: ["label"] },
                        },
                    },
                },
            },
        };
        return Elasticsearch.getSearchResults(documentDb.getClient(), { index, body, size: 0 }, false);
    }

    /**
     * returns a map containing clusterIndex and the latest cluster label associated to it.
     * @public
     * @async
     * @param {Database} documentDb - Database Object
     * @param {Object} input - Input object.
     * @param {string} input.sensorId
     * @param {string} input.modelVersion
     * @param {?string} [input.clusterIndex=null]
     * @returns {Promise<Map<string,string>>} A map containing clusterIndex and the latest cluster label associated to it is returned
     * @throws {BadRequestError} When the input does not satisfy the schema.
     * @throws {InternalServerError} When the database is not Elasticsearch.
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const elastic = new mdx.Utils.Elasticsearch({node: "elasticsearch-url"},databaseConfigMap);
     * let input = {sensorId: "abc", modelVersion: "2"};
     * let clusteringObject = new mdx.Services.Clustering();
     * let labelMap = await clusteringObject.getClusterLabels(elastic,input);
     */
    async getClusterLabels(documentDb, input) {
        const schema = Clustering.#objectSchema(
            {
                sensorId: Clustering.#nonEmptyString("sensorId"),
                modelVersion: Clustering.#nonEmptyString("modelVersion"),
                clusterIndex: Clustering.#optionalClusterIndex(),
            },
            ["sensorId", "modelVersion"],
            "Input should have required properties 'sensorId' and 'modelVersion'.",
        );
        // NOTE(review): the trailing `false` is interpreted by Validator.validateJsonSchema — confirm its meaning there.
        const validation = Validator.validateJsonSchema(input, schema, false);
        if (!validation.valid) {
            throw new BadRequestError(validation.reason);
        }
        if (documentDb.getName() !== "Elasticsearch") {
            throw new InternalServerError(`Invalid database: ${documentDb.getName()}.`);
        }

        const labelsByClusterIndex = new Map();
        const searchResult = await this.#searchClusterLabels(documentDb, input);
        if (searchResult.indexAbsent) {
            return labelsByClusterIndex;
        }
        for (const bucket of searchResult.body.aggregations.clusterIndices.buckets) {
            const [latestLabelDocument] = Elasticsearch.searchResultFormatter(bucket.label);
            labelsByClusterIndex.set(bucket.key, latestLabelDocument.label);
        }
        return labelsByClusterIndex;
    }

    /**
     * Searches the behavior index with the randomly-sampled-clusters template and returns,
     * per cluster index, up to `maxClusterSampleSize` behaviors.
     * When `clusterIndex` is null/undefined, cluster "-1" is excluded instead.
     * @param {Database} documentDb - Database Object
     * @param {Object} input
     * @param {string} input.sensorId
     * @param {string} input.fromTimestamp
     * @param {string} input.toTimestamp
     * @param {string} input.modelVersion
     * @param {number} input.maxClusterSampleSize
     * @param {?string} [input.clusterIndex]
     * @param {number} input.minBehaviorDistance
     * @returns {Promise<Object>} Result of `Elasticsearch.getSearchResults`.
     */
    async #searchSampledBehaviors(
        documentDb,
        { sensorId, fromTimestamp, toTimestamp, modelVersion, maxClusterSampleSize, clusterIndex, minBehaviorDistance },
    ) {
        const index = Clustering.#indexName(documentDb, "behavior");
        const body = deepcopy(randomlySampledClustersTemplate);
        const filter = body.query.function_score.query.bool;
        filter.must.push(
            { exists: { field: "info.cluster.index" } },
            { term: { "sensor.id.keyword": sensorId } },
            { range: { timestamp: { lte: toTimestamp } } },
            { range: { end: { gte: fromTimestamp } } },
            { term: { "info.cluster.modelVersion.keyword": modelVersion } },
            { range: { distance: { gte: minBehaviorDistance } } },
        );
        if (clusterIndex != null) {
            filter.must.push({ term: { "info.cluster.index.keyword": clusterIndex } });
        } else {
            filter.must_not = [{ term: { "info.cluster.index.keyword": "-1" } }];
        }

        const sampleHits = body.aggs.clusterIndices.aggs.randomBehaviorSamples.top_hits;
        sampleHits.size = maxClusterSampleSize;
        sampleHits._source.includes = [
            "Id",
            "id",
            "timestamp",
            "end",
            "locations",
            "smoothLocations",
            "timeInterval",
            "length",
            "speedOverTime",
            "sensor",
            "place.name",
            "object",
            "edges",
            "info.cluster.index",
            "info.cluster.modelVersion",
            "direction",
            "speed",
            "distance",
            "bearing",
            "videoPath",
        ];
        return Elasticsearch.getSearchResults(documentDb.getClient(), { index, body, size: 0 }, false);
    }

    /**
     * returns sampled behaviors for each cluster.
     *
     * The model version is taken from the latest clustered behavior in the time range; when none
     * is found an empty cluster list is returned. The resolved model version is NOT written onto
     * `input`: this method's schema rejects additional properties, so doing so would make a second
     * call with the same object fail validation.
     * @public
     * @async
     * @param {Database} documentDb - Database Object
     * @param {Object} input - Input object.
     * @param {string} input.sensorId
     * @param {string} input.fromTimestamp
     * @param {string} input.toTimestamp
     * @param {?string} [input.clusterIndex=null]
     * @param {number} [input.maxClusterSampleSize=100] - maxClusterSampleSize must be an integer.
     * @param {number} [input.minBehaviorDistance=30]
     * @returns {Promise<Object>} An object containing sampled behaviors for each cluster is returned
     * @throws {BadRequestError} When the input does not satisfy the schema.
     * @throws {InvalidInputError} When the time range or a numeric option is invalid.
     * @throws {InternalServerError} When the database is not Elasticsearch.
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const elastic = new mdx.Utils.Elasticsearch({node: "elasticsearch-url"},databaseConfigMap);
     * let input = {sensorId: "abc", fromTimestamp: "2023-01-12T11:20:10.000Z", toTimestamp: "2023-01-12T14:20:10.000Z"};
     * let clusteringObject = new mdx.Services.Clustering();
     * let clusters = await clusteringObject.getSampledBehaviorClusters(elastic,input);
     */
    async getSampledBehaviorClusters(documentDb, input) {
        const schema = Clustering.#objectSchema(
            {
                sensorId: Clustering.#nonEmptyString("sensorId"),
                fromTimestamp: { type: "string" },
                toTimestamp: { type: "string" },
                clusterIndex: Clustering.#optionalClusterIndex(),
                maxClusterSampleSize: {
                    type: "integer",
                    minimum: 1,
                    maximum: 100,
                    default: Clustering.#defaultMaxClusterSampleSize,
                    errorMessage: {
                        minimum: "maxClusterSampleSize can have a minimum value of 1.",
                        maximum: "maxClusterSampleSize can have a maximum value of 100.",
                    },
                },
                minBehaviorDistance: {
                    type: "number",
                    minimum: 0,
                    default: Clustering.#defaultMinBehaviorDistance,
                    errorMessage: { minimum: "minBehaviorDistance can have a minimum value of 0." },
                },
            },
            ["sensorId", "fromTimestamp", "toTimestamp"],
            "Input should have required properties 'sensorId', 'fromTimestamp' and 'toTimestamp'.",
        );
        const validation = Validator.validateJsonSchema(input, schema);
        if (!validation.valid) {
            throw new BadRequestError(validation.reason);
        }
        const timeRange = Validator.isValidTimeRange(input.fromTimestamp, input.toTimestamp);
        if (!timeRange.valid) {
            throw new InvalidInputError(timeRange.reason);
        }
        if ("maxClusterSampleSize" in input && !Number.isFinite(input.maxClusterSampleSize)) {
            throw new InvalidInputError("maxClusterSampleSize is not a finite integer.");
        }
        if ("minBehaviorDistance" in input && !Number.isFinite(input.minBehaviorDistance)) {
            throw new InvalidInputError("minBehaviorDistance is not a finite number.");
        }

        const clusters = [];
        const modelVersion = await this.#getLatestModelVersion(documentDb, input);
        if (modelVersion == null) {
            return { clusters };
        }

        const labelsByClusterIndex = await this.getClusterLabels(documentDb, {
            sensorId: input.sensorId,
            modelVersion,
            clusterIndex: input.clusterIndex,
        });
        if (documentDb.getName() !== "Elasticsearch") {
            throw new InternalServerError(`Invalid database: ${documentDb.getName()}.`);
        }

        // Shallow copy: the search needs modelVersion, the caller's object must stay untouched.
        const searchResult = await this.#searchSampledBehaviors(documentDb, { ...input, modelVersion });
        if (searchResult.indexAbsent) {
            return { clusters };
        }
        for (const bucket of searchResult.body.aggregations.clusterIndices.buckets) {
            const clusterIndex = bucket.key;
            clusters.push({
                clusterIndex,
                modelVersion,
                label: labelsByClusterIndex.has(clusterIndex) ? labelsByClusterIndex.get(clusterIndex) : null,
                count: bucket.doc_count,
                sampledBehaviors: Elasticsearch.searchResultFormatter(bucket.randomBehaviorSamples),
            });
        }
        return { clusters };
    }

    /**
     * Searches the behavior index for a single behavior belonging to the given
     * sensor, model version and cluster index.
     * @param {Database} documentDb - Database Object
     * @param {Object} input
     * @param {string} input.sensorId
     * @param {string} input.modelVersion
     * @param {string} input.clusterIndex
     * @returns {Promise<Object>} Result of `Elasticsearch.getSearchResults`.
     */
    async #searchCluster(documentDb, { sensorId, modelVersion, clusterIndex }) {
        const index = Clustering.#indexName(documentDb, "behavior");
        const body = deepcopy(filterTemplate);
        body.query.bool.must.push(
            { exists: { field: "info.cluster.index" } },
            { term: { "sensor.id.keyword": sensorId } },
            { term: { "info.cluster.modelVersion.keyword": modelVersion } },
            { term: { "info.cluster.index.keyword": clusterIndex } },
        );
        const searchParams = {
            index,
            body,
            size: 1,
            _sourceIncludes: ["sensor.id", "info.cluster.modelVersion", "info.cluster.index"],
        };
        return Elasticsearch.getSearchResults(documentDb.getClient(), searchParams, false);
    }

    /**
     * returns if a cluster is valid, i.e. at least one behavior exists for the given
     * sensor, model version and cluster index.
     * @public
     * @async
     * @param {Database} documentDb - Database Object
     * @param {Object} input - Input object.
     * @param {string} input.sensorId
     * @param {string} input.modelVersion
     * @param {string} input.clusterIndex
     * @returns {Promise<boolean>} Validity of the cluster is returned
     * @throws {BadRequestError} When the input does not satisfy the schema.
     * @throws {InternalServerError} When the database is not Elasticsearch.
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const elastic = new mdx.Utils.Elasticsearch({node: "elasticsearch-url"},databaseConfigMap);
     * let input = {sensorId: "abc", modelVersion: "2", clusterIndex: "1"};
     * let clusteringObject = new mdx.Services.Clustering();
     * let validCluster = await clusteringObject.isValidCluster(elastic,input);
     */
    async isValidCluster(documentDb, input) {
        const schema = Clustering.#objectSchema(
            {
                sensorId: Clustering.#nonEmptyString("sensorId"),
                modelVersion: Clustering.#nonEmptyString("modelVersion"),
                clusterIndex: Clustering.#nonEmptyString("clusterIndex"),
            },
            ["sensorId", "modelVersion", "clusterIndex"],
            "Input should have required properties 'sensorId', 'modelVersion' and 'clusterIndex'.",
        );
        const validation = Validator.validateJsonSchema(input, schema, false);
        if (!validation.valid) {
            throw new BadRequestError(validation.reason);
        }
        if (documentDb.getName() !== "Elasticsearch") {
            throw new InternalServerError(`Invalid database: ${documentDb.getName()}.`);
        }

        const searchResult = await this.#searchCluster(documentDb, input);
        if (searchResult.indexAbsent) {
            return false;
        }
        return Elasticsearch.searchResultFormatter(searchResult.body).length > 0;
    }

    /**
     * returns if a cluster label already exists.
     *
     * The label is normalised (trimmed, whitespace runs replaced by "-", lower-cased) and compared
     * with the latest label of every cluster returned by getClusterLabels.
     * @public
     * @async
     * @param {Database} documentDb - Database Object
     * @param {Object} input - Input object.
     * @param {string} input.sensorId
     * @param {string} input.modelVersion
     * @param {string} input.label
     * @returns {Promise<boolean>} A boolean is returned which signifies whether the cluster label already exists
     * @throws {BadRequestError} When the input does not satisfy the schema.
     * @throws {InternalServerError} When the database is not Elasticsearch.
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const elastic = new mdx.Utils.Elasticsearch({node: "elasticsearch-url"},databaseConfigMap);
     * let input = {sensorId: "abc", modelVersion: "2", label: "left"};
     * let clusteringObject = new mdx.Services.Clustering();
     * let clusterLabelExists = await clusteringObject.doesClusterLabelExist(elastic,input);
     */
    async doesClusterLabelExist(documentDb, input) {
        const schema = Clustering.#objectSchema(
            {
                sensorId: Clustering.#nonEmptyString("sensorId"),
                modelVersion: Clustering.#nonEmptyString("modelVersion"),
                label: Clustering.#nonEmptyString("label"),
            },
            ["sensorId", "modelVersion", "label"],
            "Input should have required properties 'sensorId', 'modelVersion' and 'label'.",
        );
        const validation = Validator.validateJsonSchema(input, schema, false);
        if (!validation.valid) {
            throw new BadRequestError(validation.reason);
        }

        const wantedLabel = Clustering.#normalizeLabel(input.label);
        const labelsByClusterIndex = await this.getClusterLabels(documentDb, {
            sensorId: input.sensorId,
            modelVersion: input.modelVersion,
        });
        for (const existingLabel of labelsByClusterIndex.values()) {
            if (existingLabel === wantedLabel) {
                return true;
            }
        }
        return false;
    }

    /**
     * Indexes one cluster label document through the "addInsertTimestamp-field-timestamp" ingest
     * pipeline. When the pipeline listing does not contain that id,
     * `Elasticsearch.initTimestampIngestPipeline(client, "timestamp")` is called first
     * (presumably that is what creates it — defined outside this file).
     * @param {Database} documentDb - Database Object
     * @param {Object} input
     * @param {string} input.sensorId
     * @param {string} input.modelVersion
     * @param {string} input.clusterIndex
     * @param {string} input.label
     * @returns {Promise<Object>} Response of the client's `index` call.
     */
    async #insertClusterLabel(documentDb, { sensorId, modelVersion, clusterIndex, label }) {
        const index = Clustering.#indexName(documentDb, "clusterLabels");
        const client = documentDb.getClient();
        const timestampField = "timestamp";
        const pipelineId = "addInsertTimestamp-field-timestamp";

        const pipelines = await client.ingest.getPipeline();
        // Call through Object.prototype: the response body is external data and is not
        // guaranteed to carry (an unshadowed) hasOwnProperty of its own.
        if (!Object.prototype.hasOwnProperty.call(pipelines.body, pipelineId)) {
            await Elasticsearch.initTimestampIngestPipeline(client, timestampField);
        }
        return client.index({
            index,
            body: { sensorId, modelVersion, clusterIndex, label },
            pipeline: pipelineId,
        });
    }

    /**
     * returns a success message if the cluster label was inserted successfully.
     *
     * `input.label` is overwritten with its normalised form (trimmed, whitespace runs replaced by
     * "-", lower-cased); that write-back is existing behavior and is kept for compatibility.
     * Cluster validity and label uniqueness are checked concurrently before inserting.
     * @public
     * @async
     * @param {Database} documentDb - Database Object
     * @param {Object} input - Input object.
     * @param {string} input.sensorId
     * @param {string} input.modelVersion
     * @param {string} input.clusterIndex
     * @param {string} input.label
     * @returns {Promise<Object>} A success message is returned
     * @throws {BadRequestError} When the input does not satisfy the schema.
     * @throws {InvalidInputError} When clusterIndex is "-1", the cluster does not exist, or the label is already in use.
     * @throws {InternalServerError} When the database is not Elasticsearch.
     * @example
     * const mdx = require("@nvidia-mdx/web-api-core");
     * const elastic = new mdx.Utils.Elasticsearch({node: "elasticsearch-url"},databaseConfigMap);
     * let input = {sensorId: "abc", modelVersion: "2", clusterIndex: "1", label: "left"};
     * let clusteringObject = new mdx.Services.Clustering();
     * let result = await clusteringObject.addClusterLabel(elastic,input);
     */
    async addClusterLabel(documentDb, input) {
        const schema = Clustering.#objectSchema(
            {
                sensorId: Clustering.#nonEmptyString("sensorId"),
                modelVersion: Clustering.#nonEmptyString("modelVersion"),
                clusterIndex: Clustering.#nonEmptyString("clusterIndex"),
                label: Clustering.#nonEmptyString("label"),
            },
            ["sensorId", "modelVersion", "clusterIndex", "label"],
            "Input should have required properties 'sensorId', 'modelVersion', 'clusterIndex' and 'label'.",
        );
        const validation = Validator.validateJsonSchema(input, schema, false);
        if (!validation.valid) {
            throw new BadRequestError(validation.reason);
        }
        if (input.clusterIndex === "-1") {
            throw new InvalidInputError("Cannot assign label to clusterIndex -1.");
        }

        input.label = Clustering.#normalizeLabel(input.label);
        const { sensorId, modelVersion, clusterIndex, label } = input;

        const [clusterExists, labelExists] = await Promise.all([
            this.isValidCluster(documentDb, { sensorId, modelVersion, clusterIndex }),
            this.doesClusterLabelExist(documentDb, { sensorId, modelVersion, label }),
        ]);
        if (!clusterExists) {
            throw new InvalidInputError(
                `clusterIndex: ${clusterIndex} belonging to model version: ${modelVersion} of sensorId: ${sensorId} doesn't exist.`,
            );
        }
        if (labelExists) {
            throw new InvalidInputError(
                `modelVersion: ${modelVersion} of sensorId: ${sensorId} already has label: ${label}.`,
            );
        }
        if (documentDb.getName() !== "Elasticsearch") {
            throw new InternalServerError(`Invalid database: ${documentDb.getName()}.`);
        }

        await this.#insertClusterLabel(documentDb, input);
        return { success: true };
    }
}

module.exports = Clustering;