Simple Python Stage
Morpheus makes use of the MRC graph-execution framework. Morpheus pipelines are built on top of MRC pipelines, which are composed of collections of nodes and edges called segments (think sub-graphs); segments can in turn be connected by ingress/egress ports. In many common cases, an MRC pipeline will consist of only a single segment. Morpheus stages interact with the MRC segment to define, build, and add nodes to the MRC graph; the stages themselves can be thought of as packaged units of work to be applied to data flowing through the pipeline. The work units comprising an individual Morpheus stage may consist of a single MRC node, a small collection of nodes, or an entire MRC subgraph.
To start, we will implement a single stage that could be included in a pipeline. For illustration, this stage will do nothing but take the input from the previous stage and forward it to the next stage. All Morpheus stages have several things in common, so while this doesn’t do too much, it ends up being a good starting point for writing a new stage. From there, we can add our functionality as needed.
Defining this stage requires us to specify the stage type. Morpheus stages that have a single input and a single output typically inherit from SinglePortStage. Stages that act as sources of data, in that they do not take an input from a prior stage but rather produce data from a source such as a file, a Kafka service, or another external source, will need to inherit from the SingleOutputSource base class.
Optionally, stages can be registered as a command with the Morpheus CLI using the register_stage decorator. This allows pipelines to be constructed from both pre-built stages and custom user stages via the command line. Any constructor arguments will be introspected using numpydoc and exposed as command line flags. Similarly, the class’s docstring will be exposed in the help string of the stage on the command line.
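To make the introspection idea concrete, here is a minimal sketch of how constructor parameters could be turned into command line flags using only the standard library. It is not how Morpheus implements register_stage; the class ExampleStage and the helper flags_from_constructor are hypothetical names invented for this illustration.

```python
import inspect


class ExampleStage:
    """Hypothetical stage used only for this illustration."""

    def __init__(self, config, threshold: float = 0.5, column: str = "data"):
        self.config = config
        self.threshold = threshold
        self.column = column


def flags_from_constructor(cls, skip=("self", "config")):
    """Map each constructor parameter that has a default to a (flag, default) entry."""
    flags = {}
    for name, param in inspect.signature(cls.__init__).parameters.items():
        # Skip the instance, the shared config, and parameters without defaults
        if name in skip or param.default is inspect.Parameter.empty:
            continue
        flags["--" + name] = param.default
    return flags


print(flags_from_constructor(ExampleStage))
```

A real CLI would also need type conversion and help text; the sketch only shows that a constructor signature carries enough information to derive flag names and defaults.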
We start our class definition with a few basic imports:
import typing
import mrc
from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair
@register_stage("pass-thru")
class PassThruStage(SinglePortStage):
There are four methods that need to be defined in our new subclass to implement the stage interface: name, accepted_types, supports_cpp_node, and _build_single. In practice, it is often necessary to define at least one more method which will perform the actual work of the stage; by convention, this method is typically named on_data, which we will define in our examples.
name is a property method; it should return a user-friendly name for the stage. Currently, this property is only used for debugging purposes, and there are no requirements on the content or format of the name. However, by convention, the string returned by this method should be the same as the string passed to the register_stage decorator.
@property
def name(self) -> str:
    return "pass-thru"
The accepted_types method returns a tuple of message classes that this stage is able to accept as input. Morpheus uses this to validate that the parent of this stage emits a message that this stage can accept. Since our stage is a pass through, we will declare that we can accept any incoming message type. Note that production stages will often declare only a single Morpheus message class such as MessageMeta or MultiMessage (refer to the message classes defined in morpheus.pipeline.messages for a complete list).
def accepted_types(self) -> typing.Tuple:
    return (typing.Any,)
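The validation described above can be pictured with a small standalone sketch. This is an illustration of the idea only, not the check Morpheus performs internally; is_accepted and HypotheticalMessage are made-up names, and treating typing.Any as a wildcard is an assumption of the sketch.

```python
import typing


def is_accepted(parent_type: type, accepted: typing.Tuple) -> bool:
    """Return True if the parent's output type matches any accepted type."""
    for candidate in accepted:
        # typing.Any is treated as a wildcard that matches every type
        if candidate is typing.Any:
            return True
        # Otherwise require the parent type to be the candidate or a subclass of it
        if issubclass(parent_type, candidate):
            return True
    return False


class HypotheticalMessage:
    """Stand-in for a message class; not a Morpheus type."""


print(is_accepted(HypotheticalMessage, (typing.Any,)))
print(is_accepted(HypotheticalMessage, (int,)))
```

A stage that declares a single concrete message class would therefore reject parents that emit unrelated types, while a pass through stage that declares typing.Any accepts all of them.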
The supports_cpp_node method returns a boolean indicating if the stage has a C++ implementation. Since our example only contains a Python implementation, we will return False here.
def supports_cpp_node(self) -> bool:
    return False
Our on_data method accepts an incoming message and returns a message. The returned message can be the same message instance that we received as our input or it could be a new message instance. The method is named on_data by convention; however, it is not part of the API. In the next section, we will register it as a callback in Morpheus.
def on_data(self, message: typing.Any):
    # Return the message for the next stage
    return message
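The difference between returning the same instance and returning a new one can be seen in a short standalone sketch. The dictionary below is a hypothetical stand-in for a message, and both function names are invented for the example.

```python
import copy


def on_data_same(message):
    # Forward the very same object that was received
    return message


def on_data_new(message):
    # Build and return a new object, leaving the input untouched
    updated = copy.deepcopy(message)
    updated["seen"] = True
    return updated


incoming = {"subject": "hello"}  # hypothetical stand-in for a message

same = on_data_same(incoming)
new = on_data_new(incoming)

print(same is incoming)
print(new is incoming, "seen" in incoming)
```

Returning the same instance avoids a copy, while returning a new instance keeps the input unmodified for anything else that still holds a reference to it.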
Finally, the _build_single method will be used at stage build time to construct our node and wire it into the pipeline. _build_single receives an instance of an MRC segment builder (mrc.Builder) along with a StreamPair instance, which is a tuple consisting of our parent node and its output type. We will use the builder instance to construct a node from our stage and connect it to the Morpheus pipeline. The return type of _build_single is also a StreamPair, which will consist of our node along with its data type.
def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
    node = builder.make_node(self.unique_name, self.on_data)
    builder.make_edge(input_stream[0], node)

    return node, input_stream[1]
For our purposes, a Morpheus stage defines the input data type the stage will accept, the unit of work to be performed on that data, and the output data type. In contrast, the node or nodes that make up a stage’s unit of work are wired into the underlying MRC execution graph. To build the node, we will call the make_node method of the builder instance, passing it our unique_name property and our on_data method. The unique_name property takes the name property we already defined and appends a unique id to it.
node = builder.make_node(self.unique_name, self.on_data)
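As a rough picture of what a name plus a unique id might look like, the sketch below appends a per-instance counter to a base name. NamedStage is a hypothetical class, not the Morpheus implementation, and the name-id format is an assumption inferred from stage names such as pass-thru-1 that appear in the pipeline log later in this guide.

```python
import itertools


class NamedStage:
    """Hypothetical base class; not the Morpheus implementation."""

    _ids = itertools.count()  # counter shared by every instance

    def __init__(self, name: str):
        self._name = name
        self._id = next(NamedStage._ids)

    @property
    def unique_name(self) -> str:
        # Combine the user-friendly name with the per-instance id
        return f"{self._name}-{self._id}"


stages = [NamedStage("from-file"), NamedStage("pass-thru"), NamedStage("monitor")]
print([stage.unique_name for stage in stages])
```

Because the id differs for every instance, two stages of the same class added to one pipeline would still produce distinct node names.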
Next, we will define an edge connecting our new node to our parent node:
builder.make_edge(input_stream[0], node)
Finally, we will return a new tuple of our node and output type. Since this is a pass through node that can accept any input type, we will return our parent’s type.
return node, input_stream[1]
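The wiring performed by make_node and make_edge can be mimicked with a toy model to show how messages travel from a parent node to a child node. ToyNode and ToyBuilder are hypothetical stand-ins written for this sketch; they only borrow the two method names and do not reflect how mrc.Builder works internally.

```python
class ToyNode:
    """Hypothetical node: applies a callback and forwards the result downstream."""

    def __init__(self, name, fn):
        self.name = name
        self.fn = fn
        self.children = []

    def push(self, message):
        result = self.fn(message)
        for child in self.children:
            child.push(result)


class ToyBuilder:
    """Hypothetical stand-in mirroring the two calls used in _build_single."""

    def make_node(self, name, fn):
        return ToyNode(name, fn)

    def make_edge(self, parent, child):
        parent.children.append(child)


received = []
builder = ToyBuilder()
source = builder.make_node("source-0", lambda m: m)
passthru = builder.make_node("pass-thru-1", lambda m: m)
sink = builder.make_node("sink-2", received.append)

# Wire source -> pass-thru -> sink, one edge per parent/child pair
builder.make_edge(source, passthru)
builder.make_edge(passthru, sink)

for msg in ["a", "b", "c"]:
    source.push(msg)

print(received)
```

In this toy model the pass through node's callback plays the role of on_data: it receives whatever its parent emits and hands the return value to whatever is connected downstream.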
Putting this all together, our new class looks like this:
import typing

import mrc

from morpheus.cli.register_stage import register_stage
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stream_pair import StreamPair


@register_stage("pass-thru")
class PassThruStage(SinglePortStage):
    """
    A Simple Pass Through Stage
    """

    @property
    def name(self) -> str:
        return "pass-thru"

    def accepted_types(self) -> typing.Tuple:
        return (typing.Any, )

    def supports_cpp_node(self) -> bool:
        return False

    def on_data(self, message: typing.Any):
        # Return the message for the next stage
        return message

    def _build_single(self, builder: mrc.Builder, input_stream: StreamPair) -> StreamPair:
        node = builder.make_node(self.unique_name, self.on_data)
        builder.make_edge(input_stream[0], node)

        return node, input_stream[1]
To start testing our new pass through stage, we are going to construct a simple pipeline and add our new stage to it. This pipeline will do the minimum work necessary to verify our pass through stage. Data will flow through our simple pipeline as follows:
A source stage will produce data and emit it into the pipeline.
This data will be read and processed by our pass through stage, in this case simply forwarding on the data.
A monitoring stage will record the messages from our pass through stage and terminate the pipeline.
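The three steps above can be sketched with plain Python generators to show the shape of the data flow. This is a conceptual analogy only; the function names are hypothetical and none of this uses the Morpheus API.

```python
def source(records):
    # Emit each record into the pipeline
    for record in records:
        yield record


def pass_thru(messages):
    # Forward every message unchanged
    for message in messages:
        yield message


def monitor(messages):
    # Count messages as they go by, then report the total
    count = 0
    for _ in messages:
        count += 1
    return count


total = monitor(pass_thru(source(["msg-1", "msg-2", "msg-3"])))
print(total)
```

The final stage drives the chain: once it has consumed everything the source emitted, there is nothing left to process and the run ends.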
First we will need to import a few things from Morpheus for this example to work. Note that this test script, which we will name “run_passthru.py”, assumes that we saved the code for the PassThruStage in a file named “pass_thru.py” in the same directory.
import logging
import os
from pass_thru import PassThruStage
from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging
Before constructing the pipeline, we need to do a bit of environment configuration, starting with the Morpheus logger:
configure_logging(log_level=logging.DEBUG)
Next, we will build a Morpheus Config object. We will cover setting some common configuration parameters in the next guide. For now, it is important to know that we will always need to build a Config object:
config = Config()
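In this example, we will use the FileSourceStage class to read a large file in which each line is a JSON object that represents an email message. The stage will take these lines and package them as Morpheus message objects for our pass through stage to consume. Let’s set up our source stage, where pipeline is a LinearPipeline instance built from config and input_file is the path to the input data (both are defined in the complete script that follows the sample output):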
pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))
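For readers unfamiliar with the input format, the sketch below parses a file-like object that holds one JSON object per line, using only the standard library. It is not how FileSourceStage is implemented, and the field names in the sample are hypothetical rather than taken from the example dataset.

```python
import io
import json


def read_json_lines(stream):
    """Parse one JSON object per non-empty line."""
    records = []
    for line in stream:
        line = line.strip()
        if line:
            records.append(json.loads(line))
    return records


# In-memory sample with hypothetical field names
sample = io.StringIO(
    '{"to": "a@example.com", "body": "hi"}\n'
    '{"to": "b@example.com", "body": "bye"}\n'
)
records = read_json_lines(sample)
print(len(records), records[0]["to"])
```

Each parsed line becomes one record, which is the granularity at which the source stage hands data to the rest of the pipeline.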
Next, we will add our new stage to the pipeline as well as a MonitorStage which will measure the throughput of our pass through stage:
pipeline.add_stage(PassThruStage(config))
pipeline.add_stage(MonitorStage(config))
Finally, we run the pipeline:
pipeline.run()
The output should display:
====Registering Pipeline====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Building Pipeline====
Added source: <from-file-0; FileSourceStage(filename=examples/data/email_with_addresses.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True)>
└─> morpheus.MessageMeta
Added stage: <pass-thru-1; PassThruStage(args=(), kwargs={})>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <monitor-2; MonitorStage(description=Progress, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Pipeline Complete!====
Starting! Time: 1648834587.3092508
====Pipeline Started====
Progress[Complete]: 25229messages [00:00, 57695.02messages/s]
====Pipeline Complete====
The complete run_passthru.py script follows. Note that this code assumes the MORPHEUS_ROOT environment variable is set to the root of the Morpheus project repository:
import logging
import os

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.utils.logger import configure_logging

from pass_thru import PassThruStage


def run_pipeline():
    # Enable the Morpheus logger
    configure_logging(log_level=logging.DEBUG)

    root_dir = os.environ['MORPHEUS_ROOT']
    input_file = os.path.join(root_dir, 'examples/data/email_with_addresses.jsonlines')

    config = Config()

    # Create a linear pipeline object
    pipeline = LinearPipeline(config)

    # Set source stage
    pipeline.set_source(FileSourceStage(config, filename=input_file, iterative=False))

    # Add our own stage
    pipeline.add_stage(PassThruStage(config))

    # Add monitor to record the performance of our new stage
    pipeline.add_stage(MonitorStage(config))

    # Run the pipeline
    pipeline.run()


if __name__ == "__main__":
    run_pipeline()
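If MORPHEUS_ROOT is not set, the lookup os.environ['MORPHEUS_ROOT'] raises a bare KeyError. One possible way to fail with a clearer message is sketched below; resolve_input_file is a hypothetical helper written for this guide, not part of Morpheus, and the env parameter exists only so the function can be exercised without touching the real environment.

```python
import os


def resolve_input_file(env=None, relative_path="examples/data/email_with_addresses.jsonlines"):
    """Build the input path from MORPHEUS_ROOT, failing with a clear message if unset."""
    env = os.environ if env is None else env
    root_dir = env.get("MORPHEUS_ROOT")
    if not root_dir:
        raise RuntimeError("MORPHEUS_ROOT must point at the root of the Morpheus repository")
    return os.path.join(root_dir, relative_path)


# Example call with an explicit mapping instead of the real environment
print(resolve_input_file({"MORPHEUS_ROOT": "/opt/morpheus"}))
```

Calling resolve_input_file() with no arguments reads the real environment, so it could replace the two lines that compute root_dir and input_file in the script above.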
Alternate Morpheus CLI example
The above example makes use of the Morpheus Python API. Alternatively, we could have constructed the same pipeline using the Morpheus command line tool. We will need to pass in the path to our stage via the --plugin argument so that it will be visible to the command line tool.
From the root of the Morpheus repo run:
morpheus --log_level=debug --plugin examples/developer_guide/1_simple_python_stage/pass_thru.py \
run pipeline-other \
from-file --filename=examples/data/email_with_addresses.jsonlines \
pass-thru \
monitor