morpheus.pipeline.pipeline.Pipeline

class Pipeline(config)

Bases: object

Class for building a pipeline. Construct a pipeline for your use case by first adding a Source via set_source, then adding any number of downstream Stage classes via add_stage. The order in which stages are added with add_stage determines the order in which they execute. You can use the stages included with Morpheus or your own custom-built stages.

Parameters
config : morpheus.config.Config

Pipeline configuration instance.
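
The ordering rule in the class description can be illustrated with a small stand-in. The sketch below does not import Morpheus: ToyPipeline, double and increment are hypothetical names invented for this example, and the class only mimics the set_source / add_stage calling pattern described above so that the effect of insertion order is visible.

```python
from typing import Callable, Iterable, List


class ToyPipeline:
    """Illustrative stand-in only; NOT the Morpheus Pipeline implementation."""

    def __init__(self) -> None:
        self._source: Iterable[int] = ()
        self._stages: List[Callable[[int], int]] = []

    def set_source(self, source: Iterable[int]) -> None:
        # The source is registered first and feeds every downstream stage.
        self._source = source

    def add_stage(self, stage: Callable[[int], int]) -> None:
        # Stages are stored, and later executed, in the order they are added.
        self._stages.append(stage)

    def run(self) -> List[int]:
        results = []
        for item in self._source:
            for stage in self._stages:
                item = stage(item)
            results.append(item)
        return results


def double(x: int) -> int:
    return x * 2


def increment(x: int) -> int:
    return x + 1


pipe_a = ToyPipeline()
pipe_a.set_source([1, 2, 3])
pipe_a.add_stage(double)
pipe_a.add_stage(increment)

pipe_b = ToyPipeline()
pipe_b.set_source([1, 2, 3])
pipe_b.add_stage(increment)
pipe_b.add_stage(double)

result_a = pipe_a.run()  # double, then increment
result_b = pipe_b.run()  # increment, then double
```

The two toy pipelines contain the same stages but produce different results, because only the order of the add_stage calls differs.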

Attributes
state

Methods

add_edge(start, end[, segment_id]) Create an edge between two stages and add it to a segment in the pipeline.
add_segment_edge(egress_stage, ...) Create an edge between two segments in the pipeline.
add_stage(stage[, segment_id]) Add a stage to a segment in the pipeline.
build() Sequentially activates all of the Morpheus pipeline stages supplied by the user so that the pipeline can execute.
join() Wait until the pipeline completes, at which point the join methods of the sources and stages are called.
run() This function makes use of asyncio features to keep the pipeline running indefinitely.
run_async() This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
stop() Stops all running stages and the underlying MRC pipeline.
visualize([filename]) Output a pipeline diagram to filename.
build_and_start
add_edge(start, end, segment_id='main')

Create an edge between two stages and add it to a segment in the pipeline. When start and end are stages, they must have exactly one output and input port respectively.

Parameters
start

The start of the edge or parent stage.

end

The end of the edge or child stage.

segment_id

ID indicating what segment the edge should be added to.
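
The single-port requirement can be pictured with a toy graph. This is a sketch under stated assumptions, not the Morpheus implementation: ToyStage, ToyGraph and the num_inputs / num_outputs fields are hypothetical names, and raising ValueError is simply one way such a constraint could be enforced.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class ToyStage:
    """Hypothetical stage with a fixed number of input and output ports."""
    name: str
    num_inputs: int = 1
    num_outputs: int = 1


class ToyGraph:
    """Illustrative only; not how Morpheus stores its edges."""

    def __init__(self) -> None:
        # Edges are grouped by the segment they belong to.
        self.edges: Dict[str, List[Tuple[str, str]]] = defaultdict(list)

    def add_edge(self, start: ToyStage, end: ToyStage, segment_id: str = "main") -> None:
        # A stage-to-stage edge is only unambiguous when the parent has
        # exactly one output port and the child exactly one input port.
        if start.num_outputs != 1:
            raise ValueError(f"{start.name} must have exactly one output port")
        if end.num_inputs != 1:
            raise ValueError(f"{end.name} must have exactly one input port")
        self.edges[segment_id].append((start.name, end.name))


graph = ToyGraph()
source = ToyStage("source", num_inputs=0, num_outputs=1)
filter_stage = ToyStage("filter")
sink = ToyStage("sink", num_inputs=1, num_outputs=0)
splitter = ToyStage("splitter", num_inputs=1, num_outputs=2)

graph.add_edge(source, filter_stage)
graph.add_edge(filter_stage, sink)

try:
    # Two output ports: the toy graph cannot tell which one to connect.
    graph.add_edge(splitter, sink)
    rejected = False
except ValueError:
    rejected = True
```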

add_segment_edge(egress_stage, egress_segment, ingress_stage, ingress_segment, port_pair)

Create an edge between two segments in the pipeline.

Parameters
egress_stage

The egress stage of the parent segment

egress_segment

Segment ID of the parent segment

ingress_stage

The ingress stage of the child segment

ingress_segment

Segment ID of the child segment

port_pair

Either the ID of the egress segment, or a tuple with the following three elements:
  • str: ID of the egress segment
  • class: type being sent (typically object)
  • bool: whether the type is a shared pointer (typically False)
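
The two accepted shapes of port_pair can be handled uniformly by expanding the string form into the tuple form. The helper below is a hypothetical sketch, not part of Morpheus: normalize_port_pair is an invented name, and the defaults applied to the bare-string form (object and False) are assumptions taken from the "typically" hints in the list above.

```python
from typing import Tuple, Union

PortPair = Union[str, Tuple[str, type, bool]]


def normalize_port_pair(port_pair: PortPair) -> Tuple[str, type, bool]:
    """Expand either accepted form into an (id, data_type, is_shared_pointer) tuple."""
    if isinstance(port_pair, str):
        # Assumed defaults for the short form: send plain objects, not shared pointers.
        return (port_pair, object, False)

    if isinstance(port_pair, tuple) and len(port_pair) == 3:
        port_id, data_type, is_shared_pointer = port_pair
        if not isinstance(port_id, str):
            raise TypeError("first element must be a str ID")
        if not isinstance(data_type, type):
            raise TypeError("second element must be a class")
        if not isinstance(is_shared_pointer, bool):
            raise TypeError("third element must be a bool")
        return (port_id, data_type, is_shared_pointer)

    raise TypeError("port_pair must be a str or a (str, class, bool) tuple")


short_form = normalize_port_pair("seg_1")
long_form = normalize_port_pair(("seg_1", dict, False))
```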

add_stage(stage, segment_id='main')

Add a stage to a segment in the pipeline.

Parameters
stage

The stage object to add. It must not already have been added to another Pipeline object.

segment_id

ID indicating what segment the stage should be added to.

build()

Sequentially activates all of the Morpheus pipeline stages supplied by the user so that the pipeline can execute. StageBase.build is called in order on the Source and on every added Stage object to construct the pipeline.

Once the pipeline has been constructed, this will start the pipeline by calling Source.start on the source object.
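
The build-then-start sequence described above can be sketched with stand-in classes. Nothing here is taken from the Morpheus source: ToyStageBase, ToySource and build_pipeline are hypothetical names, and the shared log list exists only to make the call order observable.

```python
from typing import List


class ToyStageBase:
    """Hypothetical base class; records when build() is called."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    def build(self) -> None:
        self.log.append(f"build:{self.name}")


class ToySource(ToyStageBase):
    """Hypothetical source; records when start() is called."""

    def start(self) -> None:
        self.log.append(f"start:{self.name}")


def build_pipeline(source: ToySource, stages: List[ToyStageBase]) -> None:
    # Build the source first, then each stage in the order it was added.
    source.build()
    for stage in stages:
        stage.build()
    # Only once everything is constructed is the source started.
    source.start()


log: List[str] = []
src = ToySource("source", log)
stages = [ToyStageBase("deserialize", log), ToyStageBase("sink", log)]
build_pipeline(src, stages)
```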

async join()

Wait until the pipeline completes, at which point the join methods of the sources and stages are called.

run()

This function makes use of asyncio features to keep the pipeline running indefinitely.

async run_async()

This function sets up the current asyncio loop, builds the pipeline, and awaits on it to complete.
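
The build-then-await pattern that run_async describes can be pictured with plain asyncio. ToyAsyncPipeline is a hypothetical stand-in, and having the synchronous run hand run_async to asyncio.run is an assumption made for illustration rather than a statement about the Morpheus source.

```python
import asyncio
from typing import List


class ToyAsyncPipeline:
    """Illustrative stand-in only; NOT the Morpheus Pipeline implementation."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def build(self) -> None:
        self.events.append("build")

    async def join(self) -> None:
        # Stand-in for waiting on real work; yields to the event loop once.
        await asyncio.sleep(0)
        self.events.append("join")

    async def run_async(self) -> None:
        # Build first, then await completion.
        self.build()
        await self.join()

    def run(self) -> None:
        # Assumed wrapper: drive the coroutine to completion from synchronous code.
        asyncio.run(self.run_async())


toy = ToyAsyncPipeline()
toy.run()
events = toy.events
```

Callers that already run inside an event loop would await run_async directly; asyncio.run cannot be called from a running loop.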

stop()

Stops all running stages and the underlying MRC pipeline.

visualize(filename=None, **graph_kwargs)

Output a pipeline diagram to filename. The file format of the diagram is inferred from the extension of filename. If the directory path leading to filename does not exist, it will be created; if filename already exists, it will be overwritten. Requires the graphviz library.
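
The file-handling behavior described above (format inferred from the extension, missing directories created, existing files overwritten) can be sketched with the standard library alone. prepare_diagram_path is a hypothetical helper, not a Morpheus function, and the sketch writes Graphviz DOT text directly instead of calling the graphviz library.

```python
import tempfile
from pathlib import Path
from typing import Tuple


def prepare_diagram_path(filename: str) -> Tuple[Path, str]:
    """Return the output path and the format implied by its extension."""
    path = Path(filename)
    fmt = path.suffix.lstrip(".").lower()
    if not fmt:
        raise ValueError("filename needs an extension to infer the format")
    # Create any missing parent directories; do nothing if they already exist.
    path.parent.mkdir(parents=True, exist_ok=True)
    return path, fmt


with tempfile.TemporaryDirectory() as tmp:
    target = str(Path(tmp) / "diagrams" / "nested" / "pipeline.dot")
    out_path, out_format = prepare_diagram_path(target)
    parent_created = out_path.parent.is_dir()

    # Writing twice shows that an existing file is simply overwritten.
    out_path.write_text("digraph { source -> sink }")
    out_path.write_text("digraph { source -> filter -> sink }")
    final_contents = out_path.read_text()
```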

© Copyright 2024, NVIDIA. Last updated on Apr 11, 2024.