Pipeline

Pipeline class

In DALI, every data processing task is centered on an object called a pipeline. A pipeline object is an instance of nvidia.dali.pipeline.Pipeline or of a derived class. The pipeline encapsulates the data processing graph and the execution engine.

There are two ways to define a DALI pipeline:

  1. by inheriting from Pipeline class and overriding Pipeline.define_graph()

  2. by instantiating Pipeline directly, building the graph and setting the pipeline outputs with Pipeline.set_outputs()

Data processing graphs

A DALI pipeline is represented as a graph of operations. There are two kinds of nodes in the graph:

  • Operators - created on each call to an operator

  • Data nodes (see DataNode) - represent outputs and inputs of operators; they are returned from calls to operators and passing them as inputs to other operators establishes connections in the graph.

Example:

import nvidia.dali.ops as ops
from nvidia.dali.pipeline import Pipeline

class MyPipeline(Pipeline):
    def define_graph(self):
        img_reader  = ops.FileReader(file_root = "image_dir", seed = 1)
        mask_reader = ops.FileReader(file_root = "mask_dir", seed = 1)
        img_files, labels = img_reader()  # creates an instance of `FileReader`
        mask_files, _ = mask_reader()     # creates another instance of `FileReader`
        decode = ops.ImageDecoder()
        images = decode(img_files)  # creates an instance of `ImageDecoder`
        masks  = decode(mask_files)   # creates another instance of `ImageDecoder`
        return [images, masks, labels]

pipe = MyPipeline(batch_size = 4, num_threads = 2, device_id = 0)
pipe.build()

The resulting graph is:

_images/two_readers.svg

Current pipeline

Subgraphs that do not contribute to the pipeline output are automatically pruned. If an operator has side effects (e.g. the PythonFunction operator family), it cannot be invoked without setting the current pipeline. The current pipeline is set implicitly when the graph is defined inside a derived pipeline’s define_graph method. Otherwise, it can be set using a context manager (with statement):

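import nvidia.dali as dali

# N (the batch size) and my_source (the data source) are supplied by the user
pipe = dali.pipeline.Pipeline(batch_size = N, num_threads = 3, device_id = 0)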
with pipe:
    src = dali.ops.ExternalSource(my_source, num_outputs = 2)
    a, b = src()
    pipe.set_outputs(a, b)
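
The pruning rule described above can be illustrated with a minimal pure-Python sketch. This is a toy model, not DALI's implementation: the function name live_operators, the dictionary encoding of the graph, and the operator names are all assumptions made for this example. An operator survives if it is reachable from a pipeline output or if it is explicitly preserved.

```python
def live_operators(inputs_of, outputs, preserved=()):
    """Return the set of operators that survive pruning in this toy model.

    inputs_of -- dict: operator name -> list of names of operators feeding it
    outputs   -- operators whose results are pipeline outputs
    preserved -- operators kept regardless (toy analogue of preserve=True)
    """
    live = set()
    stack = list(outputs) + list(preserved)
    while stack:
        op = stack.pop()
        if op in live:
            continue
        live.add(op)
        # Everything feeding a live operator is live as well.
        stack.extend(inputs_of.get(op, []))
    return live


# Hypothetical graph: "debug_dump" consumes "decode" but feeds no output.
graph = {
    "reader": [],
    "decode": ["reader"],
    "resize": ["decode"],
    "debug_dump": ["decode"],
}

pruned = live_operators(graph, outputs=["resize"])
kept = live_operators(graph, outputs=["resize"], preserved=["debug_dump"])
```

In this sketch, debug_dump is dropped from pruned because nothing downstream of it reaches an output, while kept retains it because it was marked as preserved.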

class nvidia.dali.pipeline.Pipeline(batch_size=-1, num_threads=-1, device_id=-1, seed=-1, exec_pipelined=True, prefetch_queue_depth=2, exec_async=True, bytes_per_sample=0, set_affinity=False, max_streams=-1, default_cuda_stream_priority=0)

Pipeline class is the base of all DALI data pipelines. The pipeline encapsulates the data processing graph and the execution engine.

Parameters
  • batch_size (int, optional, default = -1) – Batch size of the pipeline. Negative values are invalid; the default value may only be used with a serialized pipeline, in which case the value stored in the serialized pipeline is used instead.

  • num_threads (int, optional, default = -1) – Number of CPU threads used by the pipeline. Negative values are invalid; the default value may only be used with a serialized pipeline, in which case the value stored in the serialized pipeline is used instead.

  • device_id (int, optional, default = -1) – Id of the GPU used by the pipeline. Negative values are invalid; the default value may only be used with a serialized pipeline, in which case the value stored in the serialized pipeline is used instead.

  • seed (int, optional, default = -1) – Seed used for random number generation. Leaving the default value for this parameter results in a random seed.

  • exec_pipelined (bool, optional, default = True) – Whether to execute the pipeline in a way that enables overlapping CPU and GPU computation, typically resulting in faster execution speed, but larger memory consumption.

  • prefetch_queue_depth (int or {"cpu_size": int, "gpu_size": int}, optional, default = 2) – Depth of the executor pipeline. A deeper pipeline makes DALI more resistant to uneven execution time of each batch, but it also consumes more memory for internal buffers. Specifying a dict { "cpu_size": x, "gpu_size": y } instead of an integer causes the pipeline to use the separated queues executor, with buffer queue size x for the CPU stage and y for the mixed and GPU stages. This is not supported when both exec_async and exec_pipelined are set to False. The executor buffers the CPU and GPU stages separately and fills the buffer queues when the first run() is issued.

  • exec_async (bool, optional, default = True) – Whether to execute the pipeline asynchronously. This makes the run() method run asynchronously with respect to the calling Python thread. To synchronize with the pipeline, call the outputs() method.

  • bytes_per_sample (int, optional, default = 0) – A hint for DALI for how much memory to use for its tensors.

  • set_affinity (bool, optional, default = False) – Whether to set CPU core affinity to the one closest to the GPU being used.

  • max_streams (int, optional, default = -1) – Limit the number of CUDA streams used by the executor. Value of -1 does not impose a limit. This parameter is currently unused (and behavior of unrestricted number of streams is assumed).

  • default_cuda_stream_priority (int, optional, default = 0) – CUDA stream priority used by DALI. See cudaStreamCreateWithPriority in CUDA documentation

__enter__()

Safely sets the pipeline as current. A current pipeline is required to call operators with side effects or without outputs. Examples of such operators are PythonFunction (potential side effects) or DumpImage (no output).

Any dangling operator can be marked as having side effects if it is marked with preserve=True, which can be useful for debugging; otherwise, an operator that does not contribute to the pipeline output is removed from the graph.

To manually set a new current pipeline and later restore the previous one, use push_current() and pop_current(), respectively.

__exit__(exception_type, exception_value, traceback)

Safely restores the previous pipeline.

add_sink(edge)

Allows manually adding graph edges to the pipeline that are not connected to the output and would otherwise be pruned.

property batch_size

Batch size.

build(define_graph=None)

Build the pipeline.

The pipeline needs to be built in order to run it standalone. Framework-specific plugins handle this step automatically.

Parameters

define_graph (callable) – If specified, this function will be used instead of the member define_graph(). This parameter must not be set if the pipeline outputs are specified with set_outputs().

define_graph()

This function is defined by the user to construct the graph of operations for their pipeline.

It returns a list of outputs created by calling DALI Operators.

deserialize_and_build(serialized_pipeline)

Deserialize and build the pipeline given in serialized form.

Parameters

serialized_pipeline (str) – Serialized pipeline.

property device_id

Id of the GPU used by the pipeline.

empty()

Checks whether there is any work scheduled in the pipeline but not yet consumed.

enable_api_check(enable)

Enables or disables the API check at runtime.

epoch_size(name=None)

Epoch size of a pipeline.

If the name parameter is None, returns a dictionary of pairs (reader name, epoch size for that reader). If the name parameter is not None, returns the epoch size for that reader.

Parameters

name (str, optional, default = None) – The reader which should be used to obtain epoch size.
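
As a usage sketch, the epoch size of a named reader can be combined with the batch_size property to estimate how many iterations cover one epoch. The helper iterations_per_epoch, the reader name "Reader", and the FakePipeline stand-in are hypothetical and exist only so the snippet runs without DALI; the estimate also assumes a single reader whose data is not split across several pipelines.

```python
import math


def iterations_per_epoch(pipe, reader_name):
    """Estimate the number of batches needed to cover one reader's epoch.

    pipe is any object exposing epoch_size(name) and batch_size, as the
    Pipeline class documented here does.
    """
    samples = pipe.epoch_size(reader_name)  # epoch size of the named reader
    return math.ceil(samples / pipe.batch_size)


class FakePipeline:
    """Stand-in for a built pipeline; not part of DALI."""

    batch_size = 4

    def epoch_size(self, name=None):
        sizes = {"Reader": 10}
        # Mirrors the documented contract: dict when name is None,
        # a single value otherwise.
        return sizes if name is None else sizes[name]


steps = iterations_per_epoch(FakePipeline(), "Reader")
```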

feed_input(data_node, data, layout='')

Bind a NumPy array (or a list thereof) to an output of ExternalSource.

Parameters
  • data_node (DataNode or str) – The DataNode returned by a call to ExternalSource, or the name of the nvidia.dali.ops.ExternalSource operator.

  • data (numpy.ndarray or a list thereof) – The data to be used as the output of the ExternalSource referred to by data_node. In case of GPU external sources, this must be a numpy.ndarray.

  • layout (str) – The description of the data layout (or empty string, if not specified). It should be a string of the length that matches the dimensionality of the data, batch dimension excluded. For a batch of channel-first images, this should be “CHW”, for channel-last video it’s “FHWC” and so on.
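
The rule for the layout string can be checked before calling feed_input. The helper below is a hypothetical sketch, not a DALI function; it assumes the batch is a single array whose first dimension is the batch dimension.

```python
def layout_matches(layout, batch_shape):
    """Check a layout string against the shape of a whole batch.

    batch_shape includes the leading batch dimension, which the layout
    must not describe. An empty layout means "not specified" and is
    always accepted.
    """
    sample_ndim = len(batch_shape) - 1  # drop the batch dimension
    return layout == "" or len(layout) == sample_ndim


# A batch of 8 channel-first images and a batch of 4 channel-last videos.
images_ok = layout_matches("CHW", (8, 3, 224, 224))
video_ok = layout_matches("FHWC", (4, 16, 224, 224, 3))
video_bad = layout_matches("HWC", (4, 16, 224, 224, 3))
unspecified_ok = layout_matches("", (8, 3, 224, 224))
```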

iter_setup()

This function can be overridden by a user-defined pipeline to perform any setup needed for each iteration. For example, one can use this function to feed the input data from NumPy arrays.

property num_threads

Number of CPU threads used by the pipeline.

outputs()

Returns the outputs of the pipeline and releases the previous buffer.

If the pipeline is executed asynchronously, this function blocks until the results become available. It raises StopIteration if the data set has reached its end, usually when iter_setup cannot produce any more data.

Returns

A list of TensorList objects for respective pipeline outputs

static pop_current()

Restores the previous pipeline as current. Complementary to push_current().

static push_current(pipeline)

Sets the pipeline as current and stores the previous current pipeline on a stack. To restore the previous pipeline as current, use pop_current().

To make sure that the pipeline is properly restored in case of an exception, use the context manager (with my_pipeline:).

A current pipeline is required to call operators with side effects or without outputs. Examples of such operators are PythonFunction (potential side effects) or DumpImage (no output).

Any dangling operator can be marked as having side effects if it is marked with preserve=True, which can be useful for debugging; otherwise, an operator that does not contribute to the pipeline output is removed from the graph.
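
The stack behavior of push_current() and pop_current(), and the reason the context manager is safer when exceptions occur, can be illustrated with a toy model. ToyPipeline below is not DALI code; the class, its current() accessor, and the pipeline names are assumptions made for this sketch.

```python
class ToyPipeline:
    """Toy model of the 'current pipeline' bookkeeping; not DALI code."""

    _stack = []      # previously current pipelines
    _current = None  # the pipeline that is current right now

    def __init__(self, name):
        self.name = name

    @staticmethod
    def current():
        return ToyPipeline._current

    @staticmethod
    def push_current(pipeline):
        ToyPipeline._stack.append(ToyPipeline._current)
        ToyPipeline._current = pipeline

    @staticmethod
    def pop_current():
        ToyPipeline._current = ToyPipeline._stack.pop()

    def __enter__(self):
        ToyPipeline.push_current(self)
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        ToyPipeline.pop_current()  # runs even when the body raised
        return False               # do not swallow the exception


outer, inner = ToyPipeline("outer"), ToyPipeline("inner")
seen = []
with outer:
    try:
        with inner:
            seen.append(ToyPipeline.current().name)
            raise RuntimeError("graph definition failed")
    except RuntimeError:
        pass
    seen.append(ToyPipeline.current().name)  # outer is current again
seen.append(ToyPipeline.current())           # nothing is current any more
```

With bare push_current() and pop_current() calls, the same guarantee would require an explicit try/finally around the graph definition.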

release_outputs()

Release buffers returned by share_outputs calls.

This is useful when the result of an output call has been consumed (copied) and the buffers can be marked as free before the next call to share_outputs. It gives the user better control over when to run the pipeline, when to obtain the resulting buffers, and when to return them to the DALI pool after the results have been consumed. Needs to be used together with schedule_run() and share_outputs(). Should not be mixed with run() in the same pipeline.

reset()

Resets the pipeline iterator.

If the pipeline iterator has reached the end, resets its state to the beginning.

run()

Run the pipeline and return the result.

If the pipeline was created with the exec_pipelined option set to True, this function will also start prefetching the next iteration for faster execution. Should not be mixed with schedule_run(), share_outputs() and release_outputs() in the same pipeline.

Returns

A list of TensorList objects for respective pipeline outputs

save_graph_to_dot_file(filename, show_tensors=False, show_ids=False, use_colors=False)

Saves the pipeline graph to a file.

Parameters
  • filename (str) – Name of the file to which the graph is written.

  • show_tensors (bool) – Show the Tensor nodes in the graph (by default only Operator nodes are shown).

  • show_ids (bool) – Add the node id to the graph representation.

  • use_colors (bool) – Whether to use colors to distinguish stages.

schedule_run()

Run the pipeline without returning the resulting buffers.

If the pipeline was created with the exec_pipelined option set to True, this function will also start prefetching the next iteration for faster execution. It gives the user better control over when to run the pipeline, when to obtain the resulting buffers, and when to return them to the DALI buffer pool after the results have been consumed. Needs to be used together with release_outputs() and share_outputs(). Should not be mixed with run() in the same pipeline.

serialize(define_graph=None)

Serialize the pipeline to a Protobuf string.

set_outputs(*output_data_nodes)

Set the outputs of the pipeline.

Use of this function is an alternative to overriding define_graph in a derived class.

Parameters

*output_data_nodes (unpacked list of DataNode objects) – The outputs of the pipeline

share_outputs()

Returns the outputs of the pipeline.

The main difference from outputs() is that share_outputs() does not release the returned buffers; release_outputs() needs to be called for that. If the pipeline is executed asynchronously, this function blocks until the results become available. It gives the user better control over when to run the pipeline, when to obtain the resulting buffers, and when to return them to the DALI pool after the results have been consumed. Needs to be used together with release_outputs() and schedule_run(). Should not be mixed with run() in the same pipeline.

Returns

A list of TensorList objects for respective pipeline outputs
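
The three methods schedule_run(), share_outputs() and release_outputs() are meant to be called in that order for each iteration. The loop below is a minimal sketch of that call sequence; the helper run_manually, the consume callback, and the RecordingPipeline stand-in are hypothetical and are included only so the snippet runs without DALI.

```python
def run_manually(pipe, consume, num_iterations):
    """Drive a built pipeline without using run().

    pipe    -- any object exposing schedule_run, share_outputs and
               release_outputs, as the Pipeline class documented here does
    consume -- user callback that copies or otherwise uses the outputs
    """
    for _ in range(num_iterations):
        pipe.schedule_run()             # start processing, returns no buffers
        outputs = pipe.share_outputs()  # buffers are not released yet
        consume(outputs)                # use the data while it is still valid
        pipe.release_outputs()          # return the buffers to the pool


class RecordingPipeline:
    """Stand-in for a built pipeline; records the order of calls."""

    def __init__(self):
        self.calls = []

    def schedule_run(self):
        self.calls.append("schedule_run")

    def share_outputs(self):
        self.calls.append("share_outputs")
        return ["fake output"]

    def release_outputs(self):
        self.calls.append("release_outputs")


fake = RecordingPipeline()
consumed = []
run_manually(fake, consumed.append, num_iterations=2)
```

With a real pipeline, consume must finish copying or using the outputs before release_outputs() is called, since the buffers may be reused afterwards.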

DataNode

class nvidia.dali.pipeline.DataNode(name, device='cpu', source=None)

This class is a symbolic representation of a TensorList and is used at the graph definition stage. It does not carry actual data, but is used to define the connections between operators and to specify the pipeline outputs. See the documentation for Pipeline for details.

DataNode objects can be passed to DALI operators as inputs (and as some of the named arguments), but they also provide arithmetic operations which implicitly create the appropriate operators to evaluate the expressions.
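
The idea of a symbolic node whose arithmetic builds graph structure instead of computing values can be illustrated with a toy class. ToyNode is not DataNode and does not reflect DALI's implementation; its fields and naming scheme are assumptions made for this sketch.

```python
class ToyNode:
    """Toy symbolic node: holds no data, only how it would be computed."""

    def __init__(self, name, op=None, inputs=()):
        self.name = name
        self.op = op                 # None for a graph input
        self.inputs = tuple(inputs)  # nodes or constants feeding this node

    def _binary(self, op, other):
        other_name = other.name if isinstance(other, ToyNode) else repr(other)
        # A new node is created that records the operation and its inputs.
        return ToyNode(f"({self.name} {op} {other_name})", op, (self, other))

    def __add__(self, other):
        return self._binary("+", other)

    def __mul__(self, other):
        return self._binary("*", other)


a, b = ToyNode("a"), ToyNode("b")
expr = a + b * 2  # builds two operation nodes; nothing is computed
```

Evaluating the expression yields another symbolic node, so the result can be passed on to further operations in the same way as any other node.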