Getting started

Overview

NVIDIA Data Loading Library (DALI) is a collection of highly optimized building blocks and an execution engine that accelerates the data pipeline for computer vision deep learning applications.

Input and augmentation pipelines provided by deep learning frameworks typically fall into one of two categories:

  • fast, but inflexible - written in C++, they are exposed as a single monolithic Python object that provides a very specific set and ordering of operations

  • slow, but flexible - a set of building blocks, written in either C++ or Python, that can be used to compose arbitrary data pipelines, which end up being slow. One of the biggest overheads for this type of data pipeline is the Global Interpreter Lock (GIL) in Python. It forces developers to use multiprocessing, which complicates the design of efficient input pipelines.

DALI stands out by providing both performance and flexibility when accelerating different data pipelines. It achieves this by exposing optimized building blocks that are executed by a simple and efficient engine, and by enabling operations to be offloaded to the GPU (thus enabling scaling to multi-GPU systems).

It is a single library that can be easily integrated into different deep learning training and inference applications.

Optimal configuration

DALI offers ease of use and flexibility across GPU-enabled systems with direct framework plugins, multiple input data formats, and configurable graphs. DALI can help achieve an overall speedup on deep learning workflows that are bottlenecked on I/O pipelines due to limited CPU cycles. Typically, systems with a high GPU-to-CPU ratio (such as Amazon EC2 P3.16xlarge, NVIDIA DGX1-V or the upcoming NVIDIA DGX-2) are constrained by the host CPU, thereby under-utilizing the available GPU compute capabilities. DALI significantly accelerates input processing on such dense GPU configurations, which improves overall throughput.

Pipeline

The most important type in DALI is the Pipeline. It contains all the necessary information and multiple functions related to defining, building and running the pipeline. In order to make our own input and augmentation pipeline, we will make subclasses of it.

[1]:
from nvidia.dali.pipeline import Pipeline
help(Pipeline)
Help on class Pipeline in module nvidia.dali.pipeline:

class Pipeline(__builtin__.object)
 |  Pipeline class encapsulates all data required to define and run
 |  DALI input pipeline.
 |
 |  Parameters
 |  ----------
 |  `batch_size` : int, optional, default = -1
 |      Batch size of the pipeline. Negative values for this parameter
 |      are invalid - the default value may only be used with
 |      serialized pipeline (the value stored in serialized pipeline
 |      is used instead).
 |  `num_threads` : int, optional, default = -1
 |      Number of CPU threads used by the pipeline.
 |      Negative values for this parameter are invalid - the default
 |      value may only be used with serialized pipeline (the value
 |      stored in serialized pipeline is used instead).
 |  `device_id` : int, optional, default = -1
 |      Id of GPU used by the pipeline.
 |      Negative values for this parameter are invalid - the default
 |      value may only be used with serialized pipeline (the value
 |      stored in serialized pipeline is used instead).
 |  `seed` : int, optional, default = -1
 |      Seed used for random number generation. Leaving the default value
 |      for this parameter results in random seed.
 |  `exec_pipelined` : bool, optional, default = True
 |      Whether to execute the pipeline in a way that enables
 |      overlapping CPU and GPU computation, typically resulting
 |      in faster execution speed, but larger memory consumption.
 |  `exec_async` : bool, optional, default = True
 |      Whether to execute the pipeline asynchronously.
 |      This makes :meth:`nvidia.dali.pipeline.Pipeline.run` method
 |      run asynchronously with respect to the calling Python thread.
 |      In order to synchronize with the pipeline one needs to call
 |      :meth:`nvidia.dali.pipeline.Pipeline.outputs` method.
 |  `bytes_per_sample` : int, optional, default = 0
 |      A hint for DALI for how much memory to use for its tensors.
 |  `set_affinity` : bool, optional, default = False
 |      Whether to set CPU core affinity to the one closest to the
 |      GPU being used.
 |  `max_streams` : int, optional, default = -1
 |      Limit the number of CUDA streams used by the executor.
 |      Value of -1 does not impose a limit.
 |      This parameter is currently unused (and behavior of
 |      unrestricted number of streams is assumed).
 |  `prefetch_queue_depth` : int or {"cpu_size": int, "gpu_size": int}, optional, default = 2
 |      Depth of the executor pipeline. Deeper pipeline makes DALI
 |      more resistant to uneven execution time of each batch, but it
 |      also consumes more memory for internal buffers.
 |      Specifying a dict:
 |      ``{ "cpu_size": x, "gpu_size": y }``
 |      instead of integer will cause the pipeline to use separated
 |      queues executor, with buffer queue size `x` for cpu stage
 |      and `y` for mixed and gpu stages. It is not supported when both `exec_async`
 |      and `exec_pipelined` are set to `False`.
 |      Executor will buffer cpu and gpu stages separatelly,
 |      and will fill the buffer queues when the first :meth:`nvidia.dali.pipeline.Pipeline.run`
 |      is issued.
 |
 |  Methods defined here:
 |
 |  __init__(self, batch_size=-1, num_threads=-1, device_id=-1, seed=-1, exec_pipelined=True, prefetch_queue_depth=2, exec_async=True, bytes_per_sample=0, set_affinity=False, max_streams=-1, default_cuda_stream_priority=0)
 |
 |  build(self)
 |      Build the pipeline.
 |
 |      Pipeline needs to be built in order to run it standalone.
 |      Framework-specific plugins handle this step automatically.
 |
 |  define_graph(self)
 |      This function is defined by the user to construct the
 |      graph of operations for their pipeline.
 |
 |      It returns a list of outputs created by calling DALI Operators.
 |
 |  deserialize_and_build(self, serialized_pipeline)
 |      Deserialize and build the pipeline given in serialized form.
 |
 |      Parameters
 |      ----------
 |      serialized_pipeline : str
 |                            Serialized pipeline.
 |
 |  epoch_size(self, name=None)
 |      Epoch size of a pipeline.
 |
 |      If the `name` parameter is `None`, returns a dictionary of pairs
 |      `(reader name, epoch size for that reader)`.
 |      If the `name` parameter is not `None`, returns epoch size for that
 |      reader.
 |
 |      Parameters
 |      ----------
 |      name : str, optional, default = None
 |          The reader which should be used to obtain epoch size.
 |
 |  feed_input(self, ref, data, layout=TensorLayout.NHWC)
 |      Bind the NumPy array to a tensor produced by ExternalSource
 |      operator. It is worth mentioning that `ref` should not be overriden
 |      with other operator outputs.
 |
 |  iter_setup(self)
 |      This function can be overriden by user-defined
 |      pipeline to perform any needed setup for each iteration.
 |      For example, one can use this function to feed the input
 |      data from NumPy arrays.
 |
 |  outputs(self)
 |      Returns the outputs of the pipeline and releases previous buffer.
 |
 |      If the pipeline is executed asynchronously, this function blocks
 |      until the results become available. It rises StopIteration if data set
 |      reached its end - usually when iter_setup cannot produce any more data
 |
 |  reset(self)
 |      Resets pipeline iterator
 |
 |      If pipeline iterator reached the end then reset its state to the beginning.
 |
 |  run(self)
 |      Run the pipeline and return the result.
 |
 |      If the pipeline was created with `exec_pipelined` option set to `True`,
 |      this function will also start prefetching the next iteration for
 |      faster execution.
 |
 |  save_graph_to_dot_file(self, filename)
 |      Saves the pipeline graph to a file.
 |
 |      Parameters
 |      ----------
 |      filename : str
 |                 Name of the file to which the graph is written.
 |
 |  serialize(self)
 |      Serialize the pipeline to a Protobuf string.
 |
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |
 |  __dict__
 |      dictionary for instance variables (if defined)
 |
 |  __weakref__
 |      list of weak references to the object (if defined)
 |
 |  batch_size
 |      Batch size.
 |
 |  device_id
 |      Id of the GPU used by the pipeline.
 |
 |  num_threads
 |      Number of CPU threads used by the pipeline.

Defining the pipeline

Let us start by defining a very simple pipeline for a classifier that determines whether a picture contains a dog or a cat.

We prepared a directory structure containing pictures of dogs and cats:

[2]:
import os
import fnmatch

for root, dirs, files in os.walk("data/images"):
    depth = root.count('/')
    ret = ""
    if depth > 0:
        ret += "  " * (depth - 1) + "|-"
    print(ret + root)
    for items in fnmatch.filter(files, "*"):
        print(" " * len(ret) + "|-" + items)
images
|-file_list.txt
|-images/dog
  |-dog_4.jpg
  |-dog_5.jpg
  |-dog_9.jpg
  |-dog_6.jpg
  |-dog_3.jpg
  |-dog_7.jpg
  |-dog_10.jpg
  |-dog_2.jpg
  |-dog_8.jpg
  |-dog_1.jpg
  |-dog_11.jpg
|-images/kitten
  |-cat_10.jpg
  |-cat_5.jpg
  |-cat_9.jpg
  |-cat_8.jpg
  |-cat_1.jpg
  |-cat_7.jpg
  |-cat_6.jpg
  |-cat_3.jpg
  |-cat_2.jpg
  |-cat_4.jpg

Our simple pipeline will read images from this directory, decode them and return (image, label) pairs.

[3]:
import nvidia.dali.ops as ops
import nvidia.dali.types as types

image_dir = "data/images"
batch_size = 8

class SimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(SimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir)
        # alternatively, a file listing "image_name image_label_value" pairs can be provided via file_list:
        # self.input = ops.FileReader(file_root = image_dir, file_list = image_dir + '/file_list.txt')
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        return (images, labels)

The SimplePipeline class is a subclass of dali.pipeline.Pipeline, which provides most of the methods to create and launch a pipeline. The only 2 methods that we need to implement are the constructor and the define_graph function.

In the constructor we first call our superclass constructor, in order to set global parameters of the pipeline:

  • batch size,

  • number of threads used to perform computation on the CPU,

  • which GPU device to use (SimplePipeline does not yet use GPU for compute though)

  • seed for random number generation

In the constructor we also define member variables of our SimplePipeline class as operations defined in dali.ops module:

  • FileReader - traverses the directory and returns pairs of (encoded image, label)

  • ImageDecoder - takes an encoded image input and outputs decoded RGB image
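
To make the reader step more concrete, here is a minimal pure-Python sketch of how a directory of class subfolders can be turned into (file, label) pairs. It is not DALI's implementation: the helper name list_labeled_files is hypothetical, and the rule that labels follow the sorted order of subdirectory names is an assumption made for illustration.

```python
import os
import tempfile


def list_labeled_files(file_root):
    """Return (relative path, label) pairs, one integer label per subdirectory.

    Hypothetical helper: labels are assigned by sorted subdirectory name,
    which is an assumption of this sketch, not a documented DALI guarantee.
    """
    class_dirs = sorted(
        d for d in os.listdir(file_root)
        if os.path.isdir(os.path.join(file_root, d))
    )
    pairs = []
    for label, class_dir in enumerate(class_dirs):
        folder = os.path.join(file_root, class_dir)
        for name in sorted(os.listdir(folder)):
            pairs.append((os.path.join(class_dir, name), label))
    return pairs


# Build a tiny throwaway tree that mimics the dog/kitten layout above.
with tempfile.TemporaryDirectory() as root:
    for class_dir, names in [("dog", ["dog_1.jpg", "dog_2.jpg"]),
                             ("kitten", ["cat_1.jpg"])]:
        os.makedirs(os.path.join(root, class_dir))
        for name in names:
            open(os.path.join(root, class_dir, name), "w").close()
    demo_pairs = list_labeled_files(root)

for path, label in demo_pairs:
    print(label, path)
```

Under this assumption every file in the dog folder gets label 0 and every file in the kitten folder gets label 1. The sketch only lists files; the real reader also loads the encoded bytes of each image.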

In the define_graph function we define the actual flow of computation:

jpegs, labels = self.input()

uses our input operation to create jpegs (encoded images) and labels.

images = self.decode(jpegs)

Next, we use the decode operation to create images (decoded RGB images).

return (images, labels)

Finally, we specify which of the intermediate variables should be returned as outputs of the pipeline.

Building the pipeline

In order to use our SimplePipeline, we need to build it. This is achieved by calling the build function.

[4]:
pipe = SimplePipeline(batch_size, 1, 0)
pipe.build()

Running the pipeline

After the pipeline is built, we can run it to get the batch of results.

[5]:
pipe_out = pipe.run()
print(pipe_out)
[<nvidia.dali.backend_impl.TensorListCPU object at 0x7ff6080bf180>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7ff6080bf5e0>]

The output of the pipeline, which we saved to pipe_out variable, is a list of 2 elements (as expected - we specified 2 outputs in define_graph function in SimplePipeline class). Both of these elements are TensorListCPU objects - each contains a list of tensors on the CPU.

In order to show the results (just for debugging purposes - during actual training we would not do this step, as it would make our batch of images do a round trip from GPU to CPU and back), we can send our data from DALI’s Tensor to a NumPy array. Not every TensorList can be accessed that way, though - a TensorList is more general than a NumPy array and can hold tensors with different shapes. In order to check whether we can send it to NumPy directly, we can call the is_dense_tensor function of TensorList.

[6]:
images, labels = pipe_out
print("Images is_dense_tensor: " + str(images.is_dense_tensor()))
print("Labels is_dense_tensor: " + str(labels.is_dense_tensor()))
Images is_dense_tensor: False
Labels is_dense_tensor: True

As it turns out, the TensorList containing labels can be represented by a tensor, while the TensorList containing images cannot.

Let us look at the shape and contents of the returned labels.
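
The idea behind this check can be sketched in plain Python: a batch can be viewed as one tensor only when every sample has the same shape. The function name is_dense and the example shapes below are hypothetical and chosen only for illustration; this is a sketch of the concept, not of how DALI implements is_dense_tensor.

```python
def is_dense(shapes):
    """True when every sample has the same shape, so the batch can be
    viewed as a single tensor of shape (batch, *sample_shape)."""
    return len(set(shapes)) <= 1


# Hypothetical shapes: decoded images differ in height and width,
# while every label is a single value.
image_shapes = [(427, 640, 3), (480, 640, 3), (375, 500, 3)]
label_shapes = [(1,), (1,), (1,)]

images_dense = is_dense(image_shapes)
labels_dense = is_dense(label_shapes)
print("images:", images_dense)
print("labels:", labels_dense)
```

Decoded photographs usually differ in height and width, which is why the image batch is not dense until an operation such as a resize or crop gives every sample the same shape.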

[7]:
import numpy as np

labels_tensor = labels.as_tensor()

print(labels_tensor.shape())
print(np.array(labels_tensor))
[8L, 1L]
[[0]
 [0]
 [0]
 [0]
 [0]
 [0]
 [0]
 [0]]

In order to see the images, we will need to loop over all tensors contained in TensorList, accessed with its at method.

[8]:
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
%matplotlib inline

def show_images(image_batch):
    columns = 4
    # round up so that a partially filled last row still gets a grid row
    rows = (batch_size + columns - 1) // columns
    fig = plt.figure(figsize = (32,(32 // columns) * rows))
    gs = gridspec.GridSpec(rows, columns)
    # only index samples that exist in the batch
    for j in range(batch_size):
        plt.subplot(gs[j])
        plt.axis("off")
        plt.imshow(image_batch.at(j))
[9]:
show_images(images)
../_images/examples_getting_started_19_0.png

Adding augmentations

Random shuffle

As we can see from the example above, the first batch of images returned by our pipeline contains only dogs. That is because we did not shuffle our dataset, and so FileReader returns images in the order in which it encountered them while traversing the directory structure.

Let us make a new pipeline that changes that.

[10]:
class ShuffledSimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(ShuffledSimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        return (images, labels)

We made 2 changes to the SimplePipeline to obtain ShuffledSimplePipeline - we added 2 parameters to the FileReader operation:

  • random_shuffle enables shuffling of images in the reader. Shuffling is performed using a buffer of images read from disk. When the reader is asked to provide the next image, it randomly selects an image from the buffer, outputs it and immediately replaces that spot in the buffer with a freshly read image.

  • initial_fill sets the capacity of the buffer. The default value of this parameter (1000), well suited for datasets containing thousands of examples, is too big for our very small dataset, which contains only 21 images. This could result in frequent duplicates in the returned batch. That is why in this example we set it to the size of our dataset.
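
The buffer mechanism described above can be sketched in a few lines of plain Python. This is a simplified single-pass model, not DALI's implementation: the function name buffered_shuffle is hypothetical, the sketch reads the dataset exactly once, and it does not model what the reader does at epoch boundaries or when the buffer is larger than the dataset.

```python
import random


def buffered_shuffle(items, initial_fill, seed=12):
    """Yield items in shuffled order using a fixed-capacity buffer."""
    rng = random.Random(seed)
    source = iter(items)
    # Fill the buffer with up to `initial_fill` items.
    buffer = []
    for item in source:
        buffer.append(item)
        if len(buffer) == initial_fill:
            break
    # Emit a random slot, then refill that slot with a freshly read item.
    for fresh in source:
        slot = rng.randrange(len(buffer))
        yield buffer[slot]
        buffer[slot] = fresh
    # The source is exhausted: drain what is left in random order.
    rng.shuffle(buffer)
    yield from buffer


dataset = ["img_{}.jpg".format(i) for i in range(21)]
shuffled = list(buffered_shuffle(dataset, initial_fill=21))
windowed = list(buffered_shuffle(dataset, initial_fill=4))
print(shuffled[:8])
print(windowed[:8])
```

In this model an image cannot be emitted before it has been read into the buffer, so a buffer much smaller than the dataset only shuffles locally, while a buffer as large as the dataset gives a full shuffle.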

Let us test the result of this modification.

[11]:
pipe = ShuffledSimplePipeline(batch_size, 1, 0)
pipe.build()
[12]:
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)
../_images/examples_getting_started_24_0.png

Now the images returned by the pipeline are shuffled properly.

Augmentations

DALI can not only read images from disk and batch them into tensors; it can also perform various augmentations on those images to improve deep learning training results.

One example of such augmentations is rotation. Let us make a new pipeline, which rotates the images before outputting them.

[13]:
class RotatedSimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(RotatedSimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)
        self.rotate = ops.Rotate(angle = 10.0)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        rotated_images = self.rotate(images)
        return (rotated_images, labels)

To do that, we added a new operation to our pipeline: dali.ops.Rotate. To obtain information on the required and optional arguments of any operation provided by DALI, we can use the help function.

[14]:
help(ops.Rotate)
Help on class Rotate in module nvidia.dali.ops:

class Rotate(__builtin__.object)
 |  .. _Rotate:
 |
 |  This is a 'CPU', 'GPU' operator
 |
 |  Rotate the image.
 |
 |  Parameters
 |  ----------
 |  `angle` : float or float tensor
 |            Rotation angle.
 |  `bytes_per_sample_hint` : int, optional, default = 0
 |                            Output size hint (bytes), per sample. The memory will be preallocated if it uses GPU or page-locked memory
 |  `fill_value` : float, optional, default = 0.0
 |                 Color value used for padding pixels.
 |  `interp_type` : nvidia.dali.types.DALIInterpType, optional, default = DALIInterpType.INTERP_NN
 |                  Type of interpolation used.
 |  `mask` : int or int tensor, optional, default = 1
 |           Whether to apply this augmentation to the input image.
 |
 |           * 0 - do not apply this transformation
 |           * 1 - apply this transformation
 |
 |  `seed` : int, optional, default = -1
 |           Random seed (If not provided it will be populated based on the global seed of the pipeline)
 |
 |  Methods defined here:
 |
 |  __call__(self, *inputs, **kwargs)
 |
 |  __init__(self, **kwargs)
 |
 |  ----------------------------------------------------------------------
 |  Data descriptors defined here:
 |
 |  __dict__
 |      dictionary for instance variables (if defined)
 |
 |  __weakref__
 |      list of weak references to the object (if defined)
 |
 |  device
 |
 |  schema
 |
 |  spec

As we can see, Rotate can take multiple arguments, but only one of them, angle, is required - it tells the operator how much it should rotate images.

Let us test the newly created pipeline:

[15]:
pipe = RotatedSimplePipeline(batch_size, 1, 0)
pipe.build()
[16]:
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)
../_images/examples_getting_started_31_0.png

Tensors as arguments and Random Number Generation

Rotating every image by 10 degrees is not that interesting. To make a meaningful augmentation, we would like an operator that rotates our images by a random angle in a given range.

The help output for the Rotate operation tells us that the angle parameter accepts either a float or a float tensor. The second option, float tensor, enables us to feed the operator a different rotation angle for every image, via a tensor produced by another support operation.

Random number generators are examples of support operations that one can use with DALI. Let us use dali.ops.Uniform to make a pipeline with random rotation.

[17]:
class RandomRotatedSimplePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(RandomRotatedSimplePipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)
        self.rotate = ops.Rotate()
        self.rng = ops.Uniform(range = (-10.0, 10.0))

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        angle = self.rng()
        rotated_images = self.rotate(images, angle = angle)
        return (rotated_images, labels)

This time, instead of providing a fixed value for the angle argument in the constructor, we set it to the output of the dali.ops.Uniform operator.
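
What "a different angle for every image" means can be illustrated without DALI. The sketch below draws one value per sample from the same range used above. The helper name uniform_per_sample is hypothetical, and Python's random module is used in place of DALI's generator, so the concrete values will not match those produced by the pipeline.

```python
import random


def uniform_per_sample(batch_size, value_range, seed):
    """Draw one value per sample from a uniform range (hypothetical helper)."""
    rng = random.Random(seed)
    low, high = value_range
    return [rng.uniform(low, high) for _ in range(batch_size)]


angles = uniform_per_sample(batch_size=8, value_range=(-10.0, 10.0), seed=12)
for index, angle in enumerate(angles):
    print("sample {}: rotate by {:+.2f} degrees".format(index, angle))
```

Fixing the seed makes the sequence of angles reproducible between runs, which mirrors the role of the seed argument passed to the pipeline constructor.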

Let us check the result:

[18]:
pipe = RandomRotatedSimplePipeline(batch_size, 1, 0)
pipe.build()
[19]:
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images)
../_images/examples_getting_started_36_0.png

This time, the rotation angle is randomly selected from a value range.

GPU acceleration

DALI offers access to GPU-accelerated operators that can increase the speed of the input and augmentation pipeline and let it scale to multi-GPU systems.

Copying tensors to GPU

Let us modify the previous RandomRotatedSimplePipeline example to use the GPU for the rotation operator.

[20]:
class RandomRotatedGPUPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(RandomRotatedGPUPipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = 'cpu', output_type = types.RGB)
        self.rotate = ops.Rotate(device = "gpu")
        self.rng = ops.Uniform(range = (-10.0, 10.0))

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        angle = self.rng()
        rotated_images = self.rotate(images.gpu(), angle = angle)
        return (rotated_images, labels)

In order to tell DALI that we want to use GPU, we needed to make 2 changes to the pipeline:

  • we added a device = "gpu" parameter to the Rotate operation

  • we changed the input of the rotate operation from images, which is a tensor on the CPU, to images.gpu(), which copies it to the GPU

[21]:
pipe = RandomRotatedGPUPipeline(batch_size, 1, 0)
pipe.build()
[22]:
pipe_out = pipe.run()
print(pipe_out)
[<nvidia.dali.backend_impl.TensorListGPU object at 0x7ff565380ed8>, <nvidia.dali.backend_impl.TensorListCPU object at 0x7ff564060148>]

pipe_out still contains 2 TensorLists, but this time the first output, the result of the Rotate operation, is on the GPU. We cannot access the contents of a TensorListGPU directly from the CPU, so in order to visualize the result we need to copy it to the CPU by using the as_cpu method.

[23]:
images, labels = pipe_out
show_images(images.as_cpu())
../_images/examples_getting_started_43_0.png

Important notice

DALI does not support moving data from the GPU to the CPU within the pipeline. In every execution path, a CPU operator cannot follow a GPU operator.
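
This ordering rule can be expressed as a small check over a chain of operator devices. The sketch below is an illustration only: the PLACEMENT table and the first_invalid_edge helper are hypothetical names, and the "mixed" entry follows the description in the hybrid decoding section (CPU inputs, GPU outputs).

```python
# Where each kind of operator reads its input and writes its output.
# "mixed" follows the description of the hybrid decoder: CPU in, GPU out.
PLACEMENT = {
    "cpu": ("cpu", "cpu"),
    "mixed": ("cpu", "gpu"),
    "gpu": ("gpu", "gpu"),
}


def first_invalid_edge(devices):
    """Return the first (producer, consumer) pair that would need a
    GPU-to-CPU copy inside the pipeline, or None if the chain is valid."""
    for producer, consumer in zip(devices, devices[1:]):
        produced_on = PLACEMENT[producer][1]
        consumed_on = PLACEMENT[consumer][0]
        if produced_on == "gpu" and consumed_on == "cpu":
            return (producer, consumer)
    return None


valid_chain = ["cpu", "cpu", "gpu"]     # reader -> decoder -> rotate on GPU
hybrid_chain = ["cpu", "mixed", "gpu"]  # reader -> hybrid decoder -> GPU op
broken_chain = ["cpu", "gpu", "cpu"]    # a CPU operator after a GPU operator

valid_result = first_invalid_edge(valid_chain)
hybrid_result = first_invalid_edge(hybrid_chain)
broken_result = first_invalid_edge(broken_chain)
print(valid_result, hybrid_result, broken_result)
```

A CPU-to-GPU edge is accepted because the data can be copied with .gpu(), as in RandomRotatedGPUPipeline; only the reverse direction is rejected.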

Hybrid decoding

Sometimes, especially for higher resolution images, decoding images stored in JPEG format may become a bottleneck. To address this problem, the nvJPEG library was developed. It splits the decoding process between the CPU and the GPU, significantly reducing the decoding time.

Specifying the “mixed” device parameter in ImageDecoder enables nvJPEG support. Other file formats are still decoded on the CPU.

[24]:
class HybridPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(HybridPipeline, self).__init__(batch_size, num_threads, device_id, seed = 12)
        self.input = ops.FileReader(file_root = image_dir, random_shuffle = True, initial_fill = 21)
        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)

    def define_graph(self):
        jpegs, labels = self.input()
        images = self.decode(jpegs)
        # images are on the GPU
        return (images, labels)

ImageDecoder with device=mixed uses a hybrid approach of computation that employs both the CPU and the GPU. This means that it accepts CPU inputs, but returns GPU outputs. That is why images objects returned from the pipeline are of type TensorListGPU.

[25]:
pipe = HybridPipeline(batch_size, 1, 0)
pipe.build()
[26]:
pipe_out = pipe.run()
images, labels = pipe_out
show_images(images.as_cpu())
../_images/examples_getting_started_49_0.png

Let us compare the speed of ImageDecoder for the ‘cpu’ and ‘mixed’ backends by measuring the speed of ShuffledSimplePipeline and HybridPipeline with 4 CPU threads.

[27]:
from timeit import default_timer as timer

test_batch_size = 64

def speedtest(pipeclass, batch, n_threads):
    pipe = pipeclass(batch, n_threads, 0)
    pipe.build()
    # warmup
    for i in range(5):
        pipe.run()
    # test
    n_test = 20
    t_start = timer()
    for i in range(n_test):
        pipe.run()
    t = timer() - t_start
    print("Speed: {} imgs/s".format((n_test * batch)/t))
[28]:
speedtest(ShuffledSimplePipeline, test_batch_size, 4)
Speed: 2905.71010277 imgs/s
[29]:
speedtest(HybridPipeline, test_batch_size, 4)
Speed: 5714.61475087 imgs/s
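
To put the two measurements side by side, the short calculation below derives the speedup and the time per batch from the figures printed above. The numbers are copied from this particular run; they depend on the machine and on the very small dataset used here, so they should be read as an illustration, not as a benchmark.

```python
# Throughput figures copied from the two runs above (images per second).
cpu_speed = 2905.71010277
mixed_speed = 5714.61475087
batch = 64  # test_batch_size used in the measurement

speedup = mixed_speed / cpu_speed
cpu_ms_per_batch = 1000.0 * batch / cpu_speed
mixed_ms_per_batch = 1000.0 * batch / mixed_speed

print("speedup: {:.2f}x".format(speedup))
print("cpu:   {:.1f} ms per batch".format(cpu_ms_per_batch))
print("mixed: {:.1f} ms per batch".format(mixed_ms_per_batch))
```

In this run the ‘mixed’ backend is roughly 1.97 times faster, which corresponds to about 11 ms per batch of 64 images instead of about 22 ms.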