Using the TensorFlow DALI plugin: a simple example

Overview

Using the DALI data loading and augmentation pipeline with TensorFlow is straightforward.

The integration takes three steps:

  • Define a DALI pipeline

  • Pass the pipeline to the DALIIterator op

  • Instantiate the op in the TensorFlow graph and use it

Defining the Data Loading Pipeline

First, we define some parameters for DALI and TensorFlow.

In this tutorial, we use a subsample of ImageNet stored in MXNet’s RecordIO format. For details on how to use MXNetReader, as well as the other readers, please see the other examples.

The DALI_EXTRA_PATH environment variable should point to the location where the data from the DALI extra repository has been downloaded. Please make sure that the proper release tag is checked out.
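
Before building the pipeline, it can help to check that the expected files are actually present. The following is a minimal sketch, not part of DALI: `recordio_paths` is a hypothetical helper, and the `db/recordio/train.idx` and `db/recordio/train.rec` layout is taken from the parameter cell in this tutorial. The demonstration runs against a throwaway directory so that it does not depend on DALI_EXTRA_PATH being set.

```python
import os
import tempfile


def recordio_paths(root):
    """Return (idx_path, rec_path) under <root>/db/recordio, or raise if missing."""
    base = os.path.join(root, "db", "recordio")
    idx_path = os.path.join(base, "train.idx")
    rec_path = os.path.join(base, "train.rec")
    missing = [p for p in (idx_path, rec_path) if not os.path.isfile(p)]
    if missing:
        raise FileNotFoundError("missing RecordIO files: " + ", ".join(missing))
    return idx_path, rec_path


# Demonstration on a temporary directory that mimics the expected layout.
demo_root = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_root, "db", "recordio"))
for name in ("train.idx", "train.rec"):
    open(os.path.join(demo_root, "db", "recordio", name), "w").close()

idx_path, rec_path = recordio_paths(demo_root)
print(idx_path)
print(rec_path)
```

In the tutorial itself the call would be `recordio_paths(os.environ['DALI_EXTRA_PATH'])`, which fails early with a readable message instead of failing later inside the reader.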

[1]:
import os.path

test_data_root = os.environ['DALI_EXTRA_PATH']

# MXNet RecordIO
base = os.path.join(test_data_root, 'db', 'recordio')

idx_files = [base + "/train.idx"]
rec_files = [base + "/train.rec"]

BATCH_SIZE = 32
DEVICES = 1
ITERATIONS = 32
BURNIN_STEPS = 16

To use DALI, we need the following three imports:

[2]:
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

Then we define our pipeline by declaring RN50Pipeline as a subclass of dali.pipeline.Pipeline. We declare the operators the pipeline needs in the constructor, then connect them into a graph in define_graph.

This is the usual way of creating a DALI pipeline. For more information about DALI pipelines, please take a look at the Getting Started notebook.

Note again that we are using MXNetReader, which reads MXNet’s RecordIO dataset format. You can replace it with another reader op to use any of the supported dataset formats.

[3]:
class RN50Pipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(RN50Pipeline, self).__init__(batch_size,
                                         num_threads,
                                         device_id)
        self.input = ops.MXNetReader(path = rec_files, index_path = idx_files,
                                     shard_id = device_id, num_shards = num_gpus)

        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)
        self.resize = ops.Resize(device = "gpu",
                                 interp_type = types.INTERP_LINEAR)
        self.cmn = ops.CropMirrorNormalize(device = "gpu",
                                            dtype = types.FLOAT,
                                            crop = (224, 224),
                                            mean = [128., 128., 128.],
                                            std = [1., 1., 1.])
        self.res_uniform = ops.random.Uniform(range = (256.,480.))
        self.uniform = ops.random.Uniform(range = (0.0, 1.0))
        self.cast = ops.Cast(device = "gpu",
                             dtype = types.INT32)

    def define_graph(self):
        inputs, labels = self.input(name="Reader")
        images = self.decode(inputs)
        images = self.resize(images, resize_shorter = self.res_uniform())
        output = self.cmn(images, crop_pos_x = self.uniform(),
                          crop_pos_y = self.uniform())
        output = self.cast(output)
        return (output, labels.gpu())
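
To make the geometry of this pipeline concrete, here is a small pure-Python sketch of the arithmetic behind `resize_shorter`, `crop_pos_x`/`crop_pos_y`, and the mean/std normalization. It does not call DALI. The helper names are hypothetical, and the rounding choices (`round` for the resized size, `int` for the crop origin) are assumptions made for illustration; DALI's exact rounding may differ.

```python
import random


def resize_shorter_dims(height, width, target):
    """Scale (height, width) so the shorter side is about `target`, keeping aspect ratio."""
    scale = target / min(height, width)
    return round(height * scale), round(width * scale)


def crop_origin(height, width, crop_h, crop_w, pos_y, pos_x):
    """Map normalized positions in [0, 1] to the top-left corner of the crop window."""
    return int(pos_y * (height - crop_h)), int(pos_x * (width - crop_w))


def normalize(value, mean=128.0, std=1.0):
    """Per-pixel normalization with the mean and std used in this pipeline."""
    return (value - mean) / std


rng = random.Random(0)                 # fixed seed so the sketch is repeatable
target = rng.uniform(256.0, 480.0)     # plays the role of self.res_uniform()
h, w = resize_shorter_dims(375, 500, target)   # hypothetical 375 x 500 input image
y0, x0 = crop_origin(h, w, 224, 224,
                     rng.uniform(0.0, 1.0),    # plays the role of crop_pos_y
                     rng.uniform(0.0, 1.0))    # plays the role of crop_pos_x
print("resized to", (h, w), "crop window starts at", (y0, x0))
print(normalize(200.0))  # 72.0
```

Because the shorter side is always at least 256 after the resize, a 224 x 224 crop window always fits, whatever the normalized crop position is.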

Next, we instantiate the pipelines with the right parameters. We create one pipeline per GPU by specifying the right device_id for each pipeline.

The difference from the usual workflow is that, instead of calling pipeline.build and running the pipeline ourselves, we pass the pipeline object to the TensorFlow operator.

[4]:
pipes = [RN50Pipeline(batch_size=BATCH_SIZE, num_threads=2, device_id = device_id, num_gpus = DEVICES) for device_id in range(DEVICES)]
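
Each pipeline receives `shard_id = device_id` and `num_shards = num_gpus`, so every GPU reads its own part of the dataset. The sketch below illustrates the idea with contiguous, non-overlapping index ranges. The floor-based formula and the `shard_range` helper are assumptions for illustration, not DALI's API; consult the DALI documentation for the exact partitioning rule.

```python
def shard_range(num_samples, shard_id, num_shards):
    """Half-open index range [start, end) assigned to one shard."""
    start = num_samples * shard_id // num_shards
    end = num_samples * (shard_id + 1) // num_shards
    return start, end


num_samples = 10  # hypothetical dataset size
for shard_id in range(3):
    print(shard_id, shard_range(num_samples, shard_id, 3))
# 0 (0, 3)
# 1 (3, 6)
# 2 (6, 10)
```

With DEVICES = 1, as in this tutorial, the single shard covers the whole dataset: `shard_range(n, 0, 1)` is `(0, n)`.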

Using DALI TensorFlow Plugin

Let’s start by importing TensorFlow and the DALI TensorFlow plugin as dali_tf.

[5]:
import tensorflow as tf
import nvidia.dali.plugin.tf as dali_tf
import time
try:
    from tensorflow.compat.v1 import GPUOptions
    from tensorflow.compat.v1 import ConfigProto
    from tensorflow.compat.v1 import Session
    from tensorflow.compat.v1 import placeholder
except ImportError:
    # Older TF versions don't have compat.v1 layer
    from tensorflow import GPUOptions
    from tensorflow import ConfigProto
    from tensorflow import Session
    from tensorflow import placeholder

try:
    tf.compat.v1.disable_eager_execution()
except AttributeError:
    pass

We can now use the nvidia.dali.plugin.tf.DALIIterator() function to get the TensorFlow op that produces the tensors we will use in the TensorFlow graph.

For each DALI pipeline, we call daliop, which returns a tuple of TensorFlow tensors that we store in image and label. Each call is placed on a different tf.device.

[6]:
daliop = dali_tf.DALIIterator()

images = []
labels = []
for d in range(DEVICES):
    with tf.device('/gpu:%i' % d):
        image, label = daliop(pipeline = pipes[d],
            shapes = [(BATCH_SIZE, 3, 224, 224), ()],
            dtypes = [tf.int32, tf.float32])

        images.append(image)
        labels.append(label)

Using the Tensors in a Simple TensorFlow Graph

We will use the images and labels tensor lists in our TensorFlow graph definition. We then run a very simple one-op session that outputs the batch of images and labels.

[7]:
gpu_options = GPUOptions(per_process_gpu_memory_fraction=0.8)
config = ConfigProto(gpu_options=gpu_options)

with Session(config=config) as sess:
    all_img_per_sec = []
    total_batch_size = BATCH_SIZE * DEVICES

    for i in range(ITERATIONS):
        start_time = time.time()

        # The actual run with our dali_tf tensors
        res = sess.run([images, labels])

        elapsed_time = time.time() - start_time
        img_per_sec = total_batch_size / elapsed_time
        if i > BURNIN_STEPS:
            all_img_per_sec.append(img_per_sec)
            print("\t%7.1f img/s" %  img_per_sec)

    print("Total average %7.1f img/s" % (sum(all_img_per_sec) / len(all_img_per_sec)))
          521.1 img/s
          478.2 img/s
          542.1 img/s
          506.1 img/s
          495.0 img/s
          543.4 img/s
          533.8 img/s
          526.2 img/s
          541.0 img/s
          576.3 img/s
          515.8 img/s
          541.7 img/s
          548.0 img/s
          515.5 img/s
          561.4 img/s
Total average   529.7 img/s
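
The measurement loop above can be separated from TensorFlow to show how the burn-in works. In this sketch, `measure_throughput` and `fake_step` are hypothetical names, and `fake_step` is only a stand-in for `sess.run(...)`. With 32 iterations and `i > BURNIN_STEPS` for a burn-in of 16, iterations 17 to 31 are recorded, which is why the output above contains 15 lines.

```python
import time


def measure_throughput(step, batch_size, iterations, burnin_steps):
    """Run `step` repeatedly and return the per-iteration images/s after burn-in."""
    rates = []
    for i in range(iterations):
        start = time.perf_counter()
        step()
        elapsed = time.perf_counter() - start
        if i > burnin_steps:  # same condition as in the tutorial loop
            rates.append(batch_size / elapsed)
    return rates


def fake_step():
    # Stand-in for sess.run([images, labels]): a small CPU workload
    sum(x * x for x in range(20000))


rates = measure_throughput(fake_step, batch_size=32, iterations=32, burnin_steps=16)
print(len(rates))  # 15
print("Total average %7.1f img/s" % (sum(rates) / len(rates)))
```

Note that this averages the per-iteration rates, as the tutorial does. Dividing the total number of images by the total elapsed time is an alternative that gives less weight to unusually fast iterations.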

Let us check the output images with their augmentations! Session.run returns NumPy arrays, so we can visualize them easily with matplotlib.

We define a show_images helper function that displays a sample of our batch.

The batch layout is NCHW, so we use transpose to get HWC images that matplotlib can show.
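
As a sketch of what the layout change means, the following pure-Python code reorders a tiny C x H x W image into H x W x C, mirroring what `transpose((1, 2, 0))` does on a NumPy array. The helpers `chw_to_hwc` and `to_displayable` are hypothetical and exist only for this illustration. Adding 128 undoes the mean subtraction from CropMirrorNormalize; the clamping to 0..255 is an extra safety step chosen here, whereas a plain cast to uint8 would wrap out-of-range values.

```python
def chw_to_hwc(chw):
    """Reorder a C x H x W nested list into H x W x C, like transpose((1, 2, 0))."""
    channels, height, width = len(chw), len(chw[0]), len(chw[0][0])
    return [[[chw[c][y][x] for c in range(channels)]
             for x in range(width)]
            for y in range(height)]


def to_displayable(value, mean=128):
    """Undo the mean subtraction and clamp to the 0..255 range of an 8-bit image."""
    return max(0, min(255, value + mean))


# A tiny 3 x 2 x 2 (C, H, W) image whose values encode channel, row, and column
chw = [[[100 * c + 10 * y + x for x in range(2)] for y in range(2)] for c in range(3)]
hwc = chw_to_hwc(chw)
print(hwc[1][0])            # [10, 110, 210]: the three channels of pixel (row 1, col 0)
print(to_displayable(-28))  # 100
```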

[8]:
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
%matplotlib inline

def show_images(image_batch, nb_images):
    columns = 4
    rows = (nb_images + columns - 1) // columns  # ceiling division so every image gets a cell
    fig = plt.figure(figsize = (32,(32 // columns) * rows))
    gs = gridspec.GridSpec(rows, columns)
    for j in range(nb_images):
        plt.subplot(gs[j])
        plt.axis("off")
        img = image_batch[0][j].transpose((1,2,0)) + 128
        plt.imshow(img.astype('uint8'))
show_images(res[0], 8)

Let us create the same pipeline for the CPU:

[9]:
class RN50CPUPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(RN50CPUPipeline, self).__init__(batch_size,
                                         num_threads,
                                         device_id)
        self.input = ops.MXNetReader(path = rec_files, index_path = idx_files,
                                     shard_id = device_id, num_shards = num_gpus)

        self.decode = ops.ImageDecoder(device = "cpu", output_type = types.RGB)
        self.resize = ops.Resize(device = "cpu",
                                 interp_type = types.INTERP_LINEAR)
        self.cmn = ops.CropMirrorNormalize(device = "cpu",
                                            dtype = types.FLOAT,
                                            crop = (224, 224),
                                            mean = [128., 128., 128.],
                                            std = [1., 1., 1.])
        self.res_uniform = ops.random.Uniform(range = (256.,480.))
        self.uniform = ops.random.Uniform(range = (0.0, 1.0))
        self.cast = ops.Cast(device = "cpu",
                             dtype = types.INT32)

    def define_graph(self):
        inputs, labels = self.input(name="Reader")
        images = self.decode(inputs)
        images = self.resize(images, resize_shorter = self.res_uniform())
        output = self.cmn(images, crop_pos_x = self.uniform(),
                          crop_pos_y = self.uniform())
        output = self.cast(output)
        return (output, labels)

cpu_pipes = [RN50CPUPipeline(batch_size=BATCH_SIZE, num_threads=4, device_id = device_id, num_gpus = DEVICES) for device_id in range(DEVICES)]

daliop = dali_tf.DALIIterator()

images_cpu = []
labels_cpu = []
for d in range(DEVICES):
    with tf.device('/cpu'):
        image_cpu, label_cpu = daliop(pipeline = cpu_pipes[d],
            shapes = [(BATCH_SIZE, 3, 224, 224), ()],
            dtypes = [tf.int32, tf.float32])

        images_cpu.append(image_cpu)
        labels_cpu.append(label_cpu)

with Session() as sess:
    all_img_per_sec = []
    total_batch_size = BATCH_SIZE * DEVICES

    for i in range(ITERATIONS):
        start_time = time.time()

        # The actual run with our dali_tf tensors
        res_cpu = sess.run([images_cpu, labels_cpu])

        elapsed_time = time.time() - start_time
        img_per_sec = total_batch_size / elapsed_time
        if i > BURNIN_STEPS:
            all_img_per_sec.append(img_per_sec)
            print("\t%7.1f img/s" %  img_per_sec)

    print("Total average %7.1f img/s" % (sum(all_img_per_sec) / len(all_img_per_sec)))
show_images(res_cpu[0], 8)
          288.6 img/s
          316.2 img/s
          368.7 img/s
          292.6 img/s
          327.4 img/s
          310.1 img/s
          336.3 img/s
          338.3 img/s
          324.0 img/s
          332.9 img/s
          287.4 img/s
          373.8 img/s
          332.6 img/s
          266.5 img/s
          389.8 img/s
Total average   325.7 img/s