morpheus.pipeline.pipeline.Pipeline

class Pipeline(c)

Bases: object

Class for building your pipeline. A pipeline for your use case can be constructed by first adding a Source via set_source, then adding any number of downstream Stage classes via add_stage. The order in which stages are added with add_stage determines the order in which they execute. You can use stages included with Morpheus or your own custom-built stages.
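The ordering rule can be illustrated with a toy stand-in. The sketch below does not import Morpheus: `ToyPipeline` and its methods are hypothetical names that only mirror the shape of set_source and add_stage, and it assumes each stage is a plain callable applied to every item the source yields.

```python
from typing import Callable, Iterable, List


class ToyPipeline:
    """Hypothetical stand-in, not the Morpheus Pipeline class."""

    def __init__(self) -> None:
        self._source: Iterable = ()
        self._stages: List[Callable] = []

    def set_source(self, source: Iterable) -> None:
        self._source = source

    def add_stage(self, stage: Callable) -> None:
        # Stages are kept in insertion order.
        self._stages.append(stage)

    def run(self) -> list:
        results = []
        for item in self._source:
            # Apply every stage in the order it was added.
            for stage in self._stages:
                item = stage(item)
            results.append(item)
        return results


pipe = ToyPipeline()
pipe.set_source([1, 2, 3])
pipe.add_stage(lambda x: x + 1)   # added first, runs first
pipe.add_stage(lambda x: x * 10)  # added second, runs second
toy_results = pipe.run()
print(toy_results)  # [20, 30, 40]
```

Swapping the two add_stage calls would produce different results, which is the point of the ordering rule.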

Parameters
c : morpheus.config.Config

Pipeline configuration instance.

Attributes
is_built

Methods

add_edge(start, end[, segment_id])

Create an edge between two stages and add it to a segment in the pipeline.

add_segment_edge(egress_stage, ...)

Create an edge between two segments in the pipeline.

add_stage(stage[, segment_id])

Add a stage to a segment in the pipeline.

build()

Sequentially activates all of the Morpheus pipeline stages supplied by the user so that the pipeline can execute.

join()

Suspend execution of all currently running stages and the MRC pipeline.

run()

This function makes use of asyncio features to keep the pipeline running indefinitely.

run_async()

This function sets up the current asyncio loop, builds the pipeline, and awaits its completion.

stop()

Stops all running stages and the underlying MRC pipeline.

visualize([filename])

Output a pipeline diagram to filename.

add_edge(start, end, segment_id='main')

Create an edge between two stages and add it to a segment in the pipeline.

Parameters
start : typing.Union[StreamWrapper, Sender]

The start of the edge or parent stage.

end : typing.Union[Stage, Receiver]

The end of the edge or child stage.

segment_id : str

ID indicating what segment the edge should be added to.

add_segment_edge(egress_stage, egress_segment, ingress_stage, ingress_segment, port_pair)

Create an edge between two segments in the pipeline.

Parameters
egress_stage : Stage

The egress stage of the parent segment.

egress_segment : str

Segment ID of the parent segment.

ingress_stage : Stage

The ingress stage of the child segment.

ingress_segment : str

Segment ID of the child segment.

port_pair : typing.Union[str, typing.Tuple]

Either the ID of the egress segment, or a tuple with the following three elements:
  • str: ID of the egress segment

  • class: type being sent (typically object)

  • bool: whether the type is a shared pointer (should typically be False)
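The two accepted forms of port_pair can be reduced to one shape before use. The helper below is a minimal sketch: `normalize_port_pair` is a hypothetical name, not part of Morpheus, and the defaults it applies to a bare string (`object` and `False`) are assumptions taken from the "typically" notes above rather than confirmed library behavior.

```python
from typing import Tuple, Union

PortPair = Union[str, Tuple[str, type, bool]]


def normalize_port_pair(port_pair: PortPair) -> Tuple[str, type, bool]:
    """Hypothetical helper: expand either accepted form to a 3-tuple."""
    if isinstance(port_pair, str):
        # Assumed defaults: send plain objects, not shared pointers.
        return (port_pair, object, False)
    if isinstance(port_pair, tuple) and len(port_pair) == 3:
        port_id, port_type, is_shared_ptr = port_pair
        if not isinstance(port_id, str):
            raise TypeError("first element of port_pair must be a str ID")
        return (port_id, port_type, bool(is_shared_ptr))
    raise TypeError("port_pair must be a str or a 3-element tuple")


print(normalize_port_pair("port_1"))
print(normalize_port_pair(("port_2", int, True)))
```

Normalizing up front keeps the rest of the calling code free of type checks on port_pair.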

add_stage(stage, segment_id='main')

Add a stage to a segment in the pipeline.

Parameters
stage : Stage

The stage object to add. It must not already have been added to another Pipeline object.

segment_id : str

ID indicating what segment the stage should be added to.
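The single-owner rule and the per-segment grouping can be modeled with a small stand-in. This is only a sketch: `ToyStage`, `SegmentedToyPipeline`, and the `owner` attribute are hypothetical names, and the choice of `ValueError` is an assumption, since the source does not say how Morpheus reports the conflict.

```python
from collections import defaultdict


class ToyStage:
    """Hypothetical stage that remembers which pipeline holds it."""

    def __init__(self, name):
        self.name = name
        self.owner = None


class SegmentedToyPipeline:
    """Hypothetical stand-in, not the Morpheus Pipeline class."""

    def __init__(self):
        # Maps segment ID -> stages in the order they were added.
        self.segments = defaultdict(list)

    def add_stage(self, stage, segment_id="main"):
        # Reject a stage that already belongs to a different pipeline.
        if stage.owner is not None and stage.owner is not self:
            raise ValueError(f"stage {stage.name!r} belongs to another pipeline")
        stage.owner = self
        self.segments[segment_id].append(stage)


first = SegmentedToyPipeline()
second = SegmentedToyPipeline()
parse = ToyStage("parse")
score = ToyStage("score")

first.add_stage(parse)                        # goes to the default 'main' segment
first.add_stage(score, segment_id="scoring")  # goes to a named segment

try:
    second.add_stage(parse)
except ValueError as err:
    print(err)
```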

build()

Sequentially activates all of the Morpheus pipeline stages supplied by the user so that the pipeline can execute. StreamWrapper.build is called in turn on the Source and on every added Stage object to construct the pipeline.

Once the pipeline has been constructed, this will start the pipeline by calling Source.start on the source object.

async join()

Suspend execution of all currently running stages and the MRC pipeline. Typically called after stop.

run()

This function makes use of asyncio features to keep the pipeline running indefinitely.

async run_async()

This function sets up the current asyncio loop, builds the pipeline, and awaits its completion.
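The relationship between a blocking run and an awaitable run_async can be sketched with plain asyncio. The class below is a hypothetical stand-in, not Morpheus code, and it assumes that run simply drives run_async on an event loop, which the source does not state explicitly.

```python
import asyncio


class ToyAsyncPipeline:
    """Hypothetical stand-in, not the Morpheus Pipeline class."""

    def __init__(self):
        self.events = []

    def build(self):
        self.events.append("build")

    async def join(self):
        # Stand-in for waiting until the running stages have finished.
        await asyncio.sleep(0)
        self.events.append("join")

    async def run_async(self):
        # Build first, then wait for completion.
        self.build()
        await self.join()

    def run(self):
        # Blocking entry point: start an event loop and drive run_async.
        asyncio.run(self.run_async())


toy = ToyAsyncPipeline()
toy.run()
print(toy.events)  # ['build', 'join']
```

Code that already runs inside an event loop would await run_async directly instead of calling the blocking wrapper.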

stop()

Stops all running stages and the underlying MRC pipeline.

visualize(filename=None, **graph_kwargs)

Output a pipeline diagram to filename. The file format of the diagram is inferred from the extension of filename. If the directory path leading to filename does not exist, it will be created; if filename already exists, it will be overwritten. Requires the graphviz library.
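The path handling described above (infer the format from the extension, create missing parent directories) can be sketched with the standard library alone. `prepare_output_path` is a hypothetical helper, not a Morpheus or graphviz function, and raising `ValueError` for a missing extension is an assumption made for this example.

```python
import os
import tempfile


def prepare_output_path(filename: str) -> str:
    """Hypothetical helper: create parent directories, return the format name."""
    directory = os.path.dirname(filename)
    if directory:
        # Create any missing parent directories; do nothing if they exist.
        os.makedirs(directory, exist_ok=True)
    extension = os.path.splitext(filename)[1]
    if not extension:
        raise ValueError("filename needs an extension to infer the format")
    # Drop the leading dot, e.g. '.png' -> 'png'.
    return extension[1:].lower()


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "diagrams", "nested", "pipeline.png")
    fmt = prepare_output_path(target)
    dir_created = os.path.isdir(os.path.dirname(target))
    print(fmt, dir_created)  # png True
```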

© Copyright 2023, NVIDIA. Last updated on Apr 11, 2023.