morpheus.pipeline.pipeline.Pipeline
- class Pipeline(config)[source]
Bases:
object
Class for building your pipeline. A pipeline for your use case can be constructed by first adding a
Source
viaset_source
then any number of downstreamStage
classes viaadd_stage
. The order stages are added withadd_stage
determines the order in which stage executions are carried out. You can use stages included within Morpheus or your own custom-built stages.- Parameters
- config
morpheus.config.Config
Pipeline configuration instance.
- config
- Attributes
- state
Methods
add_edge
(start, end[, segment_id])Create an edge between two stages and add it to a segment in the pipeline. add_segment_edge
(egress_stage, ...)Create an edge between two segments in the pipeline. add_stage
(stage[, segment_id])Add a stage to a segment in the pipeline. build
()This function sequentially activates all the Morpheus pipeline stages passed by the users to execute a pipeline. join
()Wait until pipeline completes upon which join methods of sources and stages will be called. run
()This function makes use of asyncio features to keep the pipeline running indefinitely. run_async
()This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete. stop
()Stops all running stages and the underlying MRC pipeline. visualize
([filename])Output a pipeline diagram to filename
.build_and_start - add_edge(start, end, segment_id='main')[source]
Create an edge between two stages and add it to a segment in the pipeline. When
start
andend
are stages, they must have exactly one output and input port respectively.- Parameters
- starttyping.Union[StageBase, Sender]
The start of the edge or parent stage.
- endtyping.Union[Stage, Receiver]
The end of the edge or child stage.
- segment_idstr
ID indicating what segment the edge should be added to.
- add_segment_edge(egress_stage, egress_segment, ingress_stage, ingress_segment, port_pair)[source]
Create an edge between two segments in the pipeline.
- Parameters
- egress_stageStage
The egress stage of the parent segment
- egress_segmentstr
Segment ID of the parent segment
- ingress_stageStage
The ingress stage of the child segment
- ingress_segmentstr
Segment ID of the child segment
- port_pairtyping.Union[str, typing.Tuple]
- add_stage(stage, segment_id='main')[source]
Add a stage to a segment in the pipeline.
- Parameters
- stageStage
The stage object to add. It cannot be already added to another
Pipeline
object.- segment_idstr
ID indicating what segment the stage should be added to.
- build()[source]
This function sequentially activates all the Morpheus pipeline stages passed by the users to execute a pipeline. For the
Source
and all addedStage
objects,StageBase.build
will be called sequentially to construct the pipeline.Once the pipeline has been constructed, this will start the pipeline by calling
Source.start
on the source object.
- async join()[source]
Wait until pipeline completes upon which join methods of sources and stages will be called.
- run()[source]
This function makes use of asyncio features to keep the pipeline running indefinitely.
- async run_async()[source]
This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
- stop()[source]
Stops all running stages and the underlying MRC pipeline.
- visualize(filename=None, **graph_kwargs)[source]
Output a pipeline diagram to
filename
. The file format of the diagrame is inferred by the extension offilename
. If the directory path leading tofilename
does not exist it will be created, iffilename
already exists it will be overwritten. Requires the graphviz library.