Simple Python Stage

Morpheus makes use of the MRC graph-execution framework. Morpheus pipelines are built on top of MRC pipelines, which are composed of collections of nodes and edges called segments (think sub-graphs); segments can in turn be connected by ingress/egress ports. In many common cases, an MRC pipeline will consist of only a single segment. Morpheus stages interact with the MRC segment to define, build, and add nodes to the MRC graph; the stages themselves can be thought of as packaged units of work to be applied to data flowing through the pipeline. The work units that make up an individual Morpheus stage may consist of a single MRC node, a small collection of nodes, or an entire MRC subgraph.
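
To make the vocabulary concrete, here is a toy model in plain Python. It is not the MRC API: ToySegment and its methods are hypothetical names invented for this illustration. It only shows the idea of a segment as a collection of nodes joined by edges, with data pushed into one node flowing along the edges to the next.

```python
from typing import Any, Callable, Dict, List


class ToySegment:
    """Hypothetical stand-in for a segment: named nodes plus directed edges."""

    def __init__(self) -> None:
        self.nodes: Dict[str, Callable[[Any], Any]] = {}
        self.edges: Dict[str, List[str]] = {}

    def make_node(self, name: str, fn: Callable[[Any], Any]) -> str:
        # A node is a unit of work: a function applied to each item.
        self.nodes[name] = fn
        self.edges[name] = []
        return name

    def make_edge(self, source: str, sink: str) -> None:
        # An edge forwards the output of `source` to `sink`.
        self.edges[source].append(sink)

    def push(self, name: str, item: Any) -> List[Any]:
        """Run `item` through node `name` and everything downstream of it.

        Returns the values produced by nodes that have no outgoing edge.
        """
        result = self.nodes[name](item)
        downstream = self.edges[name]
        if not downstream:
            return [result]
        outputs: List[Any] = []
        for sink in downstream:
            outputs.extend(self.push(sink, result))
        return outputs


segment = ToySegment()
double = segment.make_node("double", lambda x: x * 2)
increment = segment.make_node("increment", lambda x: x + 1)
segment.make_edge(double, increment)

# 5 enters "double", becomes 10, then flows along the edge to "increment".
results = segment.push(double, 5)
```

In this picture, a Morpheus stage would correspond to whichever node or group of nodes it contributes to the segment.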

To start, we will implement a single stage that could be included in a pipeline. For illustration, this stage will do nothing but take the input from the previous stage and forward it to the next stage. All Morpheus stages have several things in common, so while this doesn’t do too much, it ends up being a good starting point for writing a new stage. From there, we can add our functionality as needed.

Defining this stage requires us to specify the stage type. Morpheus stages that contain a single input and a single output typically inherit from SinglePortStage. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, Kafka service, or other external sources, will need to inherit from the SingleOutputSource base class.

Optionally, stages can be registered as a command with the Morpheus CLI using the register_stage decorator. This allows pipelines to be constructed from both pre-built stages and custom user stages via the command line. Any constructor arguments will be introspected using numpydoc and exposed as command line flags. Similarly, the class’s docstrings will be exposed in the help string of the stage on the command line.
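
The general idea of turning constructor arguments into command line flags can be sketched with the standard library alone. The following is not how register_stage is implemented (the text above says Morpheus relies on numpydoc); ExampleStage and build_parser are hypothetical names, and the sketch uses only inspect and argparse.

```python
import argparse
import inspect


class ExampleStage:
    """Hypothetical stage with two tunable constructor arguments."""

    def __init__(self, config, threshold: float = 0.5, label: str = "default"):
        self.config = config
        self.threshold = threshold
        self.label = label


def build_parser(stage_cls, skip=("self", "config")) -> argparse.ArgumentParser:
    """Create one --flag per constructor argument from its annotation and default."""
    # The class docstring becomes the help text of the generated parser.
    parser = argparse.ArgumentParser(prog=stage_cls.__name__, description=inspect.getdoc(stage_cls))
    for name, param in inspect.signature(stage_cls.__init__).parameters.items():
        if name in skip:
            continue
        has_annotation = param.annotation is not inspect.Parameter.empty
        has_default = param.default is not inspect.Parameter.empty
        parser.add_argument(
            f"--{name}",
            type=param.annotation if has_annotation else str,
            default=param.default if has_default else None,
            required=not has_default,
        )
    return parser


parser = build_parser(ExampleStage)
args = parser.parse_args(["--threshold", "0.9"])
```

Here args.threshold is parsed as a float because of the annotation, and label falls back to its declared default.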

We start our class definition with a few basic imports:

import typing

import mrc

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair


@register_stage("pass-thru")
class PassThruStage(SinglePortStage):

There are four methods that need to be defined in our new subclass to implement the stage interface: name, accepted_types, supports_cpp_node, and _build_single. In practice, it is often necessary to define at least one more method which will perform the actual work of the stage; by convention, this method is typically named on_data, which we will define in our examples.

name is a property method; it should return a user-friendly name for the stage. Currently, this property is only used for debugging purposes, and there are no requirements on the content or format of the name. However, by convention, the string returned by this method should be the same as the string passed to the register_stage decorator.

@property
def name(self) -> str:
    return "pass-thru"

The accepted_types method returns a tuple of message classes that this stage is able to accept as input. Morpheus uses this to validate that the parent of this stage emits a message that this stage can accept. Since our stage is a pass through, we will declare that we can accept any incoming message type. Note that production stages will often declare only a single Morpheus message class such as MessageMeta or MultiMessage (refer to the message classes defined in morpheus.pipeline.messages for a complete list).

def accepted_types(self) -> typing.Tuple:
    return (typing.Any,)
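
As a rough illustration of what validating a parent's output type against accepted_types could involve, consider the sketch below. The function is_accepted and the two message classes are hypothetical and are not part of Morpheus; the real validation logic may differ.

```python
import typing


class MessageBase:
    """Hypothetical base message class (not a Morpheus class)."""


class SpecialMessage(MessageBase):
    """Hypothetical subclass used to show subclass matching."""


def is_accepted(parent_type: type, accepted: typing.Tuple) -> bool:
    """Return True if `parent_type` matches any entry in `accepted`."""
    for candidate in accepted:
        # typing.Any acts as a wildcard, as in the pass-through stage.
        if candidate is typing.Any:
            return True
        # Otherwise require the parent's type to be the class or a subclass of it.
        if isinstance(candidate, type) and issubclass(parent_type, candidate):
            return True
    return False


accepts_anything = is_accepted(SpecialMessage, (typing.Any, ))
accepts_subclass = is_accepted(SpecialMessage, (MessageBase, ))
rejects_unrelated = is_accepted(int, (MessageBase, ))
```

Declaring a single concrete class, as production stages often do, narrows the check to that class and its subclasses.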

The supports_cpp_node method returns a boolean indicating if the stage has a C++ implementation. Since our example only contains a Python implementation we will return False here.

def supports_cpp_node(self) -> bool:
    return False

Our on_data method accepts an incoming message and returns a message. The returned message can be the same message instance that we received as our input or it could be a new message instance. The method is named on_data by convention; however, it is not part of the API. In the next section, we will register it as a callback in Morpheus.

def on_data(self, message: typing.Any):
    # Return the message for the next stage
    return message
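
The difference between returning the same instance and returning a new one can be shown with a small stand-in. ToyMessage and both functions below are hypothetical and use a plain dataclass rather than a Morpheus message class.

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class ToyMessage:
    """Hypothetical stand-in for a message; not a Morpheus class."""
    payload: str
    hops: int = 0


def on_data_same(message: ToyMessage) -> ToyMessage:
    # Pass-through: hand the very same instance to the next stage.
    return message


def on_data_new(message: ToyMessage) -> ToyMessage:
    # Transforming: build a new instance and leave the input untouched.
    return dataclasses.replace(message, hops=message.hops + 1)


original = ToyMessage(payload="hello")
same = on_data_same(original)
new = on_data_new(original)
```

Our pass-through stage follows the first pattern; a stage that modifies data could follow either, depending on whether it mutates the incoming message or builds a fresh one.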

Finally, the _build_single method will be used at stage build time to construct our node and wire it into the pipeline. _build_single receives an instance of an MRC segment builder (mrc.Builder) along with a StreamPair instance, which is a tuple consisting of our parent node and its output type. We will use the builder instance to construct a node from our stage and connect it to the Morpheus pipeline. The return type of _build_single is also a StreamPair, which will consist of our node along with its data type.

def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
    node = builder.make_node(self.unique_name, self.on_data)
    builder.make_edge(input_stream[0], node)

    return node, input_stream[1]

For our purposes, a Morpheus stage defines the input data type the stage will accept, the unit of work to be performed on that data, and the output data type. In contrast, the individual node or nodes that make up a stage’s unit of work are wired into the underlying MRC execution graph. To build the node, we will call the make_node method of the builder instance, passing it our unique_name property and our on_data method. The unique_name property takes the name property that we already defined and appends a unique id to it.

node = builder.make_node(self.unique_name, self.on_data)
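
The relationship between name and unique_name can be pictured with the following sketch. ToyStage is a hypothetical class, and the shared counter is an assumption about how ids might be assigned; the only thing taken from this guide is the idea of appending an id to the name.

```python
import itertools


class ToyStage:
    """Hypothetical base class; illustrates only the naming idea."""

    # Assumption: ids come from a counter shared by all stage instances.
    _id_counter = itertools.count()

    def __init__(self) -> None:
        self._id = next(ToyStage._id_counter)

    @property
    def name(self) -> str:
        return "pass-thru"

    @property
    def unique_name(self) -> str:
        # Combine the user-friendly name with the per-instance id.
        return f"{self.name}-{self._id}"


first = ToyStage()
second = ToyStage()
```

Two instances of the same stage class share a name but receive distinct unique names, so both can be added to one graph without a naming clash.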

Next, we will define an edge connecting our new node to our parent node:

builder.make_edge(input_stream[0], node)

Finally, we will return a new tuple of our node and output type. Since this is a pass through node that can accept any input type, we will return our parent’s type.

return node, input_stream[1]

Putting it all together, the complete stage class looks like this:

import typing

import mrc

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair


@register_stage("pass-thru")
class PassThruStage(SinglePortStage):
    """
    A Simple Pass Through Stage
    """

    @property
    def name(self) -> str:
        return "pass-thru"

    def accepted_types(self) -> typing.Tuple:
        return (typing.Any, )

    def supports_cpp_node(self) -> bool:
        return False

    def on_data(self, message: typing.Any):
        # Return the message for the next stage
        return message

    def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
        # Create a node that calls on_data for every incoming message
        node = builder.make_node(self.unique_name, self.on_data)
        # Connect the parent node to our new node
        builder.make_edge(input_stream[0], node)

        # Pass-through: our output type is the parent's output type
        return node, input_stream[1]
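
One way to sanity-check the three wiring steps without a running pipeline is a recording test double. In the sketch below, RecordingBuilder and the free function build_single are hypothetical names introduced for illustration. The stand-in only mimics the two builder calls used in _build_single and makes no claim about what the real mrc.Builder does internally.

```python
import typing


class RecordingBuilder:
    """Hypothetical test double that records calls instead of building a graph."""

    def __init__(self) -> None:
        self.nodes: typing.List[str] = []
        self.edges: typing.List[typing.Tuple[str, str]] = []

    def make_node(self, name: str, fn: typing.Callable) -> str:
        self.nodes.append(name)
        return name  # in this toy, the name doubles as the node handle

    def make_edge(self, source: str, sink: str) -> None:
        self.edges.append((source, sink))


def build_single(builder, input_stream, unique_name, on_data):
    """The same three steps as _build_single, written as a free function."""
    node = builder.make_node(unique_name, on_data)
    builder.make_edge(input_stream[0], node)
    return node, input_stream[1]


builder = RecordingBuilder()
parent_pair = ("from-file-0", dict)  # (parent node, parent output type)
out_pair = build_single(builder, parent_pair, "pass-thru-1", lambda message: message)
```

After the call, the builder has recorded one node and one edge from the parent to that node, and the returned pair carries the parent's type forward unchanged.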

To start testing our new pass through stage, we are going to construct a simple pipeline and add our new stage to it. This pipeline will do the minimum work necessary to verify our pass through stage. Data will flow through our simple pipeline as follows:

  1. A source stage will produce data and emit it into the pipeline.

  2. This data will be read and processed by our pass through stage, in this case simply forwarding on the data.

  3. A monitoring stage will record the messages from our pass through stage and terminate the pipeline.
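
The three steps above can be pictured as a chain of plain Python generators. This is only an analogy for the data flow, not Morpheus code; the function names are hypothetical.

```python
from typing import Iterable, Iterator, List


def source(lines: Iterable[str]) -> Iterator[str]:
    # Step 1: produce data and emit it into the pipeline.
    for line in lines:
        yield line


def pass_thru(messages: Iterable[str]) -> Iterator[str]:
    # Step 2: forward each message unchanged.
    for message in messages:
        yield message


def monitor(messages: Iterable[str], seen: List[str]) -> int:
    # Step 3: record every message and report how many arrived.
    for message in messages:
        seen.append(message)
    return len(seen)


seen: List[str] = []
count = monitor(pass_thru(source(["a", "b", "c"])), seen)
```

Because the middle step forwards messages untouched, the monitor observes exactly what the source emitted, which is what we expect to verify with the real pipeline.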

First we will need to import a few things from Morpheus for this example to work. Note that this test script, which we will name “run_passthru.py”, assumes that we saved the code for the PassThruStage in a file named “pass_thru.py” in the same directory.

import logging
import os

from pass_thru import PassThruStage

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

Before constructing the pipeline, we need to do a bit of environment configuration, starting with the Morpheus logger:

configure_logging(log_level=logging.DEBUG)

Next, we will build a Morpheus Config object. We will cover setting some common configuration parameters in the next guide. For now, it is important to know that we will always need to build a Config object:

config = Config()

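In this example, we will use the FileSourceStage class to read a large file in which each line is a JSON object that represents an email message. The stage will take these lines and package them as Morpheus message objects for our pass through stage to consume. Let’s set up our source stage. Here, pipeline is a LinearPipeline constructed from config, and input_file is the path to the data file; both are defined in the complete script later in this section: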

pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))
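
To make "each line is a JSON object" concrete, the following standard-library sketch parses such input line by line. It is not how FileSourceStage is implemented; read_json_lines is a hypothetical helper, and the sample fields (subject, to) are made up, since the real file's fields are not shown in this guide.

```python
import io
import json
from typing import Dict, Iterator, TextIO


def read_json_lines(handle: TextIO) -> Iterator[Dict]:
    """Yield one dict per non-empty line of a JSON-lines stream."""
    for line in handle:
        line = line.strip()
        if line:
            yield json.loads(line)


# Made-up sample data standing in for a .jsonlines file on disk.
sample = io.StringIO(
    '{"subject": "hi", "to": "a@example.com"}\n'
    '{"subject": "re: hi", "to": "b@example.com"}\n'
)
records = list(read_json_lines(sample))
```

Each parsed line becomes one record, which corresponds to the unit of data the source stage packages into messages.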

Next, we will add our new stage to the pipeline as well as a MonitorStage which will measure the throughput of our pass through stage:

pipeline.add_stage(PassThruStage(config))

pipeline.add_stage(MonitorStage(config))

Finally, we run the pipeline:

pipeline.run()

The output should look similar to the following (timestamps and throughput will vary):

====Registering Pipeline====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Building Pipeline====
Added source: <from-file-0; FileSourceStage(filename=examples/data/email_with_addresses.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True)>
  └─> morpheus.MessageMeta
Added stage: <pass-thru-1; PassThruStage(args=(), kwargs={})>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <monitor-2; MonitorStage(description=Progress, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Pipeline Complete!====
Starting! Time: 1648834587.3092508
====Pipeline Started====
Progress[Complete]: 25229messages [00:00, 57695.02messages/s]
====Pipeline Complete====

Putting it all together, the complete run_passthru.py script is shown below. Note that this code assumes the MORPHEUS_ROOT environment variable is set to the root of the Morpheus project repository:

import logging
import os

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage


def run_pipeline():
    # Enable the Morpheus logger
    configure_logging(log_level=logging.DEBUG)

    root_dir = os.environ['MORPHEUS_ROOT']
    input_file = os.path.join(root_dir, 'examples/data/email_with_addresses.jsonlines')

    config = Config()

    # Create a linear pipeline object
    pipeline = LinearPipeline(config)

    # Set source stage
    pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))

    # Add our own stage
    pipeline.add_stage(PassThruStage(config))

    # Add monitor to record the performance of our new stage
    pipeline.add_stage(MonitorStage(config))

    # Run the pipeline
    pipeline.run()


if __name__ == "__main__":
    run_pipeline()
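
If MORPHEUS_ROOT is not set, the script above fails with a bare KeyError. A slightly friendlier lookup might look like the sketch below; resolve_input_file is a hypothetical helper and not part of Morpheus.

```python
import os
from typing import Mapping


def resolve_input_file(env: Mapping[str, str] = os.environ,
                       relative_path: str = "examples/data/email_with_addresses.jsonlines") -> str:
    """Build the input path, raising a descriptive error if MORPHEUS_ROOT is unset."""
    root_dir = env.get("MORPHEUS_ROOT")
    if not root_dir:
        raise RuntimeError("MORPHEUS_ROOT is not set; point it at the root of the Morpheus repository")
    return os.path.join(root_dir, relative_path)


# Passing a mapping explicitly keeps the helper easy to test.
example_path = resolve_input_file({"MORPHEUS_ROOT": "/opt/morpheus"})
```

Accepting the environment as a parameter is a design choice made here for testability; calling resolve_input_file() with no arguments reads the real environment.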

Alternate Morpheus CLI example

The above example uses the Morpheus Python API. Alternatively, we could have constructed the same pipeline using the Morpheus command line tool. We will need to pass in the path to our stage via the --plugin argument so that it will be visible to the command line tool.

From the root of the Morpheus repo run:

morpheus --log_level=debug --plugin examples/developer_guide/1_simple_python_stage/pass_thru.py \
  run pipeline-other \
  from-file --filename=examples/data/email_with_addresses.jsonlines \
  pass-thru \
  monitor

© Copyright 2023, NVIDIA. Last updated on Apr 11, 2023.