morpheus.pipeline.pipeline.Pipeline
- class Pipeline(c)
Bases: object
Class for building your pipeline. A pipeline for your use case can be constructed by first adding a Source via set_source, then any number of downstream Stage classes via add_stage. The order in which stages are added with add_stage determines the order in which stage executions are carried out. You can use stages included within Morpheus or your own custom-built stages.
- Parameters
- c
morpheus.config.Config
Pipeline configuration instance.
- Attributes
- is_built
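The ordering rule in the class description can be illustrated with a small stand-in model. This is a hedged sketch, not Morpheus code: ToyPipeline, its callables, and the sample data are all hypothetical and only mimic the set_source then add_stage pattern, so the example runs without Morpheus installed.

```python
from typing import Callable, Iterable, List, Optional


class ToyPipeline:
    """Hypothetical stand-in that mimics the set_source/add_stage pattern."""

    def __init__(self) -> None:
        self._source: Optional[Callable[[], Iterable[int]]] = None
        self._stages: List[Callable[[int], int]] = []

    def set_source(self, source: Callable[[], Iterable[int]]) -> None:
        self._source = source

    def add_stage(self, stage: Callable[[int], int]) -> None:
        # Stages are stored, and later executed, in insertion order.
        self._stages.append(stage)

    def run(self) -> List[int]:
        if self._source is None:
            raise RuntimeError("set_source must be called before run")
        results = []
        for item in self._source():
            for stage in self._stages:
                item = stage(item)
            results.append(item)
        return results


def add_one(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    return x * 2


def build_and_run(stages: List[Callable[[int], int]]) -> List[int]:
    pipe = ToyPipeline()
    pipe.set_source(lambda: [1, 2, 3])
    for stage in stages:
        pipe.add_stage(stage)
    return pipe.run()


print(build_and_run([add_one, double]))  # [4, 6, 8]
print(build_and_run([double, add_one]))  # [3, 5, 7]
```

Swapping the order of the two add_stage calls changes the result, which is the behavior the description attributes to add_stage.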
Methods
add_edge(start, end[, segment_id])
Create an edge between two stages and add it to a segment in the pipeline.
add_segment_edge(egress_stage, ...)
Create an edge between two segments in the pipeline.
add_stage(stage[, segment_id])
Add a stage to a segment in the pipeline.
build()
This function sequentially activates all the Morpheus pipeline stages passed by the user to execute a pipeline.
join()
Suspend execution of all currently running stages and the MRC pipeline.
run()
This function makes use of asyncio features to keep the pipeline running indefinitely.
run_async()
This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
stop()
Stops all running stages and the underlying MRC pipeline.
visualize([filename])
Output a pipeline diagram to filename.
- add_edge(start, end, segment_id='main')
Create an edge between two stages and add it to a segment in the pipeline.
- Parameters
- start
typing.Union[StreamWrapper, Sender]
The start of the edge or parent stage.
- end
typing.Union[Stage, Receiver]
The end of the edge or child stage.
- segment_id
str
ID indicating what segment the edge should be added to.
- add_segment_edge(egress_stage, egress_segment, ingress_stage, ingress_segment, port_pair)
Create an edge between two segments in the pipeline.
- Parameters
- egress_stage
Stage
The egress stage of the parent segment.
- egress_segment
str
Segment ID of the parent segment.
- ingress_stage
Stage
The ingress stage of the child segment.
- ingress_segment
str
Segment ID of the child segment.
- port_pair
typing.Union[str, typing.Tuple]
- add_stage(stage, segment_id='main')
Add a stage to a segment in the pipeline.
- Parameters
- stage
Stage
The stage object to add. It cannot be already added to another Pipeline object.
- segment_id
str
ID indicating what segment the stage should be added to.
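How add_stage, add_edge, and add_segment_edge relate to segments can be sketched with a toy bookkeeping class. Everything here is an assumption made for illustration: ToySegmentGraph, the stage and segment names, the "port_a" value, and the membership checks are hypothetical and do not reflect how Morpheus stores or validates its graph.

```python
from collections import defaultdict
from typing import DefaultDict, List, Tuple, Union

PortPair = Union[str, tuple]


class ToySegmentGraph:
    """Hypothetical bookkeeping for stages, edges, and segment links."""

    def __init__(self) -> None:
        self.stages: DefaultDict[str, List[str]] = defaultdict(list)
        self.edges: DefaultDict[str, List[Tuple[str, str]]] = defaultdict(list)
        self.segment_edges: List[Tuple[str, str, str, str, PortPair]] = []

    def _require(self, stage: str, segment_id: str) -> None:
        # Assumed rule for this sketch: a stage must be added before it is wired.
        if stage not in self.stages[segment_id]:
            raise ValueError(f"{stage!r} is not in segment {segment_id!r}")

    def add_stage(self, stage: str, segment_id: str = "main") -> None:
        self.stages[segment_id].append(stage)

    def add_edge(self, start: str, end: str, segment_id: str = "main") -> None:
        # An edge connects two stages inside the same segment.
        self._require(start, segment_id)
        self._require(end, segment_id)
        self.edges[segment_id].append((start, end))

    def add_segment_edge(self, egress_stage: str, egress_segment: str,
                         ingress_stage: str, ingress_segment: str,
                         port_pair: PortPair) -> None:
        # A segment edge connects a stage in one segment to a stage in another.
        self._require(egress_stage, egress_segment)
        self._require(ingress_stage, ingress_segment)
        self.segment_edges.append(
            (egress_stage, egress_segment, ingress_stage, ingress_segment, port_pair)
        )


graph = ToySegmentGraph()
graph.add_stage("source", "seg_1")
graph.add_stage("parse", "seg_1")
graph.add_stage("score", "seg_2")
graph.add_stage("sink", "seg_2")

graph.add_edge("source", "parse", "seg_1")
graph.add_edge("score", "sink", "seg_2")
graph.add_segment_edge("parse", "seg_1", "score", "seg_2", "port_a")
```

In this model the parent segment hands data from its egress stage to the ingress stage of the child segment, matching the parameter descriptions of add_segment_edge.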
- build()
This function sequentially activates all the Morpheus pipeline stages passed by the user to execute a pipeline. For the Source and all added Stage objects, StreamWrapper.build will be called sequentially to construct the pipeline. Once the pipeline has been constructed, this will start the pipeline by calling Source.start on the source object.
- async join()
Suspend execution of all currently running stages and the MRC pipeline. Typically called after stop.
- run()
This function makes use of asyncio features to keep the pipeline running indefinitely.
- async run_async()
This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
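The relationship between run, run_async, build, join, and stop can be sketched with plain asyncio. This is a hypothetical stand-in, not the Morpheus implementation: ToyAsyncPipeline, its events list, and the stop_after timer are assumptions, and the timer exists only so that the example terminates.

```python
import asyncio
from typing import List, Optional


class ToyAsyncPipeline:
    """Hypothetical stand-in for a blocking run() wrapped around run_async()."""

    def __init__(self, stop_after: float = 0.01) -> None:
        self.events: List[str] = []
        self._stop_after = stop_after
        self._done: Optional[asyncio.Event] = None

    def build(self) -> None:
        self.events.append("build")

    def stop(self) -> None:
        # Signal completion; join() returns once this has happened.
        self.events.append("stop")
        if self._done is not None:
            self._done.set()

    async def join(self) -> None:
        if self._done is None:
            raise RuntimeError("join called before run_async")
        await self._done.wait()
        self.events.append("join")

    async def run_async(self) -> None:
        # Create the event inside the running loop, build, then wait.
        self._done = asyncio.Event()
        self.build()
        # Scaffolding for this sketch only: request a stop shortly after start.
        asyncio.get_running_loop().call_later(self._stop_after, self.stop)
        await self.join()

    def run(self) -> None:
        # Blocking entry point: start an event loop and drive run_async().
        asyncio.run(self.run_async())


pipe = ToyAsyncPipeline()
pipe.run()
print(pipe.events)  # ['build', 'stop', 'join']
```

Without the timer, run would block until something else called stop, which corresponds to the "running indefinitely" wording above.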
- stop()
Stops all running stages and the underlying MRC pipeline.
- visualize(filename=None, **graph_kwargs)
Output a pipeline diagram to filename. The file format of the diagram is inferred from the extension of filename. If the directory path leading to filename does not exist, it will be created; if filename already exists, it will be overwritten. Requires the graphviz library.
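The filename handling described for visualize (format taken from the extension, missing directories created, existing file overwritten) can be sketched with the standard library alone. The helper prepare_diagram_path and the paths below are hypothetical, and no graphviz call is made; the placeholder text written here stands in for a rendered diagram.

```python
import tempfile
from pathlib import Path
from typing import Tuple


def prepare_diagram_path(filename: str) -> Tuple[str, Path]:
    """Hypothetical helper: infer the format and ensure the directory exists."""
    path = Path(filename)
    file_format = path.suffix.lstrip(".").lower()
    if not file_format:
        raise ValueError(f"cannot infer a file format from {filename!r}")
    # Create any missing parent directories; do nothing if they already exist.
    path.parent.mkdir(parents=True, exist_ok=True)
    return file_format, path


with tempfile.TemporaryDirectory() as tmp:
    target = str(Path(tmp) / "diagrams" / "nested" / "pipeline.png")
    fmt, out_path = prepare_diagram_path(target)
    parent_created = out_path.parent.is_dir()

    out_path.write_text("first render")   # placeholder content, not an image
    out_path.write_text("second render")  # writing again overwrites the file
    final_content = out_path.read_text()

print(fmt, parent_created, final_content)
```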