NVIDIA Morpheus (24.10.01)
(Latest Version)

morpheus.pipeline.linear_pipeline.LinearPipeline

class LinearPipeline(c)[source]

Bases: <a href="morpheus.pipeline.pipeline.Pipeline.html#morpheus.pipeline.pipeline.Pipeline">morpheus.pipeline.pipeline.Pipeline</a>

This class is used to build linear pipelines where we have a single output source stage followed by stages that are executed sequentially in the order they were added.

Parameters
c<a href="morpheus.config.Config.html#morpheus.config.Config">morpheus.config.Config</a>

Pipeline configuration instance.

Attributes
state

Methods

<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.add_edge">add_edge</a>(start, end[, segment_id]) Create an edge between two stages and add it to a segment in the pipeline.
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.add_segment_boundary">add_segment_boundary</a>([data_type, ...])

Parameters

<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.add_segment_edge">add_segment_edge</a>(egress_stage, ...) Create an edge between two segments in the pipeline.
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.add_stage">add_stage</a>(stage) Add a stage to the pipeline.
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.build">build</a>() This function sequentially activates all the Morpheus pipeline stages passed by the users to execute a pipeline.
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.join">join</a>() Wait until pipeline completes upon which join methods of sources and stages will be called.
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.run">run</a>() This function makes use of asyncio features to keep the pipeline running indefinitely.
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.run_async">run_async</a>() This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.set_source">set_source</a>(source) Set a pipeline's source stage to consume messages before it begins executing stages.
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.stop">stop</a>() Stops all running stages and the underlying MRC pipeline.
<a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.visualize">visualize</a>([filename]) Output a pipeline diagram to filename.
build_and_start
add_edge(start, end, segment_id='main')[source]

Create an edge between two stages and add it to a segment in the pipeline. When start and end are stages, they must have exactly one output and input port respectively.

Parameters
starttyping.Union[StageBase, Sender]

The start of the edge or parent stage.

endtyping.Union[Stage, Receiver]

The end of the edge or child stage.

segment_idstr

ID indicating what segment the edge should be added to.

add_segment_boundary(data_type=None, as_shared_pointer=False)[source]
Parameters
data_typedata_type

data type that will be passed across the segment boundary, defaults to a general python object if ‘data_type’ has no registered edge adapters.

as_shared_pointerboolean

Whether the data type will be wrapped in a shared pointer. Currently this is not implemented.

Examples

Copy
Copied!
            

>>> # Create a config and pipeline >>> config = Config() >>> pipe = LinearPipeline(config) >>> >>> # Add a source in Segment #1 >>> pipe.set_source(FileSourceStage(config, filename=val_file_name, iterative=False)) >>> >>> # Add a segment boundary >>> # [Current Segment] - [Egress Boundary] ---- [Ingress Boundary] - [Next Segment] >>> pipe.add_segment_boundary(MessageMeta) >>> >>> # Add a sink in Segment #2 >>> pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False)) >>> >>> pipe.run()

add_segment_edge(egress_stage, egress_segment, ingress_stage, ingress_segment, port_pair)[source]

Create an edge between two segments in the pipeline.

Parameters
egress_stageStage

The egress stage of the parent segment

egress_segmentstr

Segment ID of the parent segment

ingress_stageStage

The ingress stage of the child segment

ingress_segmentstr

Segment ID of the child segment

port_pairtyping.Union[str, typing.Tuple]
Either the ID of the egress segment, or a tuple with the following three elements:
  • str: ID of the egress segment

  • class: type being sent (typically <a href="https://docs.python.org/3/library/functions.html#object">object</a>)

  • bool: If the type is a shared pointer (typically should be <a href="https://docs.python.org/3/library/constants.html#False">False</a>)

add_stage(stage)[source]

Add a stage to the pipeline. All Stage classes added with this method will be executed sequentially inthe order they were added.

Parameters
stageStage

The stage object to add. It cannot be already added to another Pipeline object.

build()[source]

This function sequentially activates all the Morpheus pipeline stages passed by the users to execute a pipeline. For the Source and all added Stage objects, StageBase.build will be called sequentially to construct the pipeline.

Once the pipeline has been constructed, this will start the pipeline by calling Source.start on the source object.

async join()[source]

Wait until pipeline completes upon which join methods of sources and stages will be called.

run()[source]

This function makes use of asyncio features to keep the pipeline running indefinitely.

async run_async()[source]

This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.

set_source(source)[source]

Set a pipeline’s source stage to consume messages before it begins executing stages. This must be called once before calling <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.run">run</a> or <a href="#morpheus.pipeline.linear_pipeline.LinearPipeline.run_async">run_async</a>.

Parameters
sourceSourceStage

The source stage wraps the implementation in a stream that allows it to read from Kafka or a file.

stop()[source]

Stops all running stages and the underlying MRC pipeline.

visualize(filename=None, **graph_kwargs)[source]

Output a pipeline diagram to filename. The file format of the diagrame is inferred by the extension of filename. If the directory path leading to filename does not exist it will be created, if filename already exists it will be overwritten. Requires the graphviz library.

Previous morpheus.pipeline.linear_pipeline
Next morpheus.pipeline.pass_thru_type_mixin
© Copyright 2024, NVIDIA. Last updated on Dec 3, 2024.