Introduction to Modular Digital Fingerprinting Pipeline (Integrated Training)

Motivation

This document presents the adaptation of the Digital Fingerprinting pipeline in Morpheus from the existing stage-based approach to one that is module-based; this process will provide a basis to work through motivation and usage examples for number of new features found in the 23.03 release. The updated pipeline incorporates extensions to facilitate event-driven workflows and human-in-the-loop interactions through the use of control messages. Moreover, it introduces a dynamic method for acquiring and loading data, further enhancing the pipeline’s capabilities.

The pipeline comprises a series of interconnected modules designed to create a versatile split processing pipeline. This design enables the reception and processing of control messages to perform tasks such as inference against observed events using either generic or user-specific models. Additionally, the pipeline can train new models based on aggregated or predefined training data, offering a more adaptable and user-friendly experience.

Overview

At a high level, the pipeline consists of three parts: the front-end file list loader that reads new control messages from a Kafka topic and expands the described data sources to be processed, and the training and inference pipelines that process the data. The updated pipeline enables the reception and processing of control messages, allowing for tasks such as inference against observed events using generic or user-specific models. It also allows for the training of new models based on aggregated or predefined training data.

The front-end loader outputs one or more control messages that are passed to the training and inference pipelines. Messages are either dropped or passed to the next module in the pipeline based on the message type. The updated pipeline implicitly supports two distinct workflows: inference and training. However, because of the use of control messages, it can also support a hybrid data processing workflow that handles both streaming data processed in real-time, aggregated, batched, and subsequently used for training, as well as interleaved self-contained training tasks that specify the training data to be used and are able to bypass batching and aggregation stages.

Moreover, the updated pipeline supports human-in-the-loop workflows, such as the ability to manually trigger training or inference tasks against a specific set of data, and the capacity for real-time labeling of production inference events that can be injected back into the training pipeline.

The following content will track the pipeline declared in examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_streaming_pipeline.py

Copy
Copied!
            

# Setup and command line argument parsing ... # Create an empty pipeline pipeline = Pipeline(config) # Create our Kafka source stage that we can read control messages from. source_stage = pipeline.add_stage( ControlMessageKafkaSourceStage(config, bootstrap_servers=kwargs["bootstrap_servers"], input_topic=list(kwargs["input_topic"]), group_id=kwargs["group_id"], poll_interval=kwargs["poll_interval"], disable_commit=kwargs["disable_commit"], disable_pre_filtering=kwargs["disable_pre_filtering"])) # Create our DFP deployment stage that will load our Digital Fingerprinting module into the pipeline. dfp_deployment_stage = pipeline.add_stage( MultiPortModulesStage(config, dfp_deployment_module_config, input_ports=["input"], output_ports=["output_0", "output_1"])) # Connect the source stage to the DFP deployment module pipeline.add_edge(source_stage, dfp_deployment_stage) # Run the pipeline pipeline.run()

For a full introduction in how to set up and run morpheus, please refer to the Getting Started guide.

For a full introduction to Morpheus modules, refer to the Python Modules and C++ Modules guides.

Source: examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_deployment.py

This is the top level module that encapsulates the entire Digital Fingerprinting pipeline, it is primarily responsible for wrapping the training and inference pipelines, providing the correct module interface, and doing some configuration pre-processing. Since this module is monolithic, it supports a significant number of configuration options; however, the majority of these have intelligent defaults and are not required to be specified.

The module consists of three chained sub-modules:

  • fsspec_dataloader

    • Responsible for expanding data source declarations into individual files that can be processed by the pipeline.

  • dfp_training_pipe

    • Connected to the output of the preprocessing stage. Responsible for handling training based control messages.

  • dfp_inference_pipe

    • Connected to the output of the preprocessing stage. Responsible for handling inference based control messages.

For a complete reference, refer to: DFP Deployment

Copy
Copied!
            

@register_module(DFP_DEPLOYMENT, MORPHEUS_MODULE_NAMESPACE) def dfp_deployment(builder: mrc.Builder): # Setup and configuration parsing ... # Make an edge between modules builder.make_edge(fsspec_dataloader_module.output_port("output"), broadcast) builder.make_edge(broadcast, dfp_training_pipe_module.input_port("input")) builder.make_edge(broadcast, dfp_inference_pipe_module.input_port("input")) out_nodes = [dfp_training_pipe_module.output_port("output"), dfp_inference_pipe_module.output_port("output")] # Register input port for a module. builder.register_module_input("input", fsspec_dataloader_module.input_port("input"))

fsspec Dataloader

Source: morpheus/loaders/fsspec_loader.py

This is an instance of the new DataLoader module, utilizing a pre-defined ‘fsspec’ style loader. The module is used to transform glob specified file lists into individual file paths and update the control message with those paths.

For a complete reference, refer to: DataLoader Module

There are a number of modules that are used in both the training and inference pipelines, but which are be configured independently. We’ll introduce Shared modules here and then dive into the unique modules for each pipeline.

DFP Preprocessing

Source: examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_preproc.py

The dfp_preproc module is a functional component within the Morpheus framework that combines multiple data filtering and processing pipeline modules related to inference and training. This module simplifies the pipeline by consolidating various modules into a single, cohesive unit. The dfp_preproc module offers configurability for parameters such as cache directory, timestamp column name, pre-filter options, batching options, user splitting options, and supported data loaders for different file types.

The module itself consists of a series of chained sub-modules, which are connected in a logical sequence:

  • filter_control_message_module

    • Responsible for early filtering of control messages that should be not processed by the pipeline.

  • file_batcher_module

    • Responsible for batching files, either into a single control message in the case of an encapsulated training message, or into a series of control messages in the of streaming data.

  • file_to_df_dataloader_module

    • Responsible for file retrieval and insertion into a cuDF dataframe.

  • dfp_split_users_module

    • Responsible for splitting the dataframe into a series of dataframes, one per user.

For a complete reference, refer to: DFP Preproc

Copy
Copied!
            

@register_module(DFP_PREPROC, MORPHEUS_MODULE_NAMESPACE) def dfp_preproc(builder: mrc.Builder): # Setup and configuration parsing ... # Connect the modules. builder.make_edge(filter_control_message_module.output_port("output"), file_batcher_module.input_port("input")) builder.make_edge(file_batcher_module.output_port("output"), file_to_df_dataloader_module.input_port("input")) builder.make_edge(file_to_df_dataloader_module.output_port("output"), dfp_split_users_module.input_port("input")) # Register input and output port for a module. builder.register_module_input("input", filter_control_message_module.input_port("input")) builder.register_module_output("output", dfp_split_users_module.output_port("output"))

Control Message Filter

Source: morpheus/modules/filter_control_message.py

The filter_control_message module is a component designed to discard control messages based on specified filtering criteria. This module allows users to configure filtering options such as task type and data type. When either task filtering or data type filtering is enabled, the module processes control messages to verify if they meet the specified criteria. If the control message does not match the criteria, it is discarded. The module uses a node function to filter and process control messages, and registers input and output ports to facilitate seamless integration into the data processing pipeline.

For a complete reference, refer to: Filter Control Message

File Batcher

Source: morpheus/modules/file_batcher.py

The file_batcher module is a component that is responsible for loading input files, filtering out files older than the specified time window, and grouping the remaining files by periods that fall within the time window. This module offers configurability for parameters such as batching options, cache directory, file type, filtering null values, data schema, and the timestamp column name. The file_batcher module processes control messages, validates them, and generates a list of files with their timestamps. The module then groups files by the given period, creates control messages for each batch, and sends them downstream for further processing. A node function is used to handle the processing of control messages, and input and output ports are registered to integrate the module into the data processing pipeline seamlessly.

The file batcher is one of the first pipeline components that begins to differ more substantially from the previous raw-data pipeline, prior to 23.03. In addition to its previous functionality, the file batcher is now control message aware, and can handle both streaming and encapsulated control messages, a property denoted by the data_type property of the control message’s metadata being set as either streaming or payload. Additionally, the file batcher’s default processing criteria for period, sampling_rate_s, start_time, and end_time can now be overridden by their corresponding values in the control message’s batching_options metadata entry.

In the case of streaming data, the file batcher will operate as it did previously, grouping files as specified by the period, sampling_rate_s, start_time, and end_time properties, creating a control message for each batch, and forwarding them downstream. In the case of encapsulated data, the file batcher will operate similarly, but will only create a single control message for the entire payload, and forward it downstream. In this way, it is possible to attach all the necessary training data to a given training task, and skip any downstream aggregation.

For a complete reference, refer to: File Batcher

Copy
Copied!
            

@register_module(FILE_BATCHER, MORPHEUS_MODULE_NAMESPACE) def file_batcher(builder: mrc.Builder): # Setup and configuration parsing ...

File to DF DataLoader

Source: morpheus/loaders/file_to_df_loader.py

This is an instance of the new DataLoader module, utilizing a pre-defined ‘file_to_df’ style loader. The module is used to process ‘load’ tasks that reference files which need to be retrieved, possibly cached, and then loaded into a cuDF dataframe with is set as the control message payload.

For a complete reference, refer to: DataLoader Module

DFP Split Users

Source: examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_split_users.py

The dfp_split_users module is responsible for splitting the input data based on user IDs. The module provides configuration options, such as fallback username, include generic user, include individual users, and specify lists of user IDs to include or exclude in the output.

The module processes control messages by extracting the user information from the message payload, filtering the data based on the provided configuration, and splitting the data by user ID. For each user ID, the function generates a new control message containing the corresponding data and sends it downstream for further processing.

For a complete reference, refer to: DFP Split Users

Copy
Copied!
            

@register_module(DFP_SPLIT_USERS, MORPHEUS_MODULE_NAMESPACE) def dfp_split_users(builder: mrc.Builder): # Setup and configuration parsing ...

DFP Rolling Window

Source: examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_rolling_window.py

The dfp_rolling_window module is responsible for maintaining a rolling window of historical data, acting as a streaming caching and batching system. The module provides various configuration options, such as aggregation span, cache directory, caching options, timestamp column name, and trigger conditions.

The main functionality of the module is to processes control messages containing data. For each control message, the function determines the user ID and data type, then tries to build a rolling window with the historical data from the cache. If enough data is available based on the trigger conditions, the function returns a control message with the appropriate historical data for further processing.

The Rolling Window module is another example of a module that has been updated to be control message aware. In that it will differentiate between streaming and payload control messages, and handle them accordingly. In the case of a streaming control message, the module will process the message as it did previously, and either cache the streaming data and return, or if the trigger conditions are met, return a control message with the appropriate historical data. In the case of a payload control message, the rolling window module will be skipped entirely and simply forward the message to the next stage.

The Rolling window module has also been updated to support an additional batch mode of operation, in which it will cache streaming data until the trigger conditions are met, generate a new control message with the all existing data, flush the cache, and then forward the message downstream. Batch caching is the default mode for the streaming inference pipeline, and improves performance by reducing the bookkeeping required. This mode of operation is denoted by the cache_mode property of the module’s configuration.

For a complete reference, refer to: DFP Rolling Window

Copy
Copied!
            

@register_module(DFP_ROLLING_WINDOW, MORPHEUS_MODULE_NAMESPACE) def dfp_rolling_window(builder: mrc.Builder): # Setup and configuration parsing ...

DFP Data Prep

Source: examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_data_prep.py

The dfp_data_prep module is responsible for preparing data for either inference or model training. The module requires a defined schema for data preparation.

The main functionality of the module is in the process_features function. For each control message containing data, the function processes the columns of the data according to the given schema. The processed dataframe is then applied to the control message payload.

For a complete reference, refer to: DFP Data Prep

Copy
Copied!
            

@register_module(DFP_DATA_PREP, MORPHEUS_MODULE_NAMESPACE) def dfp_data_prep(builder: mrc.Builder): # Setup and configuration parsing ...

Source: examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training_pipe.py

The DFP Training Pipe module is a consolidated module that integrates several DFP pipeline modules that are essential to the training process. This module function provides a single entry point to the training pipeline, simplifying the process of training a model. The module offers configurable parameters for various stages in the pipeline, including data batching, data preprocessing, and data encoding for model training. Additionally, the MLflow model writer options allow for the trained model to be saved for future use.

The module itself consists of a series of chained sub-modules, each of which performs a specific task in the training:

  • preproc

    • Data filtering and preprocessing

  • dfp_rolling_window

    • Data caching and batching

  • dfp_data_prep

    • Data encoding

  • dfp_training

    • Model training

  • mlflow_model_writer

    • Model and telemetry saving to MLflow

For a complete reference, refer to: DFP Training Pipe

Copy
Copied!
            

@register_module(DFP_TRAINING_PIPE, MORPHEUS_MODULE_NAMESPACE) def dfp_training_pipe(builder: mrc.Builder): # Setup and config parsing ... # Make an edge between the modules. builder.make_edge(preproc_module.output_port("output"), dfp_rolling_window_module.input_port("input")) builder.make_edge(dfp_rolling_window_module.output_port("output"), dfp_data_prep_module.input_port("input")) builder.make_edge(dfp_data_prep_module.output_port("output"), dfp_training_module.input_port("input")) builder.make_edge(dfp_training_module.output_port("output"), mlflow_model_writer_module.input_port("input")) # Register input and output port for a module. builder.register_module_input("input", preproc_module.input_port("input")) builder.register_module_output("output", mlflow_model_writer_module.output_port("output"))

DFP Training

Source: examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_training.py

The dfp_training module function is responsible for training the model. The on_data function is defined to handle incoming ControlMessage instances. It retrieves the user ID and the input data from the ControlMessage, creates an instance of the AutoEncoder class with the specified model_kwargs, and trains the model on the input data. The output message includes the trained model and metadata.

For a complete reference, refer to: DFP Training

Copy
Copied!
            

@register_module(DFP_TRAINING, MORPHEUS_MODULE_NAMESPACE) def dfp_inference(builder: mrc.Builder): # Setup and config parsing ...

MLFlow Model Writer

Source: morpheus/modules/mlflow_model_writer.py

The mlflow_model_writer module is responsible for uploading trained models to the MLflow server.

For each MultiAEMessage received, containing a trained model, the function uploads the model to MLflow along with associated metadata such as experiment name, run name, parameters, metrics, and the model signature. If the MLflow server is running on Databricks, the function also applies the required permissions to the registered model.

For a complete reference, refer to: MLflow Model Writer

Copy
Copied!
            

@register_module(MLFLOW_MODEL_WRITER, MORPHEUS_MODULE_NAMESPACE) def mlflow_model_writer(builder: mrc.Builder): # Setup and configuration parsing ...

Source: examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference_pipe.py

The dfp_inference_pipe module function consolidates multiple digital fingerprinting pipeline (DFP) modules relevant to the inference process into a single module. Its purpose is to simplify the creation and configuration of an inference pipeline by combining all necessary components.

The module sets up a series of interconnected components that handle various stages of the inference process, such as preprocessing, rolling window aggregation, data preparation, inference, detection filtering, post-processing, serialization, and writing the output to a file.

The module itself consists of a series of chained sub-modules, each of which performs a specific task in the inference pipeline:

  • dfp_preproc

    • Data filtering and preprocessing

  • dfp_rolling_window

    • Data caching and batching

  • dfp_data_prep

    • Data encoding

  • dfp_inference

    • Model inference

  • filter_detections

    • Detection filtering

  • dfp_post_proc

    • Detection post-processing

  • serialize

    • Detection serialization

  • write_to_file

    • Detection writing to file

For a complete reference, refer to: DFP Inference Pipe

Copy
Copied!
            

@register_module(DFP_INFERENCE_PIPE, MORPHEUS_MODULE_NAMESPACE) def dfp_inference_pipe(builder: mrc.Builder): # Setup and config parsing ... # Make an edge between the modules. builder.make_edge(preproc_module.output_port("output"), dfp_rolling_window_module.input_port("input")) builder.make_edge(dfp_rolling_window_module.output_port("output"), dfp_data_prep_module.input_port("input")) builder.make_edge(dfp_data_prep_module.output_port("output"), dfp_inference_module.input_port("input")) builder.make_edge(dfp_inference_module.output_port("output"), filter_detections_module.input_port("input")) builder.make_edge(filter_detections_module.output_port("output"), dfp_post_proc_module.input_port("input")) builder.make_edge(dfp_post_proc_module.output_port("output"), serialize_module.input_port("input")) builder.make_edge(serialize_module.output_port("output"), write_to_file_module.input_port("input")) # Register input and output port for a module. builder.register_module_input("input", preproc_module.input_port("input")) builder.register_module_output("output", write_to_file_module.output_port("output"))

DFP Inference

Source: examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_inference.py

The dfp_inference module function creates an inference module that retrieves trained models and performs inference on the input data. The module requires a model_name_formatter and a fallback_username to be configured in its parameters.

The function defines a get_model method to load the model for a specific user, and a process_task method to handle individual inference tasks. The process_task method retrieves the user ID, extracts the payload, and converts the DataFrame to pandas format. It then attempts to load the model for the specified user ID and perform inference using the loaded model. Finally, it adds any other columns from the input data to the results DataFrame and creates a message with the output and metadata.

For a complete reference, refer to: DFP Inference

Copy
Copied!
            

@register_module(DFP_INFERENCE, MORPHEUS_MODULE_NAMESPACE) def dfp_inference(builder: mrc.Builder): # Setup and config parsing ...

Filter Detections

Source: morpheus/modules/filter_detections.py

The filter_detections module function is designed to filter rows from a DataFrame based on values in a tensor or DataFrame column according to a specified threshold. Rows are excluded if their associated value in the specified field is less than or equal to the threshold.

This module can operate in two modes, set by the copy argument. When copy=True, rows that meet the filter criteria are copied into a new DataFrame. When copy=False, sliced views are used instead.

The function defines the find_detections method to determine the filter source and identify the rows that match the filter criteria. The filter_copy and filter_slice methods are responsible for handling the filtering process based on the chosen mode.

Copy
Copied!
            

@register_module(FILTER_DETECTIONS, MORPHEUS_MODULE_NAMESPACE) def filter_detections(builder: mrc.Builder): # Setup and config parsing ...

For a complete reference, refer to: Filter Detections

DFP Post Processing

Source: examples/digital_fingerprinting/production/morpheus/dfp/modules/dfp_postprocessing.py

The dfp_postprocessing module function performs post-processing tasks on the input data.

Copy
Copied!
            

@register_module(DFP_POST_PROCESSING, MORPHEUS_MODULE_NAMESPACE) def dfp_postprocessing(builder: mrc.Builder): # Setup and config parsing ...

For a complete reference, refer to: DFP Post Processing

Serialize

Source: morpheus/modules/serialize.py

The serialize module function is responsible for filtering columns from a MultiMessage object and emitting a MessageMeta object.

The convert_to_df function converts a dataframe to JSON lines. It takes a MultiMessage instance, include_columns (a pattern for columns to include), exclude_columns (a list of patterns for columns to exclude), and columns (a list of columns to include). The function filters the columns of the input dataframe based on the include and exclude patterns and retrieves the metadata of the filtered columns.

The module function compiles the include and exclude patterns into regular expressions. It then creates a node using the convert_to_df function with the compiled include and exclude patterns and the specified columns.

Copy
Copied!
            

@register_module(SERIALIZE, MORPHEUS_MODULE_NAMESPACE) def serialize(builder: mrc.Builder): # Setup and config parsing ...

For a complete reference, refer to: Serialize

Write to File

Source: morpheus/modules/write_to_file.py

The write_to_file module function writes all messages to a file.

The convert_to_strings function takes a `DataFrame`` (either pandas or cuDF) and converts it into the appropriate string format based on the file type (JSON or CSV). It checks whether to include the index column or not.

Copy
Copied!
            

@register_module(WRITE_TO_FILE, MORPHEUS_MODULE_NAMESPACE) def write_to_file(builder: mrc.Builder): # Setup and config parsing ...

For a complete reference, refer to: Write to File

The following are steps to run modular DFP pipelines with example Azure and Duo datasets.

System requirements

Note: For GPU Requirements refer to getting_started

Building the services

From the root of the Morpheus repo, run:

Copy
Copied!
            

cd examples/digital_fingerprinting/production docker compose build

Note: This requires version 1.28.0 or higher of Docker Compose, and preferably v2. If you encounter an error similar to:

Copy
Copied!
            

ERROR: The Compose file './docker-compose.yml' is invalid because: services.jupyter.deploy.resources.reservations value Additional properties are not allowed ('devices' was unexpected)

This is most likely due to using an older version of the docker-compose command, instead re-run the build with docker compose. Refer to Migrate to Compose V2 for more information.

Downloading the example datasets

First, we will need to install s3fs and then run the examples/digital_fingerprinting/fetch_example_data.py script. This will download the example data into the examples/data/dfp dir.

From the Morpheus repo, run:

Copy
Copied!
            

pip install s3fs python examples/digital_fingerprinting/fetch_example_data.py all

Morpheus Pipeline

From the examples/digital_fingerprinting/production dir, run:

Copy
Copied!
            

docker compose run morpheus_pipeline bash

To run the DFP pipelines with the example datasets within the container, run:

  • Duo Training Pipeline

    Copy
    Copied!
                

    python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ --use_cpp=true \ --source duo \ --start_time "2022-08-01" \ --duration "60d" \ --train_users generic \ --input_file "./control_messages/duo_payload_training.json"

  • Duo Inference Pipeline

    Copy
    Copied!
                

    python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ --use_cpp=true \ --source duo \ --start_time "2022-08-30" \ --input_file "./control_messages/duo_payload_inference.json"

  • Duo Training + Inference Pipeline

    Copy
    Copied!
                

    python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ --use_cpp=true \ --source duo \ --start_time "2022-08-01" \ --duration "60d" \ --train_users generic \ --input_file "./control_messages/duo_payload_load_train_inference.json"

  • Azure Training Pipeline

    Copy
    Copied!
                

    python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ --use_cpp=true \ --source azure \ --start_time "2022-08-01" \ --duration "60d" \ --train_users generic \ --input_file "./control_messages/azure_payload_training.json"

  • Azure Inference Pipeline

    Copy
    Copied!
                

    python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ --use_cpp=true \ --source azure \ --start_time "2022-08-30" \ --input_file "./control_messages/azure_payload_inference.json"

  • Azure Training + Inference Pipeline

    Copy
    Copied!
                

    python dfp_integrated_training_batch_pipeline.py \ --log_level DEBUG \ --use_cpp=true \ --source azure \ --start_time "2022-08-01" \ --duration "60d" \ --train_users generic \ --input_file "./control_messages/azure_payload_load_train_inference.json"

Output Fields

The output files, dfp_detectiions_duo.csv and dfp_detections_azure.csv, will contain those logs from the input dataset for which an anomaly was detected; this is determined by the z-score in the mean_abs_z field. By default, any logs with a z-score of 2.0 or higher are considered anomalous. Refer to DFPPostprocessingStage.

Most of the fields in the output files generated by running the above examples are input fields or derived from input fields. The additional output fields are:

Field

Type

Description

event_time TEXT ISO 8601 formatted date string, the time the anomaly was detected by Morpheus
model_version TEXT Name and version of the model used to performed the inference, in the form of <model name>:<version>
max_abs_z FLOAT Max z-score across all features
mean_abs_z FLOAT Average z-score across all features

In addition to this, for each input feature the following output fields will exist:

Field

Type

Description

<feature name>_loss FLOAT The loss
<feature name>_z_loss FLOAT The loss z-score
<feature name>_pred FLOAT The predicted value

Refer to DFPInferenceStage for more on these fields.

Previous Morpheus Control Messages
Next Developer Guides
© Copyright 2023, NVIDIA. Last updated on Feb 2, 2024.