Introduction to Digital Fingerprinting Pipeline in Morpheus
This document presents the adaptation of the Digital Fingerprinting pipeline in Morpheus from the existing stage-based approach to one that is module-based; this process will provide a basis to work through motivation and usage examples for number of new features found in the 23.03 release. The updated pipeline incorporates extensions to facilitate event-driven workflows and human-in-the-loop interactions through the use of control messages. Moreover, it introduces a dynamic method for acquiring and loading data, further enhancing the pipeline’s capabilities.
The pipeline comprises a series of interconnected modules designed to create a versatile split processing pipeline. This design enables the reception and processing of control messages to perform tasks such as inference against observed events using either generic or user-specific models. Additionally, the pipeline can train new models based on aggregated or predefined training data, offering a more adaptable and user-friendly experience.
At a high level, the pipeline consists of three parts: the front-end file list loader that reads new control messages from a Kafka topic and expands the described data sources to be processed, and the training and inference pipelines that process the data. The updated pipeline enables the reception and processing of control messages, allowing for tasks such as inference against observed events using generic or user-specific models. It also allows for the training of new models based on aggregated or predefined training data.
The front-end loader outputs one or more control messages that are passed to the training and inference pipelines. Messages are either dropped or passed to the next module in the pipeline based on the message type. The updated pipeline implicitly supports two distinct workflows: inference and training. However, because of the use of control messages, it can also support a hybrid data processing workflow that handles both streaming data processed in real-time, aggregated, batched, and subsequently used for training, as well as interleaved self-contained training tasks that specify the training data to be used and are able to bypass batching and aggregation stages.
Moreover, the updated pipeline supports human-in-the-loop workflows, such as the ability to manually trigger training or inference tasks against a specific set of data, and the capacity for real-time labeling of production inference events that can be injected back into the training pipeline.
The following content will track the pipeline declared in
# Setup and command line argument parsing ... # Create an empty pipeline pipeline = Pipeline(config) # Create our Kafka source stage that we can read control messages from. source_stage = pipeline.add_stage( ControlMessageKafkaSourceStage(config, bootstrap_servers=kwargs["bootstrap_servers"], input_topic=kwargs["input_topic"], group_id=kwargs["group_id"], poll_interval=kwargs["poll_interval"], disable_commit=kwargs["disable_commit"], disable_pre_filtering=kwargs["disable_pre_filtering"])) # Create our DFP deployment stage that will load our Digital Fingerprinting module into the pipeline. dfp_deployment_stage = pipeline.add_stage( MultiPortModulesStage(config, dfp_deployment_module_config, input_port_name="input", output_port_name_prefix="output", num_output_ports=num_output_ports)) # Connect the source stage to the DFP deployment module pipeline.add_edge(source_stage, dfp_deployment_stage) # Run the pipeline pipeline.run()
For a full introduction in how to set up and run morpheus, please refer to the Getting Started guide.
For a full introduction to Morpheus modules, please refer to the Python Modules and C++ Modules guides.
This is the top level module that encapsulates the entire Digital Fingerprinting pipeline, it is primarily responsible for wrapping the training and inference pipelines, providing the correct module interface, and doing some configuration pre-processing. Since this module is monolithic, it supports a significant number of configuration options; however, the majority of these have intelligent defaults and are not required to be specified.
The module consists of three chained sub-modules:
Responsible for expanding data source declarations into individual files that can be processed by the pipeline.
Connected to the output of the preprocessing stage. Responsible for handling training based control messages.
Connected to the output of the preprocessing stage. Responsible for handling inference based control messages.
For a complete reference, see: DFP Deployment
@register_module(DFP_DEPLOYMENT, MORPHEUS_MODULE_NAMESPACE) def dfp_deployment(builder: mrc.Builder): # Setup and configuration parsing ... # Make an edge between modules builder.make_edge(fsspec_dataloader_module.output_port("output"), broadcast) builder.make_edge(broadcast, dfp_training_pipe_module.input_port("input")) builder.make_edge(broadcast, dfp_inference_pipe_module.input_port("input")) out_streams = [dfp_training_pipe_module.output_port("output"), dfp_inference_pipe_module.output_port("output")] # Register input port for a module. builder.register_module_input("input", fsspec_dataloader_module.input_port("input"))
This is an instance of the new DataLoader module, utilizing a pre-defined ‘fsspec’ style loader. The module is used to transform regex specified file lists into individual file paths and update the control message with those paths.
For a complete reference, see: DataLoader Module
FS Spec Dataloader
There are a number of modules that are used in both the training and inference pipelines, but which are be configured independently. We’ll introduce Shared modules here and then dive into the unique modules for each pipeline.
The dfp_preproc module is a functional component within the MORPHEUS framework that combines multiple data filtering and processing (dfp) pipeline modules related to inference and training. This module simplifies the pipeline by consolidating various modules into a single, cohesive unit. The dfp_preproc module offers configurability for parameters such as cache directory, timestamp column name, pre-filter options, batching options, user splitting options, and supported data loaders for different file types.
The module itself consists of a series of chained sub-modules, which are connected in a logical sequence:
Responsible for early filtering of control messages that should be not processed by the pipeline.
Responsible for batching files, either into a single control message in the case of an encapsulated training message, or into a series of control messages in the of streaming data.
Responsible for file retrieval and insertion into a cuDF dataframe.
Responsible for splitting the dataframe into a series of dataframes, one per user.
For a complete reference, see: DFP Preproc
@register_module(DFP_PREPROC, MORPHEUS_MODULE_NAMESPACE) def dfp_preproc(builder: mrc.Builder): # Setup and configuration parsing ... # Connect the modules. builder.make_edge(filter_control_message_module.output_port("output"), file_batcher_module.input_port("input")) builder.make_edge(file_batcher_module.output_port("output"), file_to_df_dataloader_module.input_port("input")) builder.make_edge(file_to_df_dataloader_module.output_port("output"), dfp_split_users_module.input_port("input")) # Register input and output port for a module. builder.register_module_input("input", filter_control_message_module.input_port("input")) builder.register_module_output("output", dfp_split_users_module.output_port("output"))
Control Message Filter
The filter_control_message module is a component designed to discard control messages based on specified filtering criteria. This module allows users to configure filtering options such as task type and data type. When either task filtering or data type filtering is enabled, the module processes control messages to verify if they meet the specified criteria. If the control message does not match the criteria, it is discarded. The module uses a node function to filter and process control messages, and registers input and output ports to facilitate seamless integration into the data processing pipeline.
For a complete reference, see: Filter Control Message
The file_batcher module is a component that is responsible for loading input files, filtering out files older than the specified time window, and grouping the remaining files by periods that fall within the time window. This module offers configurability for parameters such as batching options, cache directory, file type, filtering null values, data schema, and the timestamp column name. The file_batcher module processes control messages, validates them, and generates a list of files with their timestamps. The module then groups files by the given period, creates control messages for each batch, and sends them downstream for further processing. A node function is used to handle the processing of control messages, and input and output ports are registered to integrate the module into the data processing pipeline seamlessly.
The file batcher is one of the first pipeline components that begins to differ more substantially from the previous
raw-data pipeline, prior to 23.03. In addition to its previous functionality, the file batcher is now control
message aware, and can handle both streaming and encapsulated control messages, a property denoted by the
property of the control message’s metadata being set as either
payload. Additionally, the file
batcher’s default processing criteria for
end_time can now be
overridden by their corresponding values in the control message’s
batching_options metadata entry.
In the case of streaming data, the file batcher will operate as it did previously, grouping files by the specified
end_time properties, creating a control message for each
batch, and forwarding them downstream. In the case of encapsulated data, the file batcher will operate similarly, but
will only create a single control message for the entire payload, and forward it downstream. In this way, it is
possible to attach all the necessary training data to a given training task, and skip any downstream aggregation.
For a complete reference, see: File Batcher
@register_module(FILE_BATCHER, MORPHEUS_MODULE_NAMESPACE) def file_batcher(builder: mrc.Builder): # Setup and configuration parsing ...
File to DF DataLoader
This is an instance of the new DataLoader module, utilizing a pre-defined ‘file_to_df’ style loader. The module is used to process ‘load’ tasks that reference files which need to be retrieved, possibly cached, and then loaded into a cuDF dataframe with is set as the control message payload.
For a complete reference, see: DataLoader Module
DFP Split Users
The dfp_split_users module is responsible for splitting the input data based on user IDs. The module provides configuration options, such as fallback username, include generic user, include individual users, and specify lists of user IDs to include or exclude in the output.
The module processes control messages by extracting the user information from the message payload, filtering the data based on the provided configuration, and splitting the data by user ID. For each user ID, the function generates a new control message containing the corresponding data and sends it downstream for further processing.
For a complete reference, see: DFP Split Users
@register_module(DFP_SPLIT_USERS, MORPHEUS_MODULE_NAMESPACE) def dfp_split_users(builder: mrc.Builder): # Setup and configuration parsing ...
DFP Rolling Window
The dfp_rolling_window module is responsible for maintaining a rolling window of historical data, acting as a streaming caching and batching system. The module provides various configuration options, such as aggregation span, cache directory, caching options, timestamp column name, and trigger conditions.
The main functionality of the module is to processes control messages containing data. For each control message, the function determines the user ID and data type, then tries to build a rolling window with the historical data from the cache. If enough data is available based on the trigger conditions, the function returns a control message with the appropriate historical data for further processing.
The Rolling Window module is another example of a module that has been updated to be control message aware. In that it will differentiate between streaming and payload control messages, and handle them accordingly. In the case of a streaming control message, the module will process the message as it did previously, and either cache the streaming data and return, or if the trigger conditions are met, return a control message with the appropriate historical data. In the case of a payload control message, the rolling window module will be skipped entirely and simply forward the message to the next stage.
The Rolling window module has also been updated to support an additional
batch mode of operation, in which it will
cache streaming data until the trigger conditions are met, generate a new control message with the all existing data,
flush the cache, and then forward the message downstream. Batch caching is the default mode for the streaming
inference pipeline, and improves performance by reducing the bookkeeping required. This mode of operation is denoted
cache_mode property of the module’s configuration.
For a complete reference, see: DFP Rolling Window
@register_module(DFP_ROLLING_WINDOW, MORPHEUS_MODULE_NAMESPACE) def dfp_rolling_window(builder: mrc.Builder): # Setup and configuration parsing ...
DFP Data Prep
The dfp_data_prep module is responsible for preparing data for either inference or model training. The module requires a defined schema for data preparation.
The main functionality of the module is in the process_features function. For each control message containing data, the function processes the columns of the data according to the given schema. The processed dataframe is then applied to the control message payload.
For a complete reference, see: DFP Data Prep
@register_module(DFP_DATA_PREP, MORPHEUS_MODULE_NAMESPACE) def dfp_data_prep(builder: mrc.Builder): # Setup and configuration parsing ...
The DFP Training Pipe module is a consolidated module that integrates several DFP pipeline modules that are essential to the training process. This module function provides a single entry point to the training pipeline, simplifying the process of training a model. The module offers configurable parameters for various stages in the pipeline, including data batching, data preprocessing, and data encoding for model training. Additionally, the MLflow model writer options allow for the trained model to be saved for future use.
The module itself consists of a series of chained sub-modules, each of which performs a specific task in the training:
Data filerting and preprocessing
Data caching and batching
Model and telemetry saving to MLflow
For a complete reference, see: DFP Training Pipe
@register_module(DFP_TRAINING_PIPE, MORPHEUS_MODULE_NAMESPACE) def dfp_training_pipe(builder: mrc.Builder): # Setup and config parsing ... # Make an edge between the modules. builder.make_edge(preproc_module.output_port("output"), dfp_rolling_window_module.input_port("input")) builder.make_edge(dfp_rolling_window_module.output_port("output"), dfp_data_prep_module.input_port("input")) builder.make_edge(dfp_data_prep_module.output_port("output"), dfp_training_module.input_port("input")) builder.make_edge(dfp_training_module.output_port("output"), mlflow_model_writer_module.input_port("input")) # Register input and output port for a module. builder.register_module_input("input", preproc_module.input_port("input")) builder.register_module_output("output", mlflow_model_writer_module.output_port("output"))
The mlflow_model_writer module is responsible for uploading trained models to the MLflow server.
For each MultiAEMessage received, containing a trained model, the function uploads the model to MLflow along with associated metadata such as experiment name, run name, parameters, metrics, and the model signature. If the MLflow server is running on Databricks, the function also applies the required permissions to the registered model.
For a complete reference, see: MLFlow Model Writer
@register_module(MLFLOW_MODEL_WRITER, MORPHEUS_MODULE_NAMESPACE) def mlflow_model_writer(builder: mrc.Builder): # Setup and configuration parsing ...
The dfp_inference_pipe module function consolidates multiple data fusion pipeline (DFP) modules relevant to the inference process into a single module. Its purpose is to simplify the creation and configuration of an inference pipeline by combining all necessary components.
The module sets up a series of interconnected components that handle various stages of the inference process, such as preprocessing, rolling window aggregation, data preparation, inference, detection filtering, post-processing, serialization, and writing the output to a file.
The module itself consists of a series of chained sub-modules, each of which performs a specific task in the inference pipeline:
Data filtering and preprocessing
Data caching and batching
Detection writing to file
For a complete reference, see: DFP Inference Pipe
@register_module(DFP_INFERENCE_PIPE, MORPHEUS_MODULE_NAMESPACE) def dfp_inference_pipe(builder: mrc.Builder): # Setup and config parsing ... # Make an edge between the modules. builder.make_edge(preproc_module.output_port("output"), dfp_rolling_window_module.input_port("input")) builder.make_edge(dfp_rolling_window_module.output_port("output"), dfp_data_prep_module.input_port("input")) builder.make_edge(dfp_data_prep_module.output_port("output"), dfp_inference_module.input_port("input")) builder.make_edge(dfp_inference_module.output_port("output"), filter_detections_module.input_port("input")) builder.make_edge(filter_detections_module.output_port("output"), dfp_post_proc_module.input_port("input")) builder.make_edge(dfp_post_proc_module.output_port("output"), serialize_module.input_port("input")) builder.make_edge(serialize_module.output_port("output"), write_to_file_module.input_port("input")) # Register input and output port for a module. builder.register_module_input("input", preproc_module.input_port("input")) builder.register_module_output("output", write_to_file_module.output_port("output"))
The dfp_inference module function creates an inference module that retrieves trained models and performs inference on the input data. The module requires a model_name_formatter and a fallback_username to be configured in its parameters.
The function defines a get_model method to load the model for a specific user, and a process_task method to handle individual inference tasks. The process_task method retrieves the user ID, extracts the payload, and converts the DataFrame to pandas format. It then attempts to load the model for the specified user ID and perform inference using the loaded model. Finally, it adds any additional columns from the input data to the results DataFrame and creates an output message with the results and metadata.
For a complete reference, see: DFP Inference
@register_module(DFP_INFERENCE, MORPHEUS_MODULE_NAMESPACE) def dfp_inference(builder: mrc.Builder): # Setup and config parsing ...
The filter_detections module function is designed to filter rows from a DataFrame based on values in a tensor or DataFrame column according to a specified threshold. Rows are excluded if their associated value in the specified field is less than or equal to the threshold.
This module can operate in two modes, set by the copy argument. When copy=True, rows that meet the filter criteria are copied into a new DataFrame. When copy=False, sliced views are used instead.
The function defines the find_detections method to determine the filter source and identify the rows that match the filter criteria. The filter_copy and filter_slice methods are responsible for handling the filtering process based on the chosen mode.
@register_module(FILTER_DETECTIONS, MORPHEUS_MODULE_NAMESPACE) def filter_detections(builder: mrc.Builder): # Setup and config parsing ...
The dfp_postprocessing module function performs post-processing tasks on the input data.
@register_module(DFP_POST_PROCESSING, MORPHEUS_MODULE_NAMESPACE) def dfp_postprocessing(builder: mrc.Builder): # Setup and config parsing ...
The serialize module function is responsible for filtering columns from a MultiMessage object and emitting a MessageMeta object.
The convert_to_df function converts a dataframe to JSON lines. It takes a MultiMessage instance, include_columns (a pattern for columns to include), exclude_columns (a list of patterns for columns to exclude), and columns (a list of columns to include). The function filters the columns of the input dataframe based on the include and exclude patterns and retrieves the metadata of the filtered columns.
The module function compiles the include and exclude patterns into regular expressions. It then creates a node using the convert_to_df function with the compiled include and exclude patterns and the specified columns.
@register_module(SERIALIZE, MORPHEUS_MODULE_NAMESPACE) def serialize(builder: mrc.Builder): # Setup and config parsing ...
The write_to_file module function writes all messages to a file.
The convert_to_strings function takes a DataFrame (either pandas or cuDF) and converts it into the appropriate string format based on the file type (JSON or CSV). It checks whether to include the index column or not.
@register_module(WRITE_TO_FILE, MORPHEUS_MODULE_NAMESPACE) def write_to_file(builder: mrc.Builder): # Setup and config parsing ...