Pipeline¶
In DALI, every data processing task is centered on a Pipeline object. A Pipeline object is an
instance of nvidia.dali.Pipeline or a derived class; it encapsulates the
data processing graph and the execution engine.
You can define a DALI Pipeline in the following ways:
- By implementing a function that uses DALI operators and decorating it with the pipeline_def() decorator.
- By instantiating a Pipeline object directly, building the graph, and setting the pipeline outputs with Pipeline.set_outputs().
- By inheriting from the Pipeline class and overriding Pipeline.define_graph() (this is the legacy way of defining DALI pipelines).
class nvidia.dali.Pipeline(batch_size=-1, num_threads=-1, device_id=-1, seed=-1, exec_pipelined=True, prefetch_queue_depth=2, exec_async=True, bytes_per_sample=0, set_affinity=False, max_streams=-1, default_cuda_stream_priority=0, *, enable_memory_stats=False, py_num_workers=1, py_start_method='fork')¶
- Pipeline is the base class of all DALI data pipelines. The pipeline encapsulates the data processing graph and the execution engine.
- Parameters
- batch_size (int, optional, default = -1) – Maximum batch size of the pipeline. Negative values are invalid; the default value may only be used with a serialized pipeline (in which case the value stored in the serialized pipeline is used instead). In most cases, the actual batch size equals the maximum one, but running the pipeline with a smaller batch size is also supported, and the batch size may change from iteration to iteration. Note that DALI may preallocate memory according to this parameter; setting it too high may cause an out-of-memory failure.
- num_threads (int, optional, default = -1) – Number of CPU threads used by the pipeline. Negative values are invalid; the default value may only be used with a serialized pipeline (in which case the value stored in the serialized pipeline is used instead).
- device_id (int, optional, default = -1) – ID of the GPU used by the pipeline. A value of None means that DALI should use neither the GPU nor the CUDA runtime. This limits the pipeline to CPU operators only, but allows it to run on any CPU-capable machine.
- seed (int, optional, default = -1) – Seed used for random number generation. Leaving the default value results in a random seed.
- exec_pipelined (bool, optional, default = True) – Whether to execute the pipeline in a way that overlaps CPU and GPU computation. This typically results in faster execution, at the cost of higher memory consumption.
- prefetch_queue_depth (int or {"cpu_size": int, "gpu_size": int}, optional, default = 2) – Depth of the executor pipeline. A deeper pipeline makes DALI more resistant to uneven execution times across batches, but it also consumes more memory for internal buffers. Specifying a dict { "cpu_size": x, "gpu_size": y } instead of an integer causes the pipeline to use the separated-queues executor, with a buffer queue of size x for the CPU stage and y for the mixed and GPU stages. This is not supported when both exec_async and exec_pipelined are set to False. The executor buffers the CPU and GPU stages separately and fills the buffer queues when the first run() is issued.
- exec_async (bool, optional, default = True) – Whether to execute the pipeline asynchronously. This makes the run() method run asynchronously with respect to the calling Python thread. To synchronize with the pipeline, call the outputs() method.
- bytes_per_sample (int, optional, default = 0) – A hint for DALI for how much memory to use for its tensors.
- set_affinity (bool, optional, default = False) – Whether to set the CPU core affinity to the cores closest to the GPU being used.
- max_streams (int, optional, default = -1) – Limits the number of CUDA streams used by the executor. A value of -1 imposes no limit. This parameter is currently unused (an unrestricted number of streams is assumed).
- default_cuda_stream_priority (int, optional, default = 0) – CUDA stream priority used by DALI. See cudaStreamCreateWithPriority in the CUDA documentation.
- enable_memory_stats (bool, optional, default = False) – Whether DALI should print operator output buffer statistics. Useful for tuning the bytes_per_sample_hint operator parameter.
- py_num_workers (int, optional, default = 1) – The number of Python workers that process ExternalSource callbacks. The pool starts only if there is at least one ExternalSource with parallel set to True. Setting it to 0 disables the pool, and all ExternalSource operators fall back to non-parallel mode even if parallel is set to True.
- py_start_method (str, default = "fork") – Determines how Python workers are started. Supported methods:
  - "fork" – start by forking the process
  - "spawn" – start a fresh interpreter process
  If the spawn method is used, the ExternalSource callback must be picklable. To use fork, no CUDA context may be acquired at the moment the workers are started. For this reason, if you need to build multiple pipelines that use Python workers, you must call start_py_workers() on each of them before calling build() on any of them. More details and caveats of both methods can be found in the documentation of Python's multiprocessing module.
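Because workers started with "spawn" receive the callback through pickling, a quick way to vet a candidate callback is a pickle round trip. The sketch below uses only Python's standard pickle module; is_picklable, sample_callback, and make_closure are hypothetical names, and DALI may apply its own serialization on top of plain pickle, so treat this as a rough approximation rather than DALI's exact check.

```python
import pickle


def is_picklable(obj):
    """Return True if `obj` survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def sample_callback(sample_info):
    # Hypothetical module-level callback: pickled by reference (module + name),
    # so it can be shipped to workers started with "spawn".
    return sample_info


def make_closure(offset):
    def callback(sample_info):
        # Nested function: pickle cannot look it up by name, so it fails.
        return sample_info + offset

    return callback
```

Run as a script, the module-level sample_callback passes the check, whereas a lambda or the closure returned by make_closure does not.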
 
__enter__()¶
- Safely sets the pipeline as current. A current pipeline is required to call operators that have side effects or no outputs, such as PythonFunction (potential side effects) or DumpImage (no output).
- Any dangling operator can be marked as having side effects with preserve=True, which can be useful for debugging; otherwise, an operator that does not contribute to the pipeline output is removed from the graph.
- To manually set a new current pipeline (and restore the previous one), use push_current() and pop_current(), respectively.
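The pruning rule above can be modeled as a reachability search: start from the pipeline outputs and from every node marked preserve=True, follow input edges, and keep everything reached. This is a plain-Python sketch over a hypothetical dictionary-based graph with made-up node names; it is not DALI's internal graph representation.

```python
def prune(nodes, outputs):
    """Return the names of the nodes that survive pruning.

    nodes:   dict mapping node name -> (list of input node names, preserve flag)
    outputs: names of the nodes returned as pipeline outputs
    """
    # Roots are the pipeline outputs plus every node marked preserve=True.
    stack = list(outputs) + [name for name, (_, preserve) in nodes.items() if preserve]
    live = set()
    while stack:
        name = stack.pop()
        if name in live:
            continue
        live.add(name)
        # A surviving node keeps all of its inputs alive as well.
        stack.extend(nodes[name][0])
    return live


graph = {
    "reader": ([], False),
    "decoder": (["reader"], False),
    "resize": (["decoder"], False),
    "debug_dump": (["decoder"], False),     # dangling: pruned
    "preserved_dump": (["decoder"], True),  # dangling, but preserve=True: kept
}
```

With "resize" as the only output, "debug_dump" is dropped while "preserved_dump" survives together with the nodes it depends on.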

__exit__(exception_type, exception_value, traceback)¶
- Safely restores the previous pipeline.

add_sink(edge)¶
- Manually adds to the pipeline a graph edge that is not connected to the output and would otherwise be pruned.

property batch_size¶
- Batch size.

build(define_graph=None)¶
- Builds the pipeline.
- A pipeline needs to be built in order to run it standalone. Framework-specific plugins handle this step automatically.
- Parameters
- define_graph (callable) – If specified, this function is used instead of the member define_graph(). This parameter must not be set if the pipeline outputs are specified with set_outputs() or if start_py_workers() is used.
- Note: This method of defining the processing graph cannot be used with parallel ExternalSource.

property cpu_queue_size¶
- The number of iterations processed ahead by the CPU stage. 

property default_cuda_stream_priority¶
- Default priority of the CUDA streams used by this pipeline.

define_graph()¶
- This function is defined by the user to construct the graph of operations for their pipeline.
- It returns a list of outputs created by calling DALI operators.

classmethod deserialize(serialized_pipeline=None, filename=None, **kwargs)¶
- Deserializes and builds a pipeline.
- Deserializes a pipeline previously serialized with the serialize() method. The returned pipeline is already built.
- Optionally, additional arguments can be passed; they are used when instantiating the pipeline. Refer to the Pipeline constructor for the full list of arguments. By default, the pipeline is instantiated with the arguments stored in the serialized pipeline.
- Note that the serialized_pipeline and filename parameters are mutually exclusive.
- Parameters
- serialized_pipeline (str) – Pipeline serialized using the serialize() method.
- filename (str) – File from which the serialized pipeline will be read.
- kwargs (dict) – Refer to the Pipeline constructor for the full list of arguments.
- Returns
- Deserialized and built pipeline.

deserialize_and_build(serialized_pipeline)¶
- Deserializes and builds the pipeline given in serialized form.
- Parameters
- serialized_pipeline (str) – Serialized pipeline.

property device_id¶
- ID of the GPU used by the pipeline, or None for CPU-only pipelines.

empty()¶
- Checks whether there is any work scheduled in the pipeline that has not yet been consumed.

enable_api_check(enable)¶
- Enables or disables the API check at runtime.

property enable_memory_stats¶
- If True, memory usage statistics are gathered. 

epoch_size(name=None)¶
- Epoch size of the pipeline.
- If the name parameter is None, returns a dictionary of (reader name, epoch size for that reader) pairs. Otherwise, returns the epoch size of the given reader.
- Parameters
- name (str, optional, default = None) – The reader whose epoch size should be returned.

property exec_async¶
- If True, asynchronous execution is used.

property exec_pipelined¶
- If True, the pipelined execution model is used.

property exec_separated¶
- If True, there are separate prefetch queues for the CPU and GPU stages.
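The rules stated for the prefetch_queue_depth constructor argument determine the queue sizes and whether the separated-queues executor is used. The helper below restates those documented rules in plain Python; resolve_prefetch_queue_depth is a hypothetical name, not part of the DALI API, and it assumes an integer depth applies to both stages.

```python
def resolve_prefetch_queue_depth(depth, exec_pipelined=True, exec_async=True):
    """Return (cpu_size, gpu_size, separated) for a prefetch_queue_depth value."""
    if isinstance(depth, int):
        # A single integer: one depth shared by all stages, no separated queues.
        return depth, depth, False
    if isinstance(depth, dict):
        # A dict selects the separated-queues executor.
        if not exec_pipelined and not exec_async:
            raise ValueError("separated queues are not supported when both "
                             "exec_pipelined and exec_async are False")
        return depth["cpu_size"], depth["gpu_size"], True
    raise TypeError("prefetch_queue_depth must be an int or a dict")
```

The third element corresponds to what the exec_separated property reports under these assumptions.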

executor_statistics()¶
- Returns the pipeline executor statistics metadata as a dictionary. Each key in the dictionary is an operator name. To enable gathering these statistics, create the pipeline with enable_memory_stats=True.
- Available metadata keys for each operator:
  - real_memory_size – list of memory sizes used by each output of the operator. The index in the list corresponds to the output index.
  - max_real_memory_size – list of maximum tensor sizes used by each output of the operator. The index in the list corresponds to the output index.
  - reserved_memory_size – list of memory sizes reserved for each of the operator outputs. The index in the list corresponds to the output index.
  - max_reserved_memory_size – list of maximum memory sizes per tensor reserved for each of the operator outputs. The index in the list corresponds to the output index.
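One plausible way to read these statistics, assuming the dictionary shape documented above, is to compare each output's largest real tensor with its largest reservation; the max_real_memory_size value is a natural starting point for an operator's bytes_per_sample_hint. The function name and the sample numbers below are illustrative only, not real DALI output.

```python
def summarize_memory_stats(stats):
    """Per operator: list of (max_real, max_reserved, slack) tuples, one per output.

    `stats` is assumed to follow the executor_statistics() layout; only the
    max_real_memory_size and max_reserved_memory_size keys are read.
    """
    summary = {}
    for op_name, meta in stats.items():
        real = meta["max_real_memory_size"]
        reserved = meta["max_reserved_memory_size"]
        # Slack: bytes reserved beyond the largest tensor actually produced.
        summary[op_name] = [(r, res, res - r) for r, res in zip(real, reserved)]
    return summary


# Made-up example data in the documented shape.
example_stats = {
    "decoder": {"max_real_memory_size": [150528], "max_reserved_memory_size": [200000]},
}
```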
 
feed_input(data_node, data, layout=None, cuda_stream=None, use_copy_kernel=False)¶
- Passes a multidimensional array or DLPack object (or a list thereof) to an output of ExternalSource. For GPU input, the data must be modified on the same stream as the one used by feed_input; see the cuda_stream parameter for details.
- Parameters
- data_node (DataNode or str) – The name of the nvidia.dali.fn.external_source node, or a DataNode object returned by a call to that ExternalSource.
- data (an ndarray or DLPack object, or a list thereof) – The data to be used as the output of the ExternalSource referred to by data_node. The array(s) may be one of:
  - NumPy ndarray (CPU)
  - MXNet ndarray (CPU)
  - PyTorch tensor (CPU or GPU)
  - CuPy array (GPU)
  - objects implementing __cuda_array_interface__
  - DALI TensorList or list of DALI Tensor objects
- layout (str or None) – The description of the data layout (or an empty string, if not specified). Its length should match the dimensionality of the data, excluding the batch dimension. For a batch of channel-first images, this should be “CHW”; for channel-last video, “FHWC”; and so on. If data is a DALI TensorList or a list of DALI Tensor objects and layout is None, the layout is taken from data.
- cuda_stream (optional, cudaStream_t or an object convertible to cudaStream_t, e.g. cupy.cuda.Stream, torch.cuda.Stream) – The CUDA stream used for copying data to the GPU or from a GPU source. If not set, DALI makes a best effort to maintain correctness: if the data is provided as a tensor or array from a recognized library (CuPy, PyTorch), that library's current stream is used. This should work in typical scenarios, but advanced use cases (and code using unsupported libraries) may still need to supply the stream handle explicitly. Special values:
  - 0 – use the default CUDA stream
  - -1 – use DALI's internal stream
  If the internal stream is used, the call to feed_input blocks until the copy to the internal buffer is complete, since there is no way to synchronize with this stream to prevent the array from being overwritten with new data in another stream.
- use_copy_kernel (optional, bool) – If set to True, DALI uses a CUDA kernel to feed the data (applicable only when copying data to or from GPU memory) instead of cudaMemcpyAsync (the default).
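The layout rule (one character per sample dimension, batch dimension excluded) is easy to check before calling feed_input. The sketch below is a hypothetical pre-flight helper built on NumPy only; it is not a DALI function, and it assumes that a single ndarray carries the batch on its leading axis while a list holds one array per sample.

```python
import numpy as np


def check_layout(data, layout):
    """Raise ValueError if `layout` does not match the per-sample dimensionality.

    `data` is either one array whose leading axis is the batch, or a list of
    per-sample arrays. An empty or None layout means "not specified".
    """
    if not layout:
        return
    # Iterating over an ndarray walks its leading (batch) axis.
    samples = list(data) if isinstance(data, np.ndarray) else data
    for i, sample in enumerate(samples):
        ndim = np.ndim(sample)
        if ndim != len(layout):
            raise ValueError(
                f"sample {i} has {ndim} dimensions, but layout {layout!r} has {len(layout)}"
            )
```

For example, a (4, 3, 32, 32) array passes with "CHW", and a list of HWC images passes with "HWC" even if the images differ in size.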
 
 
property gpu_queue_size¶
- The number of iterations processed ahead by the GPU stage.

iter_setup()¶
- This function can be overridden by a user-defined pipeline to perform any setup needed for each iteration. For example, it can be used to feed input data from NumPy arrays.

property max_batch_size¶
- Maximum batch size.
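Since the maximum batch size is only an upper bound and the batch size may change between iterations, a source that feeds a pipeline only needs to keep every batch within that limit. The hypothetical helper below (plain Python, unrelated to DALI's API) cuts a data set into consecutive batches where only the final one may be smaller.

```python
def split_into_batches(samples, max_batch_size):
    """Split `samples` into consecutive batches of at most `max_batch_size` items."""
    if max_batch_size <= 0:
        # Mirrors the constructor rule that negative batch sizes are invalid.
        raise ValueError("max_batch_size must be positive")
    return [samples[i:i + max_batch_size] for i in range(0, len(samples), max_batch_size)]
```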

property max_streams¶
- Reserved for future use.

property num_threads¶
- Number of CPU threads used by this pipeline.

outputs()¶
- Returns the outputs of the pipeline and releases the previous buffer.
- If the pipeline is executed asynchronously, this function blocks until the results become available. It raises StopIteration if the data set has reached its end, usually when iter_setup cannot produce any more data.
- Returns
- A list of TensorList objects for the respective pipeline outputs.

static pop_current()¶
- Restores the previous pipeline as current. Complementary to push_current().

property prefetch_queue_depth¶
- Depth (or depths) of the prefetch queue, as specified in the __init__ arguments.

static push_current(pipeline)¶
- Sets the pipeline as current and stores the previous current pipeline on a stack. To restore the previous pipeline as current, use pop_current().
- To make sure that the pipeline is properly restored in case of an exception, use a context manager (with my_pipeline:).
- A current pipeline is required to call operators that have side effects or no outputs, such as PythonFunction (potential side effects) or DumpImage (no output).
- Any dangling operator can be marked as having side effects with preserve=True, which can be useful for debugging; otherwise, an operator that does not contribute to the pipeline output is removed from the graph.
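The push/pop bookkeeping and the exception safety of the with statement can be modeled with a small stack. CurrentPipelineStack and its methods are hypothetical stand-ins, not DALI's implementation; the context manager shows why with my_pipeline: is preferred over manual push_current()/pop_current() calls, since the previous pipeline is restored even when the body raises.

```python
import contextlib


class CurrentPipelineStack:
    """Hypothetical model of the "current pipeline" bookkeeping."""

    def __init__(self):
        self.current = None   # no current pipeline yet
        self._previous = []   # pipelines that were current before

    def push(self, pipeline):
        self._previous.append(self.current)
        self.current = pipeline

    def pop(self):
        self.current = self._previous.pop()

    @contextlib.contextmanager
    def scope(self, pipeline):
        # Similar in spirit to `with pipeline:`; pop() runs even on error.
        self.push(pipeline)
        try:
            yield pipeline
        finally:
            self.pop()
```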

property py_num_workers¶
- The number of Python worker processes used by parallel external_source.

property py_start_method¶
- The method of launching the Python worker processes used by parallel external_source.

reader_meta(name=None)¶
- Returns the reader metadata as a dictionary. If no name is provided, it returns a dictionary with data for all readers, as {reader_name: meta}.
- Available metadata keys:
  - epoch_size – raw epoch size
  - epoch_size_padded – epoch size, padded at the end to be divisible by the number of shards
  - number_of_shards – number of shards
  - shard_id – shard ID of the given reader
  - pad_last_batch – whether the given reader should pad the last batch
  - stick_to_shard – whether the given reader should stick to its shard
- Parameters
- name (str, optional, default = None) – The reader whose metadata should be returned.
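Reading the epoch_size_padded description literally, and assuming the padding is the smallest amount that makes the size divisible by the number of shards (an assumption about DALI's exact rule), the arithmetic can be sketched as follows. Both helper names are hypothetical.

```python
def padded_epoch_size(epoch_size, number_of_shards):
    """Smallest multiple of number_of_shards that is >= epoch_size."""
    # -(-a // b) is integer ceiling division.
    return -(-epoch_size // number_of_shards) * number_of_shards


def samples_per_shard(epoch_size_padded, number_of_shards):
    """Each shard covers an equal slice of the padded epoch."""
    return epoch_size_padded // number_of_shards
```

For instance, 1000 samples split across 3 shards would be padded to 1002, giving each shard 334 samples.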
 
release_outputs()¶
- Releases the buffers returned by share_outputs() calls.
- This helps when the result of a share_outputs() call has been consumed (copied), so the buffers can be marked as free before the next call to share_outputs(). It gives the user better control over when to run the pipeline, when to obtain the resulting buffers, and when to return them to the DALI buffer pool once the results have been consumed. Needs to be used together with schedule_run() and share_outputs(). Should not be mixed with run() in the same pipeline.

reset()¶
- Resets the pipeline iterator.
- If the pipeline iterator has reached the end, resets its state to the beginning.

run()¶
- Runs the pipeline and returns the result.
- If the pipeline was created with the exec_pipelined option set to True, this function also starts prefetching the next iteration for faster execution. Should not be mixed with schedule_run(), share_outputs(), or release_outputs() in the same pipeline.
- Returns
- A list of TensorList objects for the respective pipeline outputs.
 
save_graph_to_dot_file(filename, show_tensors=False, show_ids=False, use_colors=False)¶
- Saves the pipeline graph to a file in the DOT format.
- Parameters
- filename (str) – Name of the file to which the graph is written.
- show_tensors (bool) – Show the Tensor nodes in the graph (by default, only Operator nodes are shown).
- show_ids (bool) – Add the node IDs to the graph representation.
- use_colors (bool) – Whether to use colors to distinguish stages.

schedule_run()¶
- Runs the pipeline without returning the resulting buffers.
- If the pipeline was created with the exec_pipelined option set to True, this function also starts prefetching the next iteration for faster execution. It gives users better control over when to run the pipeline, when to obtain the resulting buffers, and when to return them to the DALI buffer pool once the results have been consumed. Needs to be used together with release_outputs() and share_outputs(). Should not be mixed with run() in the same pipeline.

property seed¶
- Random seed used in the pipeline or None, if seed is not fixed. 

serialize(define_graph=None, filename=None)¶
- Serializes the pipeline to a Protobuf string.
- Additionally, you can pass a file name, so that the serialized pipeline is written there. The file contents will be overwritten.
- Parameters
- define_graph (callable) – If specified, this function is used instead of the member define_graph(). This parameter must not be set if the pipeline outputs are specified with set_outputs().
- filename (str) – File to which the serialized pipeline will be written.
- kwargs (dict) – Refer to the Pipeline constructor for the full list of arguments.

property set_affinity¶
- If True, worker threads are bound to CPU cores.

set_outputs(*output_data_nodes)¶
- Sets the outputs of the pipeline.
- Using this function is an alternative to overriding define_graph in a derived class.
- Parameters
- *output_data_nodes (unpacked list of DataNode objects) – The outputs of the pipeline.

share_outputs()¶
- Returns the outputs of the pipeline.
- The main difference from outputs() is that share_outputs() does not release the returned buffers; release_outputs() needs to be called for that. If the pipeline is executed asynchronously, this function blocks until the results become available. It gives the user better control over when to run the pipeline, when to obtain the resulting buffers, and when to return them to the DALI buffer pool once the results have been consumed. Needs to be used together with release_outputs() and schedule_run(). Should not be mixed with run() in the same pipeline.
- Returns
- A list of TensorList objects for the respective pipeline outputs.
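The relationship between the basic API (run(), outputs()) and the advanced API (schedule_run(), share_outputs(), release_outputs()) can be summarized with a toy model. ToyPipeline is hypothetical and mirrors DALI only conceptually: outputs() releases the previous buffer before returning the next one, share_outputs() leaves buffers held until release_outputs() is called, mixing the two APIs is rejected, and an exhausted source raises StopIteration.

```python
from collections import deque

_END = object()  # sentinel: the data source is exhausted


class ToyPipeline:
    """Toy model of DALI's two output APIs; not DALI's implementation."""

    def __init__(self, batches):
        self._source = iter(batches)  # stands in for what the graph produces
        self._queue = deque()         # iterations scheduled but not consumed
        self.held = []                # buffers lent out and not yet released
        self._api = None              # "basic" or "advanced", fixed on first use

    def _check_api(self, api):
        if self._api is None:
            self._api = api
        elif self._api != api:
            raise RuntimeError("run()/outputs() must not be mixed with "
                               "schedule_run()/share_outputs()/release_outputs()")

    def _schedule(self):
        self._queue.append(next(self._source, _END))

    def _share(self):
        batch = self._queue.popleft()
        if batch is _END:
            raise StopIteration
        self.held.append(batch)
        return batch

    # Advanced API: the caller decides when buffers go back to the pool.
    def schedule_run(self):
        self._check_api("advanced")
        self._schedule()

    def share_outputs(self):
        self._check_api("advanced")
        return self._share()

    def release_outputs(self):
        self._check_api("advanced")
        self.held.clear()

    # Basic API: outputs() releases the previous buffer, then returns the next.
    def outputs(self):
        self._check_api("basic")
        self.held.clear()
        return self._share()

    def run(self):
        self._check_api("basic")
        self._schedule()
        return self.outputs()
```

Fixing the API style on first use keeps the model strict; the real pipeline's internal bookkeeping may differ.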
 
start_py_workers()¶
- Starts the Python workers that run ExternalSource callbacks. When the workers are started with fork (py_start_method="fork"), you need to call start_py_workers() before calling any functionality that creates or acquires a CUDA context. The method is called automatically by Pipeline.build() when such separation is not necessary.
- If you are going to build more than one pipeline that starts Python workers by forking the process, you need to call start_py_workers() on all those pipelines before calling build() on any of them, as build() acquires a CUDA context for the current process.
- The same applies to any other functionality that creates a CUDA context, for example, initializing a framework that uses CUDA or creating CUDA tensors with it. You need to call start_py_workers() before calling such functionality when using py_start_method="fork".
- Forking a process that has a CUDA context is unsupported and may lead to unexpected errors.
- If you use this method, you cannot specify the define_graph argument when calling build().
 
Data Processing Graphs¶
A DALI pipeline is represented as a graph of operations. There are two kinds of nodes in the graph:
- Operators – created on each call to an operator.
- Data nodes (see DataNode) – represent the outputs and inputs of operators; they are returned from calls to operators, and passing them as inputs to other operators establishes connections in the graph.
Example:
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
@pipeline_def  # create a pipeline with processing graph defined by the function below
def my_pipeline():
    """ Create a pipeline which reads images and masks, decodes both and returns them together with the labels. """
    img_files, labels = fn.readers.file(file_root="image_dir", seed=1)
    mask_files, _ = fn.readers.file(file_root="mask_dir", seed=1)
    images = fn.decoders.image(img_files, device="mixed")
    masks  = fn.decoders.image(mask_files, device="mixed")
    return images, masks, labels
pipe = my_pipeline(batch_size=4, num_threads=2, device_id=0)
pipe.build()
The resulting graph is:
[Figure: processing graph of my_pipeline]
Current Pipeline¶
Subgraphs that do not contribute to the pipeline output are automatically pruned.
If an operator has side effects (e.g. the PythonFunction operator family), it cannot be invoked
without setting the current pipeline. The current pipeline is set implicitly when the graph is
defined inside a derived pipeline's Pipeline.define_graph() method.
Otherwise, it can be set using a context manager (with statement):
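import nvidia.dali as dali
# N (the batch size) and my_source (a callable producing two outputs) are placeholders defined elsewhere
pipe = dali.Pipeline(batch_size=N, num_threads=3, device_id=0)
with pipe:
    src = dali.ops.ExternalSource(my_source, num_outputs=2)
    a, b = src()
    pipe.set_outputs(a, b)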
When creating a pipeline with pipeline_def(), the function which defines the pipeline is
executed within the scope of the newly created pipeline. The following example is equivalent
to the previous one:
@dali.pipeline_def(batch_size=N, num_threads=3, device_id=0)
def my_pipe(my_source):
    return dali.fn.external_source(my_source, num_outputs=2)
pipe = my_pipe(my_source)
- 
class nvidia.dali.Pipeline(batch_size=- 1, num_threads=- 1, device_id=- 1, seed=- 1, exec_pipelined=True, prefetch_queue_depth=2, exec_async=True, bytes_per_sample=0, set_affinity=False, max_streams=- 1, default_cuda_stream_priority=0, *, enable_memory_stats=False, py_num_workers=1, py_start_method='fork')¶
- Pipeline class is the base of all DALI data pipelines. The pipeline encapsulates the data processing graph and the execution engine. - Parameters
- batch_size (int, optional, default = -1) – - Maximum batch size of the pipeline. Negative values for this parameter are invalid - the default value may only be used with serialized pipeline (the value stored in serialized pipeline is used instead). In most cases, the actual batch size of the pipeline will be equal to the maximum one. Running the DALI Pipeline with a smaller batch size is also supported. The batch size might change from iteration to iteration. - Please note, that DALI might perform memory preallocations according to this parameter. Setting it too high might result in out-of-memory failure. 
- num_threads (int, optional, default = -1) – Number of CPU threads used by the pipeline. Negative values for this parameter are invalid - the default value may only be used with serialized pipeline (the value stored in serialized pipeline is used instead). 
- device_id (int, optional, default = -1) – Id of GPU used by the pipeline. A None value for this parameter means that DALI should not use GPU nor CUDA runtime. This limits the pipeline to only CPU operators but allows it to run on any CPU capable machine. 
- seed (int, optional, default = -1) – Seed used for random number generation. Leaving the default value for this parameter results in random seed. 
- exec_pipelined (bool, optional, default = True) – Whether to execute the pipeline in a way that enables overlapping CPU and GPU computation, typically resulting in faster execution speed, but larger memory consumption. 
- prefetch_queue_depth (int or {"cpu_size": int, "gpu_size": int}, optional, default = 2) – Depth of the executor pipeline. Deeper pipeline makes DALI more resistant to uneven execution time of each batch, but it also consumes more memory for internal buffers. Specifying a dict: - { "cpu_size": x, "gpu_size": y }instead of an integer will cause the pipeline to use separated queues executor, with buffer queue size x for cpu stage and y for mixed and gpu stages. It is not supported when both exec_async and exec_pipelined are set to False. Executor will buffer cpu and gpu stages separatelly, and will fill the buffer queues when the first- run()is issued.
- exec_async (bool, optional, default = True) – Whether to execute the pipeline asynchronously. This makes - run()method run asynchronously with respect to the calling Python thread. In order to synchronize with the pipeline one needs to call- outputs()method.
- bytes_per_sample (int, optional, default = 0) – A hint for DALI for how much memory to use for its tensors. 
- set_affinity (bool, optional, default = False) – Whether to set CPU core affinity to the one closest to the GPU being used. 
- max_streams (int, optional, default = -1) – Limit the number of CUDA streams used by the executor. Value of -1 does not impose a limit. This parameter is currently unused (and behavior of unrestricted number of streams is assumed). 
- default_cuda_stream_priority (int, optional, default = 0) – CUDA stream priority used by DALI. See cudaStreamCreateWithPriority in CUDA documentation 
- enable_memory_stats (bool, optional, default = 1) – If DALI should print operator output buffer statistics. Usefull for bytes_per_sample_hint operator parameter. 
- py_num_workers (int, optional, default = 1) – The number of Python workers that will process - ExternalSourcecallbacks. The pool starts only if there is at least one ExternalSource with- parallelset to True. Setting it to 0 disables the pool and all ExternalSource operators fall back to non-parallel mode even if- parallelis set to True.
- py_start_method (str, default = "fork") – - Determines how Python workers are started. Supported methods: - "fork"- start by forking the process
- "spawn"- start a fresh interpreter process
 - If - spawnmethod is used, ExternalSource’s callback must be picklable. In order to use- fork, there must be no CUDA contexts acquired at the moment of starting the workers. For this reason, if you need to build multiple pipelines that use Python workers, you will need to call- start_py_workers()before calling- build()of any of the pipelines. You can find more details and caveats of both methods in Python’s- multiprocessingmodule documentation.
 
 - 
__enter__()¶
- Safely sets the pipeline as current. Current pipeline is required to call operators with side effects or without outputs. Examples of such operators are PythonFunction (potential side effects) or DumpImage (no output). - Any dangling operator can be marked as having side effects if it’s marked with preserve=True, which can be useful for debugging - otherwise operator which does not contribute to the pipeline output is removed from the graph. - To manually set new (and restore previous) current pipeline, use - push_current()and- pop_current(), respectively.
 - 
__exit__(exception_type, exception_value, traceback)¶
- Safely restores previous pipeline. 
 - 
add_sink(edge)¶
- Allows to manual add of graph edges to the pipeline which are not connected to the output and all pruned 
 - 
property batch_size¶
- Batch size. 
 - 
build(define_graph=None)¶
- Build the pipeline. - Pipeline needs to be built in order to run it standalone. Framework-specific plugins handle this step automatically. - Parameters
- define_graph (callable) – - If specified, this function will be used instead of member - define_graph(). This parameter must not be set, if the pipeline outputs are specified with- set_outputs()or if the- start_py_workers()is used.- Note - This method of defining the processing graph cannot be used with parallel - ExternalSource.
 
 - 
property cpu_queue_size¶
- The number of iterations processed ahead by the CPU stage. 
 - 
property default_cuda_stream_priority¶
- Default priority of the CUDA streams used by this pipeline. 
 - 
define_graph()¶
- This function is defined by the user to construct the graph of operations for their pipeline. - It returns a list of outputs created by calling DALI Operators. 
 - 
classmethod deserialize(serialized_pipeline=None, filename=None, **kwargs)¶
- Deserialize and build pipeline. - Deserialize pipeline, previously serialized with - serialize()method.- Returned pipeline is already built. - Alternatively, additional arguments can be passed, which will be used when instantiating the pipeline. Refer to Pipeline constructor for full list of arguments. By default, the pipeline will be instantiated with the arguments from serialized pipeline. - Note, that - serialized_pipelineand- filenameparameters are mutually exclusive- Parameters
- serialized_pipeline (str) – Pipeline, serialized using - serialize()method.
- filename (str) – File, from which serialized pipeline will be read. 
- kwargs (dict) – Refer to Pipeline constructor for full list of arguments. 
 
- Returns
- Return type
- Deserialized and built pipeline. 
 
 - 
deserialize_and_build(serialized_pipeline)¶
- Deserialize and build the pipeline given in serialized form. - Parameters
- serialized_pipeline (str) – Serialized pipeline. 
 
 - 
property device_id¶
- Id of the GPU used by the pipeline or None for CPU-only pipelines. 
 - 
empty()¶
- If there is any work scheduled in the pipeline but not yet consumed 
 - 
enable_api_check(enable)¶
- Allows to enable or disable API check in the runtime 
 - 
property enable_memory_stats¶
- If True, memory usage statistics are gathered. 
 - 
epoch_size(name=None)¶
- Epoch size of a pipeline. - If the name parameter is None, returns a dictionary of pairs (reader name, epoch size for that reader). If the name parameter is not None, returns epoch size for that reader. - Parameters
- name (str, optional, default = None) – The reader which should be used to obtain epoch size. 
 
 - 
property exec_async¶
- If true, asynchronous execution is used. 
 - 
property exec_pipelined¶
- If true, pipeline execution model is used. 
 - 
property exec_separated¶
- If True, there are separate prefetch queues for CPU and GPU stages. 
 - 
executor_statistics()¶
- Returns provided pipeline executor statistics metadata as a dictionary. Each key in the dictionary is the operator name. To enable it use - executor_statistics- Available metadata keys for each operator: - real_memory_size- list of memory sizes that is used by each output of the operator. Index in the list corresponds to the output index.
- max_real_memory_size- list of maximum tensor size that is used by each output of the operator. Index in the list corresponds to the output index.
- reserved_memory_size- list of memory sizes that is reserved for each of the operator outputs. Index in the list corresponds to the output index.
- max_reserved_memory_size- list of maximum memory sizes per tensor that is reserved for each of the operator outputs. Index in the list corresponds to the output index.
 
 - 
feed_input(data_node, data, layout=None, cuda_stream=None, use_copy_kernel=False)¶
- Pass a mutlidimensional array or DLPack (or a list thereof) to an output of ExternalSource. In the case of the GPU input, the data must be modified on the same stream as the one used by feed_input. See - cuda_streamparameter for details.- Parameters
- data_node ( - DataNodeor str) – The name of the- nvidia.dali.fn.external_sourcenode or a- DataNodeobject returned by a call to that ExternalSource.
- data (an ndarray or DLPack or a list thereof) – - The array(s) may be one of: - NumPy ndarray (CPU) 
- MXNet ndarray (CPU) 
- PyTorch tensor (CPU or GPU) 
- CuPy array (GPU) 
- objects implementing - __cuda_array_interface__
- DALI TensorList or list of DALI Tensor objects 
 - The data to be used as the output of the ExternalSource referred to by data_node. 
- layout (str or None) – The description of the data layout (or empty string, if not specified). It should be a string of the length that matches the dimensionality of the data, batch dimension excluded. For a batch of channel-first images, this should be “CHW”, for channel-last video it’s “FHWC” and so on. If - datais a DALI TensorList or a list of DALI Tensor objects and- layoutis- None, the layout is taken from- data.
- cuda_stream (optional, cudaStream_t or an object convertible to cudaStream_t, e.g. cupy.cuda.Stream, torch.cuda.Stream) – The CUDA stream used for copying data to the GPU or from a GPU source. If not set, a best effort is made to maintain correctness: if the data is provided as a tensor/array from a recognized library (CuPy, PyTorch), that library’s current stream is used. This should work in typical scenarios, but advanced use cases (and code using unsupported libraries) may still need to supply the stream handle explicitly.
  Special values:
  - 0 - use the default CUDA stream
  - -1 - use DALI’s internal stream
  If the internal stream is used, the call to feed_input blocks until the copy to the internal buffer is complete, since there is no way to synchronize with this stream to prevent overwriting the array with new data in another stream.
- use_copy_kernel (optional, bool) – If set to True, DALI will use a CUDA kernel to feed the data (only applicable when copying data to/from GPU memory) instead of cudaMemcpyAsync (default). 
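A quick way to sanity-check the layout argument before calling feed_input is to compare its length with the number of non-batch dimensions. The sketch below assumes the batch is described by the shape of a single dense array whose first dimension is the batch dimension; layout_matches is a hypothetical helper, not a DALI function, and it checks only the length rule stated for layout.

```python
from typing import Sequence


def layout_matches(batch_shape: Sequence[int], layout: str) -> bool:
    """Check the feed_input layout length rule for a dense batch.

    `batch_shape` includes the leading batch dimension; `layout` must have one
    character per remaining dimension. An empty string means "not specified".
    """
    if layout == "":
        return True
    return len(layout) == len(batch_shape) - 1


images = (8, 3, 224, 224)       # batch of 8 channel-first RGB images (N, C, H, W)
videos = (2, 16, 720, 1280, 3)  # batch of 2 channel-last videos (N, F, H, W, C)

print(layout_matches(images, "CHW"))   # True
print(layout_matches(videos, "FHWC"))  # True
print(layout_matches(images, "NCHW"))  # False: the batch dimension is excluded
```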
 
 
 - 
property gpu_queue_size
- The number of iterations processed ahead by the GPU stage. 
 - 
iter_setup()
- This function can be overridden by a user-defined pipeline to perform any setup needed for each iteration. For example, it can be used to feed the input data from NumPy arrays.
 - 
property max_batch_size
- Maximum batch size. 
 - 
property max_streams
- Reserved for future use. 
 - 
property num_threads
- Number of CPU threads used by this pipeline. 
 - 
outputs()
- Returns the outputs of the pipeline and releases the previous buffer.
If the pipeline is executed asynchronously, this function blocks until the results become available. It raises StopIteration if the data set has reached its end, usually when iter_setup cannot produce any more data.
- Returns
- A list of TensorList objects for respective pipeline outputs 
 
 - 
static pop_current()
- Restores the previous pipeline as current. Complementary to push_current().
 - 
property prefetch_queue_depth
- Depth (or depths) of the prefetch queue, as specified in the __init__ arguments.
 - 
static push_current(pipeline)
- Sets the pipeline as current and stores the previous current pipeline on a stack. To restore the previous pipeline as current, use pop_current().
To make sure that the previous pipeline is properly restored in case of an exception, use the pipeline as a context manager (with my_pipeline:).
A current pipeline is required to call operators with side effects or without outputs. Examples of such operators are PythonFunction (potential side effects) and DumpImage (no output).
Any dangling operator can be marked as having side effects with preserve=True, which can be useful for debugging; otherwise, an operator that does not contribute to the pipeline output is removed from the graph.
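The push/pop semantics, and why the context manager is the safer choice, can be illustrated with a toy model. This is not DALI's implementation: ToyPipeline is a hypothetical stand-in that models only the "current pipeline" stack, so the example runs without DALI or a GPU.

```python
from typing import List, Optional


class ToyPipeline:
    """Hypothetical stand-in that models only the current-pipeline stack."""

    _current: Optional["ToyPipeline"] = None
    _stack: List[Optional["ToyPipeline"]] = []

    def __init__(self, name: str) -> None:
        self.name = name

    @staticmethod
    def current() -> Optional["ToyPipeline"]:
        return ToyPipeline._current

    @staticmethod
    def push_current(pipeline: "ToyPipeline") -> None:
        # Save the previous current pipeline, then make `pipeline` current.
        ToyPipeline._stack.append(ToyPipeline._current)
        ToyPipeline._current = pipeline

    @staticmethod
    def pop_current() -> None:
        # Restore the pipeline that was current before the matching push.
        ToyPipeline._current = ToyPipeline._stack.pop()

    # "with pipe:" pairs push_current/pop_current, even if the body raises.
    def __enter__(self) -> "ToyPipeline":
        ToyPipeline.push_current(self)
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        ToyPipeline.pop_current()
        return False  # propagate any exception


outer, inner = ToyPipeline("outer"), ToyPipeline("inner")
with outer:
    with inner:
        print(ToyPipeline.current().name)  # inner
    print(ToyPipeline.current().name)      # outer

try:
    with outer:
        raise RuntimeError("graph definition failed")
except RuntimeError:
    pass
print(ToyPipeline.current())  # None: the previous state was restored
```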
 - 
property py_num_workers
- The number of Python worker processes used by the parallel external_source.
 - 
property py_start_method
- The method of launching Python worker processes used by the parallel external_source.
 - 
reader_meta(name=None)
- Returns the metadata of the given reader as a dictionary. If no name is provided, it returns a dictionary with data for all readers as {reader_name: meta}.
Available metadata keys:
  - epoch_size: raw epoch size
  - epoch_size_padded: epoch size with padding at the end, so that it is divisible by the number of shards
  - number_of_shards: number of shards
  - shard_id: shard ID of the given reader
  - pad_last_batch: whether the given reader pads the last batch
  - stick_to_shard: whether the given reader sticks to its shard
- Parameters
- name (str, optional, default = None) – The name of the reader whose metadata is returned. If None, metadata for all readers is returned.
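Reader metadata is often used to work out how many iterations make up an epoch for one shard. A minimal sketch, assuming a dictionary with the keys listed above: iterations_per_epoch and ceil_div are hypothetical helpers, the example values are invented, and the result is an estimate under the assumptions stated in the comments, not necessarily the rule DALI itself applies.

```python
from typing import Mapping


def ceil_div(a: int, b: int) -> int:
    """Integer ceiling division without floating point."""
    return -(-a // b)


def iterations_per_epoch(meta: Mapping[str, int], batch_size: int) -> int:
    """Estimate how many batches one shard yields per epoch (an upper bound)."""
    # Assumption: shards differ in size by at most one sample, so the largest
    # shard holds ceil(epoch_size_padded / number_of_shards) samples. When the
    # padded size is divisible by the number of shards, this is exact.
    samples_per_shard = ceil_div(meta["epoch_size_padded"], meta["number_of_shards"])
    # Assumption: a partial last batch still counts as one iteration.
    return ceil_div(samples_per_shard, batch_size)


# Made-up metadata for a reader split into 3 shards.
meta = {
    "epoch_size": 1000,
    "epoch_size_padded": 1002,  # padded up to a multiple of 3
    "number_of_shards": 3,
    "shard_id": 0,
    "pad_last_batch": True,
    "stick_to_shard": False,
}
print(iterations_per_epoch(meta, batch_size=32))  # 11 (334 samples per shard)
```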
 
 - 
release_outputs()
- Releases buffers returned by share_outputs() calls.
It helps when the output of a call has been consumed (copied) and the buffers can be marked as free before the next call to share_outputs(). It gives the user better control over when to run the pipeline, when to obtain the resulting buffers, and when to return them to the DALI pool once the results have been consumed. Needs to be used together with schedule_run() and share_outputs(). Should not be mixed with run() in the same pipeline.
 - 
reset()
- Resets the pipeline iterator.
If the pipeline iterator has reached the end, its state is reset to the beginning.
 - 
run()
- Runs the pipeline and returns the result.
If the pipeline was created with the exec_pipelined option set to True, this function also starts prefetching the next iteration for faster execution. Should not be mixed with schedule_run(), share_outputs(), and release_outputs() in the same pipeline.
- Returns
- A list of TensorList objects for respective pipeline outputs 
 
 - 
save_graph_to_dot_file(filename, show_tensors=False, show_ids=False, use_colors=False)
- Saves the pipeline graph to a file.
- Parameters
- filename (str) – Name of the file to which the graph is written. 
- show_tensors (bool) – Show the Tensor nodes in the graph (by default, only Operator nodes are shown).
- show_ids (bool) – Add the node id to the graph representation.
- use_colors (bool) – Whether to use colors to distinguish stages.
 
 
 - 
schedule_run()
- Runs the pipeline without returning the resulting buffers.
If the pipeline was created with the exec_pipelined option set to True, this function also starts prefetching the next iteration for faster execution. It gives users better control over when to run the pipeline, when to obtain the resulting buffers, and when to return them to the DALI buffer pool once the results have been consumed. Needs to be used together with release_outputs() and share_outputs(). Should not be mixed with run() in the same pipeline.
 - 
property seed
- Random seed used in the pipeline, or None if the seed is not fixed.
 - 
serialize(define_graph=None, filename=None)
- Serializes the pipeline to a Protobuf string.
Additionally, you can pass a file name, so that the serialized pipeline is written there. The file contents will be overwritten.
- Parameters
- define_graph (callable) – If specified, this function is used instead of the member define_graph(). This parameter must not be set if the pipeline outputs are specified with set_outputs().
- filename (str) – File to which the serialized pipeline will be written.
- kwargs (dict) – Refer to the Pipeline constructor for the full list of arguments.
 
 
 - 
property set_affinity
- If True, worker threads are bound to CPU cores. 
 - 
set_outputs(*output_data_nodes)
- Sets the outputs of the pipeline.
Using this function is an alternative to overriding define_graph in a derived class.
- Parameters
- *output_data_nodes (unpacked list of DataNode objects) – The outputs of the pipeline.
 
 - 
share_outputs()
- Returns the outputs of the pipeline.
The main difference from outputs() is that share_outputs() does not release the returned buffers; release_outputs() needs to be called for that. If the pipeline is executed asynchronously, this function blocks until the results become available. It gives the user better control over when to run the pipeline, when to obtain the resulting buffers, and when to return them to the DALI pool once the results have been consumed. Needs to be used together with release_outputs() and schedule_run(). Should not be mixed with run() in the same pipeline.
- Returns
- A list of TensorList objects for respective pipeline outputs 
 
 - 
start_py_workers()
- Starts Python workers (that will run ExternalSource callbacks). You need to call start_py_workers() before you call any functionality that creates or acquires a CUDA context when using fork to start Python workers (py_start_method="fork"). It is called automatically by the Pipeline.build() method when such separation is not necessary.
If you are going to build more than one pipeline that starts Python workers by forking the process, you need to call the start_py_workers() method on all those pipelines before calling the build() method of any pipeline, as build() acquires a CUDA context for the current process.
The same applies to any other functionality that would create a CUDA context, for example, initializing a framework that uses CUDA or creating CUDA tensors with it. You need to call start_py_workers() before you call such functionality when using py_start_method="fork".
Forking a process that has a CUDA context is unsupported and may lead to unexpected errors.
If you use this method, you cannot specify the define_graph argument when calling build().
 
Pipeline Decorator
- 
@nvidia.dali.pipeline_def(fn=None, **pipeline_kwargs)
- Decorator that converts a graph definition function into a DALI pipeline factory.
A graph definition function is a function that returns the intended pipeline outputs. You can decorate this function with @pipeline_def:

```python
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn

# images_dir: a directory of image files, assumed to be defined elsewhere

@pipeline_def
def my_pipe(flip_vertical, flip_horizontal):
    ''' Creates a DALI pipeline, which returns flipped and original images '''
    data, _ = fn.readers.file(file_root=images_dir)
    img = fn.decoders.image(data, device="mixed")
    flipped = fn.flip(img, horizontal=flip_horizontal, vertical=flip_vertical)
    return flipped, img
```

The decorated function returns a DALI Pipeline object:

```python
pipe = my_pipe(True, False)
# pipe.build()  # the pipeline is not configured properly yet
```

A pipeline requires additional parameters such as batch size, number of worker threads, GPU device id and so on (see Pipeline.__init__() for a complete list of pipeline parameters). These parameters can be supplied as additional keyword arguments, passed to the decorated function:

```python
pipe = my_pipe(True, False, batch_size=32, num_threads=1, device_id=0)
pipe.build()  # the pipeline is properly configured, we can build it now
```

The outputs from the original function become the outputs of the Pipeline:

```python
flipped, img = pipe.run()
```

When some of the pipeline parameters are fixed, they can be specified by name in the decorator:

```python
@pipeline_def(batch_size=42, num_threads=3)
def my_pipe(flip_vertical, flip_horizontal):
    ...
```

Any Pipeline constructor parameter passed later when calling the decorated function overrides the decorator-defined parameters:

```python
# my_generator: a user-provided data source, assumed to be defined elsewhere
@pipeline_def(batch_size=32, num_threads=3)
def my_pipe():
    data = fn.external_source(source=my_generator)
    return data

pipe = my_pipe(batch_size=128)  # batch_size=128 overrides batch_size=32
```

Warning
The arguments of the function being decorated can shadow pipeline constructor arguments, in which case there is no way to alter their values.

Note
Using **kwargs (variadic keyword arguments) in a graph-defining function is not allowed. They may result in unwanted, silent hijacking of some arguments of the same name by the Pipeline constructor. Code written this way would cease to work with future versions of DALI when new parameters are added to the Pipeline constructor.
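The argument-routing rules described for the decorator (call-time arguments override decorator arguments, function parameters shadow constructor arguments of the same name, and **kwargs is rejected) can be modeled in a few lines of plain Python. This is a toy model, not DALI's implementation: toy_pipeline_def is a hypothetical name, and it returns the would-be constructor arguments instead of building a Pipeline so that it runs without DALI.

```python
import functools
import inspect


def toy_pipeline_def(fn=None, **pipeline_kwargs):
    """Hypothetical model of how a pipeline_def-style decorator routes kwargs.

    Instead of building a Pipeline, the factory returns
    (constructor_kwargs, graph_function_result).
    """

    def decorator(func):
        params = inspect.signature(func).parameters
        if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
            raise TypeError("**kwargs in a graph-defining function is not allowed")

        @functools.wraps(func)
        def factory(*args, **kwargs):
            # Names declared by the graph function go to it and shadow
            # constructor arguments of the same name.
            fn_kwargs = {k: v for k, v in kwargs.items() if k in params}
            call_ctor_kwargs = {k: v for k, v in kwargs.items() if k not in params}
            # Call-time constructor arguments override decorator-time ones.
            ctor_kwargs = {**pipeline_kwargs, **call_ctor_kwargs}
            return ctor_kwargs, func(*args, **fn_kwargs)

        return factory

    return decorator(fn) if fn is not None else decorator


@toy_pipeline_def(batch_size=32, num_threads=3)
def my_pipe():
    return "graph outputs"


@toy_pipeline_def
def shadowing(batch_size):
    return batch_size


print(my_pipe(batch_size=128))  # ({'batch_size': 128, 'num_threads': 3}, 'graph outputs')
print(shadowing(batch_size=7))  # ({}, 7)
```

In the second case, batch_size reaches the function and never the constructor, which is the shadowing the warning above describes.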
DataNode
- 
class nvidia.dali.pipeline.DataNode(name, device='cpu', source=None)
- This class is a symbolic representation of a TensorList and is used at the graph definition stage. It does not carry actual data, but is used to define the connections between operators and to specify the pipeline outputs. See the documentation for Pipeline for details.
DataNode objects can be passed to DALI operators as inputs (and as some of the named arguments), but they also provide arithmetic operations, which implicitly create the appropriate operators that perform the expressions.