morpheus.pipeline.linear_pipeline.LinearPipeline
- class LinearPipeline(c)[source]
Bases:
morpheus.pipeline.pipeline.Pipeline
This class is used to build linear pipelines where we have a single output source stage followed by stages that are executed sequentially in the order they were added.
- Parameters
- c
morpheus.config.Config
Pipeline configuration instance.
- Attributes
- is_built
Methods
add_edge
(start, end[, segment_id])Create an edge between two stages and add it to a segment in the pipeline.
add_segment_boundary
([data_type, ...])
add_segment_edge
(egress_stage, ...)Create an edge between two segments in the pipeline.
add_stage
(stage)Add a stage to the pipeline.
build
()This function sequentially activates all the Morpheus pipeline stages passed by the users to execute a pipeline.
join
()Suspend execution of all currently running stages and the MRC pipeline.
run
()This function makes use of asyncio features to keep the pipeline running indefinitely.
run_async
()This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
set_source
(source)Set a pipeline's source stage to consume messages before it begins executing stages.
stop
()Stops all running stages and the underlying MRC pipeline.
visualize
([filename])Output a pipeline diagram to filename.
- add_edge(start, end, segment_id='main')[source]
Create an edge between two stages and add it to a segment in the pipeline.
- Parameters
- start
typing.Union[StreamWrapper, Sender]
The start of the edge or parent stage.
- end
typing.Union[Stage, Receiver]
The end of the edge or child stage.
- segment_id
str
ID indicating what segment the edge should be added to.
- add_segment_boundary(data_type=None, as_shared_pointer=False)[source]
- Parameters
- data_type
data_type
Data type that will be passed across the segment boundary. Defaults to a general Python object if data_type has no registered edge adapters.
- as_shared_pointer
boolean
Whether the data type will be wrapped in a shared pointer.
Examples
>>> # Create a config and pipeline
>>> config = Config()
>>> pipe = LinearPipeline(config)
>>>
>>> # Add a source in Segment #1
>>> pipe.set_source(FileSourceStage(config, filename=val_file_name, iterative=False))
>>>
>>> # Add a segment boundary
>>> # [Current Segment] - [Egress Boundary] ---- [Ingress Boundary] - [Next Segment]
>>> pipe.add_segment_boundary(MessageMeta)
>>>
>>> # Add a sink in Segment #2
>>> pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))
>>>
>>> pipe.run()
- add_segment_edge(egress_stage, egress_segment, ingress_stage, ingress_segment, port_pair)[source]
Create an edge between two segments in the pipeline.
- Parameters
- egress_stage
Stage
The egress stage of the parent segment.
- egress_segment
str
Segment ID of the parent segment.
- ingress_stage
Stage
The ingress stage of the child segment.
- ingress_segment
str
Segment ID of the child segment.
- port_pair
typing.Union[str, typing.Tuple]
- add_stage(stage)[source]
Add a stage to the pipeline. All Stage classes added with this method will be executed sequentially in the order they were added.
- Parameters
- stage
Stage
The stage object to add. It cannot already belong to another Pipeline object.
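The ordering rule described for set_source and add_stage can be illustrated with a small stand-alone model. This is only a sketch: ToyStage and ToyLinearPipeline are hypothetical names invented here, they do not import or reproduce Morpheus, and the ValueError raised for a stage that already belongs to another pipeline is an assumption about how such a rule might be enforced.

```python
class ToyStage:
    """Hypothetical stand-in for a stage: applies one function to each message."""

    def __init__(self, name, fn):
        self.name = name
        self.fn = fn
        self.owner = None  # the toy pipeline this stage was added to, if any


class ToyLinearPipeline:
    """Hypothetical model of a linear pipeline: one source, then stages in order."""

    def __init__(self):
        self._source = None
        self._stages = []

    def set_source(self, source):
        # The source is modelled as any iterable of messages.
        self._source = list(source)

    def add_stage(self, stage):
        # Assumption: reusing a stage across pipelines is rejected with ValueError.
        if stage.owner is not None:
            raise ValueError(f"stage {stage.name!r} already belongs to a pipeline")
        stage.owner = self
        self._stages.append(stage)

    def run(self):
        if self._source is None:
            raise RuntimeError("set_source must be called before run")
        results = []
        for message in self._source:
            # Stages run sequentially, in the order they were added.
            for stage in self._stages:
                message = stage.fn(message)
            results.append(message)
        return results


def demo():
    pipe = ToyLinearPipeline()
    pipe.set_source([1, 2, 3])
    pipe.add_stage(ToyStage("double", lambda x: x * 2))
    pipe.add_stage(ToyStage("increment", lambda x: x + 1))
    return pipe.run()  # [3, 5, 7]: each message is doubled, then incremented
```

Swapping the two add_stage calls would give [4, 6, 8] instead, which is the practical meaning of "in the order they were added".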
- build()[source]
This function sequentially activates all the Morpheus pipeline stages passed by the users to execute a pipeline. For the Source and all added Stage objects, StreamWrapper.build will be called sequentially to construct the pipeline.
Once the pipeline has been constructed, this will start the pipeline by calling Source.start on the source object.
- async join()[source]
Suspend execution of all currently running stages and the MRC pipeline. Typically called after stop.
- run()[source]
This function makes use of asyncio features to keep the pipeline running indefinitely.
- async run_async()[source]
This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
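The relationship between run, run_async, build and join can be pictured with a generic asyncio pattern. The sketch below is not the Morpheus implementation: ToyAsyncPipeline is a hypothetical class, and having the blocking run delegate to run_async through asyncio.run is an assumption made purely for illustration.

```python
import asyncio


class ToyAsyncPipeline:
    """Hypothetical model of a blocking run() wrapped around an async run_async()."""

    def __init__(self):
        self.events = []  # records the order in which the steps happen

    def build(self):
        self.events.append("build")

    async def join(self):
        # Stand-in for waiting until all running stages have finished.
        await asyncio.sleep(0)
        self.events.append("join")

    async def run_async(self):
        # Build the pipeline, start it, then await its completion.
        self.build()
        self.events.append("start")
        await self.join()

    def run(self):
        # Blocking entry point: creates an event loop and runs until completion.
        asyncio.run(self.run_async())
```

In this model, callers that already run inside an event loop would await run_async directly, while synchronous scripts would call run.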
- set_source(source)[source]
Set a pipeline’s source stage to consume messages before it begins executing stages. This must be called once before calling run or run_async.
- Parameters
- source
SourceStage
The source stage wraps the implementation in a stream that allows it to read from Kafka or a file.
- stop()[source]
Stops all running stages and the underlying MRC pipeline.
- visualize(filename=None, **graph_kwargs)[source]
Output a pipeline diagram to filename. The file format of the diagram is inferred from the extension of filename. If the directory path leading to filename does not exist, it will be created; if filename already exists, it will be overwritten. Requires the graphviz library.
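The file-handling behaviour described for visualize (format taken from the extension, missing directories created, existing files overwritten) can be sketched with the standard library alone. prepare_diagram_path is a hypothetical helper written for this illustration, not a Morpheus or graphviz function, and rejecting a filename with no extension is an assumption of the sketch.

```python
import tempfile
from pathlib import Path


def prepare_diagram_path(filename):
    """Return (path, fmt) for a diagram file, creating missing parent directories."""
    path = Path(filename)
    fmt = path.suffix.lstrip(".").lower()  # "pipeline.PNG" -> "png"
    if not fmt:
        # Assumption of this sketch: a missing extension is treated as an error.
        raise ValueError("filename needs an extension such as .png or .svg")
    path.parent.mkdir(parents=True, exist_ok=True)
    return path, fmt


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "diagrams" / "run1" / "pipeline.PNG"
        path, fmt = prepare_diagram_path(target)
        path.write_text("first")
        path.write_text("second")  # writing again replaces the earlier content
        return fmt, path.parent.is_dir(), path.read_text()
```

A renderer would then receive fmt as the output format and write to path, replacing any earlier diagram at that location.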