(Latest Version)

Digital Fingerprinting (DFP) Reference

dfp_deployment_configs.png

Pipeline Structure Configuration

dfp_pipeline_structure.png

The stages in both the Training and Inference pipelines can be mixed and matched with little impact​, that is, the MultiFileSource can be configured to pull from S3 or from local files and can be replaced altogether with any other Morpheus input stage. Similarly, the S3 writer can be replaced with any Morpheus output stage. Regardless of the inputs and outputs the core pipeline should remain unchanged. While stages in the core of the pipeline (inside the blue areas in the above diagram) perform common actions that should be configured not exchanged.

Morpheus Config

For both inference and training pipeline the Morpheus config object should be constructed with the same values, for example:

Copy
Copied!
            

import os from morpheus.config import Config from morpheus.config import ConfigAutoEncoder from morpheus.config import CppConfig from morpheus.cli.utils import get_package_relative_file from morpheus.cli.utils import load_labels_file

Copy
Copied!
            

CppConfig.set_should_use_cpp(False) config = Config() config.num_threads = os.cpu_count() config.ae = ConfigAutoEncoder() config.ae.feature_columns = load_labels_file(get_package_relative_file("data/columns_ae_azure.txt"))

Other attributes which might be needed:

Attribute

Type

Default

Description

Config.ae.userid_column_name

str

userIdentityaccountId

Column in the DataFrame containing the username or user ID

Config.ae.timestamp_column_name

str

timestamp

Column in the DataFrame containing the timestamp of the event

Config.ae.fallback_username

str

generic_user

Name to use for the generic user model, should not match the name of any real users

Schema Definition

DataFrame Input Schema (DataFrameInputSchema)

The DataFrameInputSchema (morpheus/utils/column_info.py) class defines the schema specifying the columns to be included in the output DataFrame. Within the DFP pipeline there are two stages where pre-processing is performed, the DFPFileToDataFrameStage stage and the DFPPreprocessingStage. This decoupling of the pre-processing stages from the actual operations needed to be performed allows for the actual schema to be user-defined in the pipeline and re-usability of the stages. It is up to the user to define the fields which will appear in the DataFrame. Any column in the input data that isn’t specified in either column_info or preserve_columns constructor arguments will not appear in the output. The exception to this are JSON fields, specified in the json_columns argument which defines json fields which are to be normalized.

It is important to note that the fields defined in json_columns are normalized prior to the processing of the fields in column_info, allowing for processing to be performed on fields nested in JSON columns. For example, say we had a JSON field named event containing a key named timestamp, which in the JSON data appears as an ISO 8601 formatted date string, we could ensure it was converted to a datetime object to downstream stages with the following:

Copy
Copied!
            

from morpheus.utils.column_info import DataFrameInputSchema from morpheus.utils.column_info import DateTimeColumn

Copy
Copied!
            

schema = DataFrameInputSchema( json_columns=['event'], column_info=[DateTimeColumn(name=config.ae.timestamp_column_name, dtype=datetime, input_name='event.timestamp')])

In the above examples, three operations were performed:

  1. The event JSON field was normalized, resulting in new fields prefixed with event. to be included in the output DataFrame.

  2. The newly created field event.timestamp is parsed into a datetime field.

  3. Since the DFP pipeline explicitly requires a timestamp field, we name this new column with the config.ae.timestamp_column_name config attribute ensuring it matches the pipeline configuration. When name and input_name are the same the old field is overwritten, and when they differ a new field is created.

The DFPFileToDataFrameStage is executed first and is responsible for flattening potentially nested JSON data and performing any sort of data type conversions. The DFPPreprocessingStage is executed later after the DFPSplitUsersStage allowing for the possibility of per-user computed fields such as the logcount and locincrement fields mentioned previously. Both stages are performed after the DFPFileBatcherStage allowing for per time period (per-day by default) computed fields.

Argument

Type

Description

json_columns

List[str]

Optional list of json columns in the incoming DataFrame to be normalized (currently using the pandas.json_normalize method). Each key in a json field will be flattened into a new column named <field>.<key> for example a json field named user containing a key named id will result in a new column named user.id. By default, this is an empty list.

column_info

List[str]

Optional list of ColumnInfo instances, each defining a specific operation to be performed upon a column. These include renames, type conversions, and custom computations. By default, this is an empty list.

preserve_columns

List[str] or str

Optional regular expression string or list of regular expression strings that define columns in the input data which should be preserved in the output DataFrame. By default, this is an empty list.

row_filter

function or None

Optional function to be called after all other processing has been performed. This function receives the DataFrame as its only argument returning a DataFrame.

Column Info (ColumnInfo)

Defines a single column and type-cast.

Argument

Type

Description

name

str

Name of the column

dtype

str or Python type

Any type string or Python class recognized by Pandas

Custom Column (CustomColumn)

Subclass of ColumnInfo, defines a column to be computed by a user-defined function process_column_fn.

Argument

Type

Description

name

str

Name of the column

dtype

str or Python type

Any type string or Python class recognized by Pandas

process_column_fn

function

Function which receives the entire DataFrame as its only input, returning a new pandas.Series object to be stored in column name.

Rename Column (RenameColumn)

Subclass of ColumnInfo, adds the ability to also perform a rename.

Argument

Type

Description

name

str

Name of the destination column

dtype

str or Python type

Any type string or Python class recognized by Pandas

input_name

str

Original column name

Boolean Column (BoolColumn)

Subclass of RenameColumn, adds the ability to map a set custom values as boolean values. For example say we had a string input field containing one of 5 possible enum values: OK, SUCCESS, DENIED, CANCELED and EXPIRED we could map these values into a single boolean field as:

Copy
Copied!
            

from morpheus.utils.column_info import BoolColumn

Copy
Copied!
            

field = BoolColumn(name="result", dtype=bool, input_name="result", true_values=["OK", "SUCCESS"], false_values=["DENIED", "CANCELED", "EXPIRED"])

We used strings in this example; however, we also could have just as easily mapped integer status codes. We also have the ability to map onto types other than boolean by providing custom values for true and false (for example, 1/0, yes/no) .

Argument

Type

Description

name

str

Name of the destination column

dtype

str or Python type

Typically this should be bool ; however, it could potentially be another type if true_value and false_value are specified.

input_name

str

Original column name

true_value

Any

Optional value to store for true values, should be of a type dtype. Defaults to True.

false_value

Any

Optional value to store for false values, should be of a type dtype. Defaults to False.

true_values

List[str]

List of string values to be interpreted as true.

false_values

List[str]

List of string values to be interpreted as false.

Date-Time Column (DateTimeColumn)

Subclass of RenameColumn, specific to casting UTC localized datetime values. When incoming values contain a time-zone offset string the values are converted to UTC, while values without a time-zone are assumed to be UTC.

Argument

Type

Description

name

str

Name of the destination column

dtype

str or Python type

Any type string or Python class recognized by Pandas

input_name

str

Original column name

String-Join Column (StringJoinColumn)

Subclass of RenameColumn, converts incoming list values to string by joining by sep.

Argument

Type

Description

name

str

Name of the destination column

dtype

str or Python type

Any type string or Python class recognized by Pandas

input_name

str

Original column name

sep

str

Separator string to use for the join

String-Cat Column (StringCatColumn)

Subclass of ColumnInfo, concatenates values from multiple columns into a new string column separated by sep.

Argument

Type

Description

name

str

Name of the destination column

dtype

str or Python type

Any type string or Python class recognized by Pandas

input_columns

List[str]

List of columns to concatenate

sep

str

Separator string

Increment Column (IncrementColumn)

Subclass of DateTimeColumn, counts the unique occurrences of a value in groupby_column over a specific time window period based on dates in the input_name field.

Argument

Type

Description

name

str

Name of the destination column

dtype

str or Python type

Should be int or other integer class

input_name

str

Original column name containing timestamp values

groupby_column

str

Column name to group by

period

str

Optional time period to perform the calculation over, value must be one of pandas’ offset strings. Defaults to D one day

Input Stages

dfp_input_config.png

Source Stage (MultiFileSource)

The MultiFileSource(examples/digital_fingerprinting/production/morpheus/dfp/stages/multi_file_source.py) receives a path or list of paths (filenames), and will collectively be emitted into the pipeline as an fsspec.core.OpenFiles object. The paths may include wildcards * as well as URLs (ex: s3://path) to remote storage providers such as S3, FTP, GCP, Azure, Databricks and others as defined by fsspec. In addition to this paths can be cached locally by prefixing them with filecache:: (ex: filecache::s3://bucket-name/key-name).

Note: This stage does not actually download the data files, allowing the file list to be filtered and batched prior to being downloaded.

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

filenames

List[str] or str

Paths to source file to be read from

File Batcher Stage (DFPFileBatcherStage)

The DFPFileBatcherStage (examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py) groups data in the incoming DataFrame in batches of a time period (per day default), and optionally filtering incoming data to a specific time window. This stage can potentially improve performance by combining multiple small files into a single batch. This stage assumes that the date of the logs can be easily inferred such as encoding the creation time in the file name (for example, AUTH_LOG-2022-08-21T22.05.23Z.json), or using the modification time as reported by the file system. The actual method for extracting the date is encoded in a user-supplied date_conversion_func function (more on this later).

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

date_conversion_func

function

Function receives a single fsspec.core.OpenFile argument and returns a datetime.datetime object

period

str

Time period to group data by, value must be one of pandas’ offset strings

sampling_rate_s

int

Optional, default=0. When non-zero a subset of the incoming data files will be sampled, only including a file if the datetime returned by date_conversion_func is at least sampling_rate_s seconds greater than the datetime of the previously included file

start_time

datetime

Optional, default=None. When not None incoming data files will be filtered, excluding any files created prior to start_time

end_time

datetime

Optional, default=None. When not None incoming data files will be filtered, excluding any files created after end_time

For situations where the creation date of the log file is encoded in the filename, the date_extractor in the morpheus/utils/file_utils.py module can be used. The date_extractor assumes that the timestamps are localized to UTC and will need to have a regex pattern bound to it before being passed in as a parameter to DFPFileBatcherStage. The regex pattern will need to contain the following named groups: year, month, day, hour, minute, second, and optionally microsecond. In cases where the regular expression does not match the date_extractor function will fallback to using the modified time of the file.

For input files containing an ISO 8601 formatted date string the iso_date_regex regex can be used ex:

Copy
Copied!
            

from functools import partial from morpheus.utils.file_utils import date_extractor from dfp.utils.regex_utils import iso_date_regex

Copy
Copied!
            

# Batch files into buckets by time. Use the default ISO date extractor from the filename pipeline.add_stage( DFPFileBatcherStage(config, period="D", date_conversion_func=functools.partial(date_extractor, filename_regex=iso_date_regex)))

Note: If date_conversion_func returns time-zone aware timestamps, then start_time and end_time if not-None need to also be timezone aware datetime objects.

File to DataFrame Stage (DFPFileToDataFrameStage)

The DFPFileToDataFrameStage (examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py) stage receives a list of an fsspec.core.OpenFiles and loads them into a single DataFrame which is then emitted into the pipeline. When the parent stage is DFPFileBatcherStage each batch (typically one day) is concatenated into a single DataFrame. If the parent was MultiFileSource the entire dataset is loaded into a single DataFrame. Because of this, it is important to choose a period argument for DFPFileBatcherStage small enough such that each batch can fit into memory.

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

schema

DataFrameInputSchema

Schema specifying columns to load, along with any necessary renames and data type conversions

filter_null

bool

Optional: Whether to filter null rows after loading, by default True.

file_type

morpheus.common.FileTypes (enum)

Optional: Indicates file type to be loaded. Currently supported values at time of writing are: FileTypes.Auto, FileTypes.CSV, FileTypes.JSON and FileTypes.PARQUET. Default value is FileTypes.Auto which will infer the type based on the file extension, set this value if using a custom extension

parser_kwargs

dict or None

Optional: additional keyword arguments to be passed into the DataFrame parser, currently this is going to be either pandas.read_csv, pandas.read_json or pandas.read_parquet

cache_dir

str

Optional: path to cache location, defaults to ./.cache/dfp

This stage is able to download and load data files concurrently by multiple methods. Currently supported methods are: single_thread, multiprocess, dask, and dask_thread. The method used is chosen by setting the FILE_DOWNLOAD_TYPE environment variable, and dask_thread is used by default, and single_thread effectively disables concurrent loading.

This stage will cache the resulting DataFrame in cache_dir, since we are caching the DataFrames and not the source files, a cache hit avoids the cost of parsing the incoming data. In the case of remote storage systems, such as S3, this avoids both parsing and a download on a cache hit. One consequence of this is that any change to the schema will require purging cached files in the cache_dir before those changes are visible.

Note: This caching is in addition to any caching which may have occurred when using the optional filecache:: prefix.

Output Stages

dfp_output_config.png

For the inference pipeline, any Morpheus output stage, such as morpheus.stages.output.write_to_file_stage.WriteToFileStage and morpheus.stages.output.write_to_kafka_stage.WriteToKafkaStage, could be used in addition to the WriteToS3Stage documented below.

Write to File Stage (WriteToFileStage)

This final stage will write all received messages to a single output file in either CSV or JSON format.

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

filename

str

The file to write anomalous log messages to.

overwrite

bool

Optional, defaults to False. If the file specified in filename already exists, it will be overwritten if this option is set to True

Write to S3 Stage (WriteToS3Stage)

The WriteToS3Stage (examples/digital_fingerprinting/production/morpheus/dfp/stages/write_to_s3_stage.py) stage writes the resulting anomaly detections to S3. The WriteToS3Stage decouples the S3 specific operations from the Morpheus stage, and as such receives an s3_writer argument.

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

s3_writer

function

User defined function which receives an instance of a morpheus.messages.message_meta.MessageMeta and returns that same message instance. Any S3 specific configurations, such as bucket name, should be bound to the method.

Core Pipeline

These stages are common to both the training and inference pipelines, unlike the input and output stages these are specific to the DFP pipeline and intended to be configured but not replaceable.

Split Users Stage (DFPSplitUsersStage)

The DFPSplitUsersStage (examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_split_users_stage.py) stage receives an incoming DataFrame and emits a list of DFPMessageMeta where each DFPMessageMeta represents the records associated for a given user. This allows for downstream stages to perform all necessary operations on a per user basis.

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

include_generic

bool

When True a DFPMessageMeta will be constructed for the generic user containing all records not excluded by the skip_users and only_users filters

include_individual

bool

When True a DFPMessageMeta instance will be constructed for each user not excluded by the skip_users and only_users filters

skip_users

List[str] or None

List of users to exclude, when include_generic is True excluded records will also be excluded from the generic user. Mutually exclusive with only_users.

only_users

List[str] or None

Limit records to a specific list of users, when include_generic is True the generic user’s records will also be limited to the users in this list. Mutually exclusive with skip_users.

Rolling Window Stage (DFPRollingWindowStage)

The DFPRollingWindowStage (examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_rolling_window_stage.py) stage performs several key pieces of functionality for DFP.

  1. This stage keeps a moving window of logs on a per user basis

    • These logs are saved to disk to reduce memory requirements between logs from the same user

  2. It only emits logs when the window history requirements are met

    • Until all of the window history requirements are met, no messages will be sent to the rest of the pipeline.

    • Configuration options for defining the window history requirements are detailed below.

  3. It repeats the necessary logs to properly calculate log dependent features.

    • To support all column feature types, incoming log messages can be combined with existing history and sent to downstream stages.

    • For example, to calculate a feature that increments a counter for the number of logs a particular user has generated in a single day, we must have the user’s log history for the past 24 hours. To support this, this stage will combine new logs with existing history into a single DataFrame.

    • It is the responsibility of downstream stages to distinguish between new logs and existing history.

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

min_history

int

Exclude users with less than min_history records, setting this to 1 effectively disables this feature

min_increment

int

Exclude incoming batches for users where less than min_increment new records have been added since the last batch, setting this to 0 effectively disables this feature

max_history

int, str or None

When not None, include up to max_history records. When max_history is an int, then the last max_history records will be included. When max_history is a str it is assumed to represent a duration parsable by pandas.Timedelta and only those records within the window of [latest timestamp - max_history, latest timestamp] will be included.

cache_dir

str

Optional path to cache directory, cached items will be stored in a subdirectory under cache_dir named rolling-user-data this directory, along with cache_dir will be created if it does not already exist.

Note: this stage computes a row hash for the first and last rows of the incoming DataFrame as such all data contained must be hashable, any non-hashable values such as lists should be dropped or converted into hashable types in the DFPFileToDataFrameStage.

Preprocessing Stage (DFPPreprocessingStage)

The DFPPreprocessingStage (examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_preprocessing_stage.py) stage, the actual logic of preprocessing is defined in the input_schema argument. Since this stage occurs in the pipeline after the DFPFileBatcherStage and DFPSplitUsersStage stages all records in the incoming DataFrame correspond to only a single user within a specific time period allowing for columns to be computer on a per-user per-time period basis such as the logcount and locincrement features mentioned above. Making the type of processing performed in this stage different from those performed in the DFPFileToDataFrameStage.

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

input_schema

DataFrameInputSchema

Schema specifying columns to be included in the output DataFrame including computed columns

dfp_training_overview.png

Training must begin with the generic user model​ which is trained with the logs from all users. This model serves as a fallback model for users and accounts without sufficient training data​. The name of the generic user is defined in the ae.fallback_username attribute of the Morpheus config object and defaults to generic_user.

After training the generic model, individual user models can be trained​. Individual user models provide better accuracy but require sufficient data​. Many users do not have sufficient data to train the model accurately​.

Training Stages

Training Stage (DFPTraining)

The DFPTraining (examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_training.py) trains a model for each incoming DataFrame and emits an instance of morpheus.messages.multi_ae_message.MultiAEMessage containing the trained model.

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

model_kwargs

dict or None

Optional dictionary of keyword arguments to be used when constructing the model. Refer to AutoEncoder for information on the available options.

epochs

int

Number of training epochs. Default is 30.

validation_size

float

Proportion of the input dataset to use for training validation. Should be between 0.0 and 1.0. Default is 0.0.

MLflow Model Writer Stage (DFPMLFlowModelWriterStage)

The DFPMLFlowModelWriterStage (examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_mlflow_model_writer.py) stage publishes trained models into MLflow, skipping any model which lacked sufficient training data (current required minimum is 300 log records).

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

model_name_formatter

str

Optional format string to control the name of models stored in MLflow, default is dfp-{user_id}. Currently available field names are: user_id and user_md5 which is an md5 hexadecimal digest as returned by hash.hexdigest.

experiment_name_formatter

str

Optional format string to control the experiment name for models stored in MLflow, default is /dfp-models/{reg_model_name}. Currently available field names are: user_id, user_md5 and reg_model_name which is the model name as defined by model_name_formatter once the field names have been applied.

databricks_permissions

dict or None

Optional, when not None sets permissions needed when using a databricks hosted MLflow server

Note: If using a remote MLflow server, users will need to call mlflow.set_tracking_uri before starting the pipeline.

dfp_inference_overview.png

Inference Stages

Inference Stage (DFPInferenceStage)

The DFPInferenceStage (examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_inference_stage.py) stage loads models from MLflow and performs inferences against those models. This stage emits a message containing the original DataFrame along with new columns containing the z score (mean_abs_z), as well as the name and version of the model that generated that score (model_version). For each feature in the model, three additional columns will also be added:

  • <feature name>_loss : The loss

  • <feature name>_z_loss : The loss z-score

  • <feature name>_pred : The predicted value

For a hypothetical feature named result, the three added columns will be: result_loss, result_z_loss, result_pred.

For performance models fetched from MLflow are cached locally and are cached for up to 10 minutes allowing updated models to be routinely updated. In addition to caching individual models, the stage also maintains a cache of which models are available, so a newly trained user model published to MLflow won’t be visible to an already running inference pipeline for up to 10 minutes.

For any user without an associated model in MLflow, the model for the generic user is used. The name of the generic user is defined in the ae.fallback_username attribute of the Morpheus config object defaults to generic_user.

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

model_name_formatter

str

Format string to control the name of models fetched from MLflow. Currently available field names are: user_id and user_md5 which is an md5 hexadecimal digest as returned by hash.hexdigest.

Filter Detection Stage (FilterDetectionsStage)

This stage filters the output from the inference stage for any anomalous messages. Logs which exceed the specified Z-Score will be passed onto the next stage. All remaining logs which are below the threshold will be dropped. For the purposes of the DFP pipeline, this stage is configured to use the mean_abs_z column of the DataFrame as the filter criteria.

Name

Type

Default

Description

threshold

float

0.5

The threshold value above which logs are considered to be anomalous. The default is 0.5; however, the DFP pipeline uses a value of 2.0. All normal logs will be filtered out and anomalous logs will be passed on.

copy

bool

True

When the copy argument is True (default), rows that meet the filter criteria are copied into a new dataframe. When False sliced views are used instead. This is a performance optimization, and has no functional impact.

filter_source

FilterSource

FilterSource.Auto

Indicates if the filter criteria exists in an output tensor (FilterSource.TENSOR) or a column in a DataFrame (FilterSource.DATAFRAME).

field_name

str

probs

Name of the tensor (filter_source=FilterSource.TENSOR) or DataFrame column (filter_source=FilterSource.DATAFRAME) to use as the filter criteria.

Post Processing Stage (DFPPostprocessingStage)

This stage adds a new event_time column to the DataFrame indicating the time which Morpheus detected the anomalous messages, and replaces any NAN values with the a string value of 'NaN'.

A module is a type of work unit that can be utilized in the Morpheus stage and can be registered to a MRC segment module registry. Modules are beneficial when there is a possibility for the work-unit to be reused. We can load the module from the registry into a multiple contexts without having to be familiar with the inner workings of the Module; all that is needed is that we pass an input and it returns the output.

Let’s first look at the module implementation structure before diving deeper into the DFP Training pipeline as a module.

Note: Modules can be used for more than just creating middle nodes to connect sources and sinks. Additionally, it can be used to construct Source and Sink nodes.

Copy
Copied!
            

import mrc import typing from morpheus.utils.module_utils import get_module_config from morpheus.utils.module_utils import register_module @register_module("SimpleModule", "morpheus_modules") def module_init(builder: mrc.Builder): module_id = "SimpleModule" config: typing.Dict[str, str] = get_module_config(module_id, builder) sep = config.get("sep", ",") def on_data(message: str): # Your implementation goes here... def node_fn(obs: mrc.Observable, sub: mrc.Subscriber): obs.pipe(ops.map(on_data), ops.filter(lambda x: x is not None)).subscribe(sub) # Here we are creating a node. node = builder.make_node_full(module_id, node_fn) # Register input and output port name for a module. builder.register_module_input("<input port name>", node) builder.register_module_output("<output port name>", node)

The register_module decorator on the module initializer function registers the module with the SimpleModule (module_id) and the morpheus_modules (namespace).

Note: While registering the module, the user has the opportunity to choose the input and output port names. When the module has been registered. To obtain the input /output port connection, the same names must be used.

Required key meta fields for module configuration as shown below.

  • module_id : Unique identifier for a module in the module registry.

  • module_name : Specifies the module name.

  • namespace : Virtually cluster the modules.

Copy
Copied!
            

# Module configuration module_config = { "module_id": "SimpleModule", "module_name": "simple_module", "namespace": "morpheus_modules", "sep": ":" }

The module must be packaged as a stage, as illustrated below, in order to be used in the Morpheus pipeline.

Copy
Copied!
            

from morpheus.config import Config from morpheus.stages.general.linear_modules_stage import LinearModulesStage # Morpheus configuration c = Config() module_stage = LinearModulesStage(c, module_config, input_port_name="<input port name>", output_port_name="<input port name>", output_type="<module output type>")

LinearModulesStage is an utility stage that loads an existing, registered, MRC SegmentModule and wraps it as a Morpheus SinglePortStage.

Argument

Type

Description

c

morpheus.config.Config

Morpheus config object

module_config

dict or None

Module configuration

input_port_name

str

Name of a module input port, as used during registration

output_port_name

str

Name of a module output port, as used during registration

output_type

typing.Any [default]

Module output data type

A module can serve as a wrapper for a chain of complex constructs-containing child modules. The example below demonstrates how to establish a chain module, presuming modules_1 through module_n are already registered.

Copy
Copied!
            

import mrc import typing from morpheus.utils.module_utils import get_module_config from morpheus.utils.module_utils import register_module @register_module("ChainModule", "morpheus_modules") def simple_module(builder: mrc.Builder): module_id = "ChainModule" config: typing.Dict[str, str] = get_module_config(module_id, builder) # Get module configurations module_1_config = config.get("module_1", None) ... ... module_n_config = config.get("module_n", None) # Load modules from the registry module_1 = load_module(module_1_config, builder=builder) ... ... module_n = load_module(module_n_config, builder=builder) # Make an edge between the modules. # input/output port names used at 'builder.register_module_input' and 'builder.register_module_output'. builder.make_edge(module_1.output_port("<output port name>"), module_2.input_port("<input port name>")) ... ... builder.make_edge(module_n-1.output_port("<output port name>"), module_n.input_port("<input port name>")) # Register input and output port for a module. builder.register_module_input("<your input port name>", module_1.input_port("<input port name>")) builder.register_module_output("<your output port name>", module_n.output_port("<output port name>"))

Let’s look at the DFP Training process based on modules. On a higher level, the complete DFP training process has been divided into two modules.

  • DFPPreprocessing

    This module constructs a chain module by combining the separate modules listed below into a single module that contains all of the internal components for preprocessing the Azure Active Directory and Duo Authentication logs.

    • FileBatcher

    • FileToDF

    • DFPSplitUsers

    • DFPRollingWindow

    • DFPDataPrep

  • DFPModelTrainDeploy

    This module creates a chain module by integrating the individual modules described below into a single module that incorporates all of the internals for consuming preprocessed data, training the model, and writing the trained model to MLFLow server to serve inference requests.

    • DFPTraining

    • MLFlowModelWriter

Run DFP Training Pipeline with Modules

To run the DFP pipelines using modules with the example datasets, within the container run:

  • Duo Training Pipeline

    Copy
    Copied!
                

    python dfp_duo_modules_pipeline.py --train_users=all --start_time="2022-08-01" --input_file="/workspace/examples/data/dfp/duo-training-data/*.json"

  • Azure Training Pipeline

    Copy
    Copied!
                

    python dfp_azure_modules_pipeline.py --train_users=all --start_time="2022-08-01" --input_file="/workspace/examples/data/dfp/azure-training-data/*.json"

Benchmarks

DFP pipeline benchmarks setup and execution instructions are provided here

© Copyright 2023, NVIDIA. Last updated on Apr 11, 2023.