morpheus.pipeline.linear_pipeline.LinearPipeline
- class LinearPipeline(c)[source]
Bases:
<a href="morpheus.pipeline.pipeline.Pipeline.html#morpheus.pipeline.pipeline.Pipeline">morpheus.pipeline.pipeline.Pipeline</a>
This class is used to build linear pipelines where we have a single output source stage followed by stages that are executed sequentially in the order they were added.
- Parameters
- c
<a href="morpheus.config.Config.html#morpheus.config.Config">morpheus.config.Config</a>
Pipeline configuration instance.
- c
- Attributes
- state
Methods
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.add_edge">add_edge</a>
(start, end[, segment_id])Create an edge between two stages and add it to a segment in the pipeline. <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.add_segment_boundary">add_segment_boundary</a>
([data_type, ...])- Parameters
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.add_segment_edge">add_segment_edge</a>
(egress_stage, ...)Create an edge between two segments in the pipeline. <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.add_stage">add_stage</a>
(stage)Add a stage to the pipeline. <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.build">build</a>
()This function sequentially activates all the Morpheus pipeline stages passed by the users to execute a pipeline. <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.join">join</a>
()Wait until pipeline completes upon which join methods of sources and stages will be called. <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.run">run</a>
()This function makes use of asyncio features to keep the pipeline running indefinitely. <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.run_async">run_async</a>
()This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete. <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.set_source">set_source</a>
(source)Set a pipeline's source stage to consume messages before it begins executing stages. <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.stop">stop</a>
()Stops all running stages and the underlying MRC pipeline. <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.visualize">visualize</a>
([filename])Output a pipeline diagram to filename
.build_and_start - add_edge(start, end, segment_id='main')[source]
Create an edge between two stages and add it to a segment in the pipeline. When
start
andend
are stages, they must have exactly one output and input port respectively.- Parameters
- starttyping.Union[StageBase, Sender]
The start of the edge or parent stage.
- endtyping.Union[Stage, Receiver]
The end of the edge or child stage.
- segment_idstr
ID indicating what segment the edge should be added to.
- add_segment_boundary(data_type=None, as_shared_pointer=False)[source]
- Parameters
- data_type
data_type
data type that will be passed across the segment boundary, defaults to a general python object if ‘data_type’ has no registered edge adapters.
- as_shared_pointer
boolean
Whether the data type will be wrapped in a shared pointer. Currently this is not implemented.
- data_type
Examples
>>> # Create a config and pipeline >>> config = Config() >>> pipe = LinearPipeline(config) >>> >>> # Add a source in Segment #1 >>> pipe.set_source(FileSourceStage(config, filename=val_file_name, iterative=False)) >>> >>> # Add a segment boundary >>> # [Current Segment] - [Egress Boundary] ---- [Ingress Boundary] - [Next Segment] >>> pipe.add_segment_boundary(MessageMeta) >>> >>> # Add a sink in Segment #2 >>> pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) >>> >>> pipe.run()
- add_segment_edge(egress_stage, egress_segment, ingress_stage, ingress_segment, port_pair)[source]
Create an edge between two segments in the pipeline.
- Parameters
- egress_stageStage
The egress stage of the parent segment
- egress_segmentstr
Segment ID of the parent segment
- ingress_stageStage
The ingress stage of the child segment
- ingress_segmentstr
Segment ID of the child segment
- port_pairtyping.Union[str, typing.Tuple]
- Either the ID of the egress segment, or a tuple with the following three elements:
str: ID of the egress segment
class: type being sent (typically
<a href="https://docs.python.org/3/library/functions.html#object">object</a>
)bool: If the type is a shared pointer (typically should be
<a href="https://docs.python.org/3/library/constants.html#False">False</a>
)
- add_stage(stage)[source]
Add a stage to the pipeline. All
Stage
classes added with this method will be executed sequentially inthe order they were added.- Parameters
- stage
Stage
The stage object to add. It cannot be already added to another
Pipeline
object.
- stage
- build()[source]
This function sequentially activates all the Morpheus pipeline stages passed by the users to execute a pipeline. For the
Source
and all addedStage
objects,StageBase.build
will be called sequentially to construct the pipeline.Once the pipeline has been constructed, this will start the pipeline by calling
Source.start
on the source object.
- async join()[source]
Wait until pipeline completes upon which join methods of sources and stages will be called.
- run()[source]
This function makes use of asyncio features to keep the pipeline running indefinitely.
- async run_async()[source]
This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
- set_source(source)[source]
Set a pipeline’s source stage to consume messages before it begins executing stages. This must be called once before calling
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.run">run</a>
or<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.run_async">run_async</a>
.- Parameters
- source
SourceStage
The source stage wraps the implementation in a stream that allows it to read from Kafka or a file.
- source
- stop()[source]
Stops all running stages and the underlying MRC pipeline.
- visualize(filename=None, **graph_kwargs)[source]
Output a pipeline diagram to
filename
. The file format of the diagrame is inferred by the extension offilename
. If the directory path leading tofilename
does not exist it will be created, iffilename
already exists it will be overwritten. Requires the graphviz library.