// Utils/Milvus.js

/**Copyright (c) 2009-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.**/
"use strict";const InternalServerError=require("../Errors/InternalServerError"),ServiceUnavailableError=require("../Errors/ServiceUnavailableError"),Database=require("./Database"),{MilvusClient:MilvusClient}=require("@zilliz/milvus2-sdk-node"),CronJob=require("cron").CronJob,grpc=require("@grpc/grpc-js"),moment=require("moment"),Utils=require("./Utils"),Validator=require("../Utils/Validator"),InvalidInputError=require("../Errors/InvalidInputError"),winston=require("winston"),logger=winston.createLogger({transports:[new winston.transports.Console({timestamp:!0,level:"info"})],exitOnError:!1});
/** 
 * Class containing Milvus Utils
 * @memberof mdxWebApiCore.Utils
 * @extends Database
 * */
class Milvus extends Database {
  /**
   * Constructor
   * @param {Object} connectionObject - Connection details; `connectionObject.url` is handed to `MilvusClient`.
   * @param {Map} configs - Database configs. The static helpers of this class read the keys
   * `collectionName`, `partitioningStrategy`, `partitionRetentionInDays` and
   * `searchQueryRetryPeriodInSec` from the value returned by `getConfigs()`
   * (presumably this same map — confirm against `Database`).
   */
  constructor(connectionObject, configs) {
    super({
      name: "Milvus",
      client: new MilvusClient(connectionObject.url),
      configs,
    });
  }

  /**
   * Checks if status of milvus connection is READY.
   * Polls the gRPC channel every 5 seconds and only resolves once the channel is READY;
   * there is no upper bound on the wait.
   * @public
   * @static
   * @async
   * @param {string} url - milvus url
   * @returns {Promise<boolean>} resolves to `true` once the milvus connection is READY
   * @example
   * const mdx = require("@nvidia-mdx/web-api-core");
   * let isMilvusReady = await mdx.Utils.Milvus.isReady("milvus-url");
   */
  static async isReady(url) {
    const probeClient = new grpc.Client(url, grpc.credentials.createInsecure());
    try {
      const channel = probeClient.getChannel();
      for (;;) {
        // Passing `true` asks the channel to start connecting if it is currently idle.
        const isReady = channel.getConnectivityState(true) === grpc.connectivityState.READY;
        if (isReady) {
          logger.info(`milvus connection status .. ${isReady}.`);
          return true;
        }
        logger.info(`milvus connection status .. ${isReady}. Retrying in 5 seconds.`);
        await Utils.sleep(5000);
      }
    } finally {
      // The client exists only for this probe; release its channel when done.
      probeClient.close();
    }
  }

  /**
   * Returns the 1-based week-of-month index of a timestamp.
   * The index counts calendar weeks (as defined by moment's `startOf("week")`)
   * starting from the week that contains the first day of the month.
   * @param {string|Object} timestamp - Anything accepted by `moment.utc()`.
   * @returns {number} Week-of-month index.
   */
  static #getWeekOfMonth(timestamp) {
    const day = moment.utc(timestamp).format("YYYY-MM-DD");
    const monthStart = moment(day).startOf("month");
    // Number of days of the month's first calendar week that belong to the previous month.
    const leadingDays = monthStart.diff(monthStart.clone().startOf("week"), "days");
    return Math.ceil((moment.utc(day).date() + leadingDays) / 7);
  }

  /**
   * Builds the week partition name for the date held by a moment instance.
   * @param {Object} date - moment instance.
   * @returns {string} Name of the form `week_<n>_month_<m>_year_<y>`.
   */
  static #weekPartitionName(date) {
    return `week_${Milvus.#getWeekOfMonth(date)}_month_${date.month() + 1}_year_${date.year()}`;
  }

  /**
   * Lists the day partitions covering the inclusive range [fromTimestamp, toTimestamp].
   * @param {string} fromTimestamp
   * @param {string} toTimestamp
   * @returns {Set<string>} Names of the form `day_<d>_month_<m>_year_<y>`.
   */
  static #getDayPartitions(fromTimestamp, toTimestamp) {
    const partitions = new Set();
    const lastDay = moment.utc(toTimestamp).format("YYYY-MM-DD");
    let day = moment.utc(fromTimestamp).format("YYYY-MM-DD");
    while (Utils.tsCompare(day, "<=", lastDay)) {
      const current = moment.utc(day);
      partitions.add(`day_${current.date()}_month_${current.month() + 1}_year_${current.year()}`);
      day = current.add(1, "days").format("YYYY-MM-DD");
    }
    return partitions;
  }

  /**
   * Lists the week partitions covering the inclusive range [fromTimestamp, toTimestamp].
   * The range is walked one day at a time: a calendar week that straddles a month
   * boundary maps to two partitions (last week of one month, first week of the next),
   * and a 7-day stride can jump over one of them.
   * @param {string} fromTimestamp
   * @param {string} toTimestamp
   * @returns {Set<string>} Names of the form `week_<n>_month_<m>_year_<y>`.
   */
  static #getWeekPartitions(fromTimestamp, toTimestamp) {
    const partitions = new Set();
    const lastDay = moment.utc(toTimestamp).format("YYYY-MM-DD");
    let day = moment.utc(fromTimestamp).format("YYYY-MM-DD");
    while (Utils.tsCompare(day, "<=", lastDay)) {
      const current = moment.utc(day);
      partitions.add(Milvus.#weekPartitionName(current));
      day = current.add(1, "days").format("YYYY-MM-DD");
    }
    // The partition of the end timestamp is always reported, even for an empty range.
    partitions.add(Milvus.#weekPartitionName(moment.utc(lastDay)));
    return partitions;
  }

  /**
   * returns a set of partitions that belong to a milvus collection
   * @public
   * @static
   * @async
   * @param {Database} milvusDb
   * @param {Object} [input={}] - Input object.
   * @param {boolean} [input.excludeDefault=true] - If set to true, _default partition will be excluded from the result
   * @returns {Promise<Set<string>>} A set of partitions that belong to a milvus collection is returned
   * @throws {InvalidInputError} If `input` does not satisfy the schema.
   * @throws {InternalServerError} If milvus reports a non-success status.
   * @example
   * const mdx = require("@nvidia-mdx/web-api-core");
   * const milvus = new mdx.Utils.Milvus({url: "milvus-url"},databaseConfigMap);
   * let partitionsOfCollection = await mdx.Utils.Milvus.getPartitionsOfCollection(milvus);
   */
  static async getPartitionsOfCollection(milvusDb, input = {}) {
    const inputSchema = {
      type: "object",
      additionalProperties: {
        not: true,
        errorMessage: "Invalid additional Input ${0#}.",
      },
      properties: {
        excludeDefault: {
          type: "boolean",
          default: true,
          errorMessage: { type: "'excludeDefault' should be of boolean type." },
        },
      },
    };
    const validation = Validator.validateJsonSchema(input, inputSchema, false);
    if (!validation.valid) {
      throw new InvalidInputError(validation.reason);
    }

    const client = milvusDb.getClient();
    const configs = milvusDb.getConfigs();
    const response = await client.partitionManager.showPartitions({
      collection_name: configs.get("collectionName"),
    });
    if (response.status.error_code !== "Success") {
      throw new InternalServerError(
        `Error Code: ${response.status.error_code}. Reason: ${response.status.reason}.`
      );
    }

    const partitions = new Set(response.partition_names);
    // Apply the documented default here as well, so the result does not depend on
    // whether the validator writes schema defaults back into `input`.
    if (input.excludeDefault ?? true) {
      partitions.delete("_default");
    }
    return partitions;
  }

  /**
   * returns a set of milvus partitions that may be present for a given time range based on a partitioning strategy.
   * @public
   * @static
   * @param {Database} milvusDb
   * @param {string} fromTimestamp
   * @param {string} toTimestamp
   * @returns {Set<string>} A set of partitions that may be present in milvus for a given time range is returned
   * @throws {InternalServerError} If the configured `partitioningStrategy` is neither "day" nor "week".
   * @example
   * const mdx = require("@nvidia-mdx/web-api-core");
   * const milvus = new mdx.Utils.Milvus({url: "milvus-url"},databaseConfigMap);
   * let partitionsInATimeRange = mdx.Utils.Milvus.getPartitionsFromTimestamps(milvus,"2023-01-12T11:20:10.000Z","2023-01-12T14:20:10.000Z");
   */
  static getPartitionsFromTimestamps(milvusDb, fromTimestamp, toTimestamp) {
    const strategy = milvusDb.getConfigs().get("partitioningStrategy");
    // The class is referenced by name (not `this`) so the private helpers stay reachable
    // when this method is invoked on a subclass or as a detached function.
    switch (strategy) {
      case "day":
        return Milvus.#getDayPartitions(fromTimestamp, toTimestamp);
      case "week":
        return Milvus.#getWeekPartitions(fromTimestamp, toTimestamp);
      default:
        throw new InternalServerError("Invalid partitioningStrategy");
    }
  }

  /**
   * returns milvus search results.
   * Before searching, waits until the collection named by `queryObject.collection_name`
   * reports a loaded percentage of "100", requesting a load once if the collection is
   * not reported at all. The wait is bounded by the `searchQueryRetryPeriodInSec` config.
   * @public
   * @static
   * @async
   * @param {Database} milvusDb
   * @param {Object} queryObject - Passed unchanged to `dataManager.search`; must carry `collection_name`.
   * @returns {Array<Object>} - An array containing similar behaviors (search result) is returned
   * @throws {InternalServerError} If milvus reports a non-success status or the collection could not be loaded.
   * @throws {ServiceUnavailableError} If the collection is still partially loaded when the retry period elapses.
   * @example
   * const mdx = require("@nvidia-mdx/web-api-core");
   * const milvus = new mdx.Utils.Milvus({url: "milvus-url"},databaseConfigMap);
   * let results = await mdx.Utils.Milvus.getSearchResults(milvus,queryObject);
   */
  static async getSearchResults(milvusDb, queryObject) {
    const client = milvusDb.getClient();
    const configs = milvusDb.getConfigs();
    const collectionName = queryObject.collection_name;

    let waitStartedAt = Date.now();
    let isLoaded = false;
    let loadedPercentage = null;
    let loadRequested = false;
    let seenInLoadList = false;

    // NOTE(review): there is no delay between polls; each iteration is paced only by
    // the round trip of showCollections.
    while (!isLoaded) {
      // type: 1 presumably selects loaded collections only (SDK ShowCollectionsType) — confirm.
      const loadState = await client.collectionManager.showCollections({
        collection_name: collectionName,
        type: 1,
      });
      if (loadState.status.error_code !== "Success") {
        throw new InternalServerError(
          `Error Code: ${loadState.status.error_code}. Reason: ${loadState.status.reason}.`
        );
      }

      if (loadState.data.length === 0) {
        loadedPercentage = null;
      } else {
        // When the list is non-empty but lacks this collection, the previously
        // observed percentage is deliberately kept.
        const entry = loadState.data.find((collection) => collection.name === collectionName);
        if (entry !== undefined) {
          seenInLoadList = true;
          loadedPercentage = entry.loadedPercentage;
          isLoaded = loadedPercentage === "100";
        }
      }
      if (isLoaded) {
        break;
      }

      const retryPeriodInMs = 1000 * configs.get("searchQueryRetryPeriodInSec");
      if (Date.now() - waitStartedAt > retryPeriodInMs) {
        throw loadedPercentage == null
          ? new InternalServerError(`Milvus Error: Collection '${collectionName}' could not be loaded.`)
          : new ServiceUnavailableError(
              `Milvus Error: Collection '${collectionName}' has loaded ${loadedPercentage}%. Try again later.`
            );
      }

      if (loadedPercentage == null) {
        // A load was already requested (or the collection was seen loading earlier)
        // and it is still not reported: give up instead of requesting again.
        if (loadRequested || seenInLoadList) {
          throw new InternalServerError(`Milvus Error: Collection '${collectionName}' could not be loaded.`);
        }
        await client.collectionManager.loadCollection({ collection_name: collectionName });
        logger.info(`Started loading '${collectionName}' collection.`);
        loadRequested = true;
        // The retry period starts over from the moment the load was requested.
        waitStartedAt = Date.now();
      }
    }

    const searchResponse = await client.dataManager.search(queryObject);
    if (searchResponse.status.error_code === "Success") {
      return searchResponse.results;
    }
    throw new InternalServerError(
      `Error Code: ${searchResponse.status.error_code}. Reason: ${searchResponse.status.reason}.`
    );
  }

  /**
   * Selects the oldest partitions that exceed the configured retention.
   * With the "day" strategy at most `partitionRetentionInDays` partitions are kept;
   * with the "week" strategy at most `ceil(partitionRetentionInDays / 7)` are kept.
   * @param {Map} configs - Read for `partitioningStrategy` and `partitionRetentionInDays`.
   * @param {Set<string>} partitions - Partition names following the strategy's naming scheme.
   * @returns {Set<string>} Names of the partitions to drop (empty when within retention).
   * @throws {InternalServerError} If the configured `partitioningStrategy` is neither "day" nor "week".
   */
  static #getPartitionsToDrop(configs, partitions) {
    const strategy = configs.get("partitioningStrategy");
    const retentionInDays = configs.get("partitionRetentionInDays");

    // Returns the names of the `count` lowest-ranked entries according to `compare`.
    const pickOldest = (entries, count, compare) =>
      new Set(
        [...entries]
          .sort(compare)
          .slice(0, Math.ceil(count))
          .map((entry) => entry.name)
      );

    switch (strategy) {
      case "day": {
        const dropCount = partitions.size - retentionInDays;
        // Written as a negated comparison so a NaN count (missing config) drops nothing.
        if (!(dropCount > 0)) {
          return new Set();
        }
        const entries = Array.from(partitions, (name) => ({
          name,
          time: moment.utc(name, "[day_]DD[_month_]MM[_year_]YYYY").valueOf(),
        }));
        return pickOldest(entries, dropCount, (a, b) => a.time - b.time);
      }
      case "week": {
        const dropCount = partitions.size - Math.ceil(retentionInDays / 7);
        if (!(dropCount > 0)) {
          return new Set();
        }
        const entries = Array.from(partitions, (name) => ({
          name,
          // The week number is parsed into the day slot only so the name parses;
          // just the month/year part is used, as the start of that month.
          monthStart: moment.utc(name, "[week_]DD[_month_]MM[_year_]YYYY").startOf("month").valueOf(),
          week: Number.parseInt(name.split("_")[1], 10),
        }));
        // Oldest month first, then lowest week number within the month.
        return pickOldest(entries, dropCount, (a, b) => a.monthStart - b.monthStart || a.week - b.week);
      }
      default:
        throw new InternalServerError("Invalid partitioningStrategy");
    }
  }

  /**
   * Bootstraps milvus collection. Waits for collection to be created and loads the collection (if its currently unloaded).
   * The wait for the collection to exist polls every 2 seconds and has no upper bound.
   * @public
   * @static
   * @async
   * @param {Database} milvusDb
   * @returns {Promise<Object>} A success message is returned
   * @throws {InternalServerError} If milvus reports a non-success status.
   * @example
   * const mdx = require("@nvidia-mdx/web-api-core");
   * const milvus = new mdx.Utils.Milvus({url: "milvus-url"},databaseConfigMap);
   * let result = await mdx.Utils.Milvus.bootstrapCollection(milvus);
   */
  static async bootstrapCollection(milvusDb) {
    const client = milvusDb.getClient();
    const configs = milvusDb.getConfigs();
    const collectionName = configs.get("collectionName");

    for (;;) {
      const presence = await client.collectionManager.hasCollection({ collection_name: collectionName });
      if (presence.status.error_code !== "Success") {
        throw new InternalServerError(
          `Error Code: ${presence.status.error_code}. Reason: ${presence.status.reason}.`
        );
      }
      if (presence.value) {
        break;
      }
      logger.warn(`Collection: ${collectionName} not present. Waiting for it to be created.`);
      await Utils.sleep(2000);
    }
    logger.info(`Collection: ${collectionName} is present.`);

    const loadState = await client.collectionManager.showCollections({
      collection_name: collectionName,
      type: 1,
    });
    if (loadState.status.error_code !== "Success") {
      throw new InternalServerError(
        `Error Code: ${loadState.status.error_code}. Reason: ${loadState.status.reason}.`
      );
    }

    const loadedPercentage = loadState.data.find(
      (collection) => collection.name === collectionName
    )?.loadedPercentage;

    if (loadedPercentage == null) {
      // Not reported by showCollections: request a load (completion is not awaited here).
      await client.collectionManager.loadCollection({ collection_name: collectionName });
      logger.info(`Started loading '${collectionName}' collection.`);
    } else if (loadedPercentage === "100") {
      logger.info(`Collection '${collectionName}' has already been loaded.`);
    } else {
      logger.info(
        `Collection load is in progress for '${collectionName}'. Current load percentage is ${loadedPercentage}%.`
      );
    }

    logger.info("[MILVUS BOOTSTRAP INFO] Milvus collection bootstrap successfully completed.");
    return { success: true };
  }

  /**
   * returns a cron job that runs at midnight (machine time). The cron job manages the partition retention of milvus collection.
   * On each tick the partitions exceeding the retention are determined; if there are any,
   * the collection is released, those partitions are dropped, and a reload is requested.
   * Errors raised during a tick are logged and not rethrown.
   * @public
   * @static
   * @param {Database} milvusDb
   * @returns {CronJob} The cron job is returned
   * @example
   * const mdx = require("@nvidia-mdx/web-api-core");
   * const milvus = new mdx.Utils.Milvus({url: "milvus-url"},databaseConfigMap);
   * let milvusCronJob = mdx.Utils.Milvus.milvusCronJob(milvus);
   */
  static milvusCronJob(milvusDb) {
    return new CronJob("00 00 00 * * *", async function () {
      logger.info("[CRON INFO] Starting Cron Job.");
      const client = milvusDb.getClient();
      const configs = milvusDb.getConfigs();
      const collectionName = configs.get("collectionName");
      try {
        const partitions = await Milvus.getPartitionsOfCollection(milvusDb);
        const partitionsToDrop = Milvus.#getPartitionsToDrop(configs, partitions);

        if (partitionsToDrop.size > 0) {
          // NOTE(review): the responses of releaseCollection / dropPartition / loadCollection
          // are not inspected; only rejected promises reach the catch below.
          await client.collectionManager.releaseCollection({ collection_name: collectionName });
          logger.info(`Released collection '${collectionName}' before partition drop.`);

          // Keep every drop promise so all drops have settled before the reload is requested.
          const pendingDrops = Array.from(partitionsToDrop, (partitionName) =>
            client.partitionManager.dropPartition({
              collection_name: collectionName,
              partition_name: partitionName,
            })
          );
          await Promise.all(pendingDrops);
          logger.info(
            `Partitions: ${Array.from(partitionsToDrop).join(", ")} of collection: ${collectionName} have been dropped.`
          );

          await client.collectionManager.loadCollection({ collection_name: collectionName });
          logger.info(`Started loading '${collectionName}' collection.`);
        } else {
          logger.info(`No partitions have to be dropped for collection: ${collectionName}.`);
        }
        logger.info("[CRON INFO] Cron Job successfully completed.");
      } catch (error) {
        logger.error(`[CRON ERROR] ${error.toString()}`);
      }
    });
  }
}

module.exports = Milvus;