- class Pipeline(c)
Bases: object
Class for building your pipeline. A pipeline for your use case can be constructed by first adding a Source via set_source, then any number of downstream Stage classes via add_stage. The order in which stages are added with add_stage determines the order in which stage executions are carried out. You can use stages included within Morpheus or your own custom-built stages.
- Parameters
- c : morpheus.config.Config
Pipeline configuration instance.
- Attributes
- is_built
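The ordering rule described above can be illustrated with a minimal sketch. ToyPipeline below is a hypothetical stand-in written only for this example; it is not the Morpheus Pipeline class and it ignores segments, configuration, and asynchronous execution. It assumes only what the description states: a source is set first, and stages run in the order they were added.

```python
from typing import Callable, Iterable, List


class ToyPipeline:
    """Hypothetical stand-in for illustration; not the Morpheus Pipeline class."""

    def __init__(self) -> None:
        self._source: Iterable[int] = ()
        self._stages: List[Callable[[int], int]] = []

    def set_source(self, source: Iterable[int]) -> None:
        self._source = source

    def add_stage(self, stage: Callable[[int], int]) -> None:
        # Stages are stored in insertion order, which becomes execution order.
        self._stages.append(stage)

    def run(self) -> List[int]:
        results = []
        for item in self._source:
            for stage in self._stages:
                item = stage(item)
            results.append(item)
        return results


def double(x: int) -> int:
    return x * 2


def increment(x: int) -> int:
    return x + 1


pipe = ToyPipeline()
pipe.set_source([1, 2, 3])
pipe.add_stage(double)
pipe.add_stage(increment)
doubled_then_incremented = pipe.run()

swapped = ToyPipeline()
swapped.set_source([1, 2, 3])
swapped.add_stage(increment)
swapped.add_stage(double)
incremented_then_doubled = swapped.run()
```

Swapping the two add_stage calls changes the result, which is the practical consequence of the ordering rule.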
Methods
add_edge(start, end[, segment_id]): Create an edge between two stages and add it to a segment in the pipeline.
add_segment_edge(egress_stage, ...): Create an edge between two segments in the pipeline.
add_stage(stage[, segment_id]): Add a stage to a segment in the pipeline.
build(): This function sequentially activates all the Morpheus pipeline stages passed by the users to execute a pipeline.
join(): Suspend execution of all currently running stages and the MRC pipeline.
run(): This function makes use of asyncio features to keep the pipeline running indefinitely.
run_async(): This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
stop(): Stops all running stages and the underlying MRC pipeline.
visualize([filename]): Output a pipeline diagram to filename.

- add_edge(start, end, segment_id='main')
Create an edge between two stages and add it to a segment in the pipeline.
- Parameters
- start : typing.Union[StreamWrapper, Sender]
The start of the edge or parent stage.
- end : typing.Union[Stage, Receiver]
The end of the edge or child stage.
- segment_id : str
ID indicating what segment the edge should be added to.
- add_segment_edge(egress_stage, egress_segment, ingress_stage, ingress_segment, port_pair)
Create an edge between two segments in the pipeline.
- Parameters
- egress_stage : Stage
The egress stage of the parent segment.
- egress_segment : str
Segment ID of the parent segment.
- ingress_stage : Stage
The ingress stage of the child segment.
- ingress_segment : str
Segment ID of the child segment.
- port_pair : typing.Union[str, typing.Tuple]
- add_stage(stage, segment_id='main')
Add a stage to a segment in the pipeline.
- Parameters
- stage : Stage
The stage object to add. It cannot be already added to another Pipeline object.
- segment_id : str
ID indicating what segment the stage should be added to.
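The two rules in the add_stage description (a default segment of 'main', and a stage belonging to at most one pipeline) can be sketched as follows. ToyStage and ToySegmentedPipeline are hypothetical names invented for this example. Raising ValueError and returning the stage are assumptions of the sketch, since this reference does not say how Morpheus reports the conflict or what add_stage returns.

```python
from collections import defaultdict
from typing import Dict, List


class ToyStage:
    """Hypothetical stage that remembers which pipeline owns it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.owner = None  # set once the stage is added to a pipeline


class ToySegmentedPipeline:
    """Hypothetical stand-in for illustration; not the Morpheus Pipeline class."""

    def __init__(self) -> None:
        self.segments: Dict[str, List[ToyStage]] = defaultdict(list)

    def add_stage(self, stage: ToyStage, segment_id: str = "main") -> ToyStage:
        # Assumption: a stage owned by another pipeline is rejected with ValueError.
        if stage.owner is not None and stage.owner is not self:
            raise ValueError(f"stage {stage.name!r} already belongs to another pipeline")
        stage.owner = self
        self.segments[segment_id].append(stage)
        return stage


first = ToySegmentedPipeline()
second = ToySegmentedPipeline()

parse = first.add_stage(ToyStage("parse"))               # lands in the default "main" segment
score = first.add_stage(ToyStage("score"), "inference")  # lands in a named segment

try:
    second.add_stage(parse)  # parse is already owned by `first`
    rejected = False
except ValueError:
    rejected = True
```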
- build()
This function sequentially activates all the Morpheus pipeline stages passed by the users to execute a pipeline. For the Source and all added Stage objects, StreamWrapper.build will be called sequentially to construct the pipeline. Once the pipeline has been constructed, this will start the pipeline by calling Source.start on the source object.
- async join()
Suspend execution of all currently running stages and the MRC pipeline. Typically called after stop.
- run()
This function makes use of asyncio features to keep the pipeline running indefinitely.
- async run_async()
This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
- stop()
Stops all running stages and the underlying MRC pipeline.
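The relationship between build, run, run_async, stop, and join can be sketched with plain asyncio. ToyAsyncPipeline is a hypothetical stand-in and does not reproduce Morpheus or MRC internals. It assumes only the documented sequence: run_async builds the pipeline and awaits completion, run drives that coroutine from synchronous code, and join is awaited after stop. The timed stop is there so the example terminates.

```python
import asyncio
from typing import List, Optional


class ToyAsyncPipeline:
    """Hypothetical stand-in for illustration; not the Morpheus implementation."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self._stopped: Optional[asyncio.Event] = None

    def build(self) -> None:
        # Created here so the Event belongs to the loop that is already running.
        self._stopped = asyncio.Event()
        self.events.append("build")

    def stop(self) -> None:
        self.events.append("stop")
        self._stopped.set()

    async def join(self) -> None:
        # Wait until stop() has been called, then record completion.
        await self._stopped.wait()
        self.events.append("join")

    async def run_async(self) -> None:
        self.build()
        # Schedule a stop shortly after start so this example finishes.
        asyncio.get_running_loop().call_later(0.01, self.stop)
        await self.join()

    def run(self) -> None:
        # Synchronous entry point that drives the coroutine to completion.
        asyncio.run(self.run_async())


pipeline = ToyAsyncPipeline()
pipeline.run()
events = pipeline.events
```

Code that already runs inside an event loop would await run_async directly instead of calling run, since asyncio.run cannot be nested in a running loop.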
- visualize(filename=None, **graph_kwargs)
Output a pipeline diagram to filename. The file format of the diagram is inferred from the extension of filename. If the directory path leading to filename does not exist, it will be created; if filename already exists, it will be overwritten. Requires the graphviz library.
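The filename handling described for visualize can be sketched with the standard library alone. prepare_diagram_path is a hypothetical helper written for this example; it is not part of Morpheus and does not render anything. It only mirrors the documented behavior: the format comes from the extension, missing parent directories are created, and an existing file is overwritten.

```python
import tempfile
from pathlib import Path
from typing import Tuple


def prepare_diagram_path(filename: str) -> Tuple[Path, str]:
    """Hypothetical helper: infer the format and make sure the parent directory exists."""
    path = Path(filename)
    fmt = path.suffix.lstrip(".").lower()  # "pipeline.PNG" -> "png"
    if not fmt:
        raise ValueError(f"cannot infer a diagram format from {filename!r}")
    # Create any missing directories leading to the file.
    path.parent.mkdir(parents=True, exist_ok=True)
    return path, fmt


with tempfile.TemporaryDirectory() as tmp:
    target = str(Path(tmp) / "diagrams" / "nested" / "pipeline.png")

    path, fmt = prepare_diagram_path(target)
    path.write_bytes(b"first render")

    # A second call to the same filename overwrites the earlier file.
    path, fmt = prepare_diagram_path(target)
    path.write_bytes(b"second render")

    final_contents = path.read_bytes()
    parent_created = path.parent.is_dir()
```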