Using the TensorFlow DALI plugin: using various readers

Overview

This example shows how different DALI readers can be used together with TensorFlow, demonstrating how flexible DALI is.

The following readers are used in this example:

  • MXNetReader
  • FileReader
  • TFRecordReader

For details on how to use them, please see the other examples.

Let's start by defining some global constants.

In [1]:
# --- Run sizes ---------------------------------------------------------------
N = 4             # how many GPUs; one pipeline is built per GPU
BATCH_SIZE = 128  # samples produced per GPU on every iteration
ITERATIONS = 32   # batches pulled from each pipeline type by the test loop
IMAGE_SIZE = 3    # NOTE(review): 3 looks like a channel count rather than the 227-pixel crop used elsewhere -- confirm intent

# --- Data sources ------------------------------------------------------------
# Folder of the MXNet RecordIO dataset ("train.rec" / "train.idx" get appended)
db_folder = "/data/imagenet/train-480-val-256-recordio/"

# Directory holding plain JPEG files for FileReader
image_dir = "../images"

# TFRecord file, the index file generated for it, and the generator tool
tfrecord = "/data/imagenet/train-val-tfrecord-480/train-00001-of-01024"
tfrecord_idx = "idx_files/train-00001-of-01024.idx"
tfrecord2idx_script = "tfrecord2idx"

Create the index file by calling the tfrecord2idx script.

In [2]:
from subprocess import call, check_call
import os.path

# Create the directory that will hold the index file, derived from
# tfrecord_idx itself rather than hard-coded, so changing that path keeps
# working. makedirs also handles nested directories.
idx_dir = os.path.dirname(tfrecord_idx)
if idx_dir and not os.path.exists(idx_dir):
    os.makedirs(idx_dir)

# Generate the index (passed later as TFRecordReader's index_path) only once.
# check_call raises CalledProcessError if tfrecord2idx fails, so the failure
# is reported here instead of surfacing later as a confusing reader error.
if not os.path.isfile(tfrecord_idx):
    check_call([tfrecord2idx_script, tfrecord, tfrecord_idx])

Let us define the common part of the pipeline; the other pipelines will inherit from it.

In [3]:
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class CommonPipeline(Pipeline):
    """Shared decode -> random resize -> random crop/normalize stage.

    Subclasses create their own reader in ``__init__`` and, from their
    ``define_graph``, pass the encoded images and labels produced by that
    reader to :meth:`base_define_graph`.
    """

    def __init__(self, batch_size, num_threads, device_id):
        super(CommonPipeline, self).__init__(batch_size, num_threads, device_id)

        # nvJPEG-based decoding to RGB.
        self.decode = ops.nvJPEGDecoder(device="mixed", output_type=types.RGB)
        # GPU resize with linear interpolation; the target size is supplied
        # per call (see base_define_graph).
        self.resize = ops.Resize(device="gpu",
                                 image_type=types.RGB,
                                 interp_type=types.INTERP_LINEAR)
        # 227x227 crop, output as float, mean 128 subtracted per channel and
        # divided by a std of 1.
        self.cmn = ops.CropMirrorNormalize(device="gpu",
                                           output_dtype=types.FLOAT,
                                           crop=(227, 227),
                                           image_type=types.RGB,
                                           mean=[128., 128., 128.],
                                           std=[1., 1., 1.])
        # Random generators: crop position in [0, 1] and shorter-side length
        # in [256, 480].
        self.uniform = ops.Uniform(range=(0.0, 1.0))
        self.resize_rng = ops.Uniform(range=(256, 480))

    def base_define_graph(self, inputs, labels):
        """Decode, resize and crop ``inputs``; move ``labels`` to the GPU.

        Returns:
            A ``(images, labels)`` tuple where labels is ``labels.gpu()``.
        """
        decoded = self.decode(inputs)
        shorter_side = self.resize_rng()
        resized = self.resize(decoded, resize_shorter=shorter_side)
        pos_x = self.uniform()
        pos_y = self.uniform()
        cropped = self.cmn(resized, crop_pos_x=pos_x, crop_pos_y=pos_y)
        return (cropped, labels.gpu())
  • MXNetReaderPipeline
In [4]:
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class MXNetReaderPipeline(CommonPipeline):
    """Reads the MXNet RecordIO dataset under ``db_folder``, one shard per GPU."""

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(MXNetReaderPipeline, self).__init__(batch_size, num_threads, device_id)
        record_files = [db_folder + "train.rec"]
        index_files = [db_folder + "train.idx"]
        # This instance reads shard number `device_id` out of `num_gpus`.
        self.input = ops.MXNetReader(path=record_files,
                                     index_path=index_files,
                                     random_shuffle=True,
                                     shard_id=device_id,
                                     num_shards=num_gpus)

    def define_graph(self):
        encoded, targets = self.input(name="Reader")
        return self.base_define_graph(encoded, targets)
  • FileReadPipeline
In [5]:
class FileReadPipeline(CommonPipeline):
    """Reads plain JPEG files from ``image_dir``, one shard per GPU.

    The labels come from ``ops.FileReader``. They are presumably derived from
    the sub-directory layout of ``image_dir``; confirm this in the DALI
    FileReader docs.
    """

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(FileReadPipeline, self).__init__(batch_size, num_threads, device_id)
        # Shard the file list across GPUs the same way MXNetReaderPipeline
        # does. Previously num_gpus was accepted but ignored, so the reader was
        # not sharded per device.
        self.input = ops.FileReader(file_root=image_dir,
                                    shard_id=device_id,
                                    num_shards=num_gpus)

    def define_graph(self):
        images, labels = self.input()
        return self.base_define_graph(images, labels)
  • TFRecordPipeline
In [6]:
import nvidia.dali.tfrecord as tfrec

class TFRecordPipeline(CommonPipeline):
    """Reads encoded images and int64 labels from the ``tfrecord`` file.

    Uses the index file ``tfrecord_idx``. The file is read in one shard per
    GPU.
    """

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(TFRecordPipeline, self).__init__(batch_size, num_threads, device_id)
        # Feature spec: the encoded image bytes (scalar string) and a
        # one-element int64 label, defaulting to -1 when missing.
        features = {"image/encoded": tfrec.FixedLenFeature((), tfrec.string, ""),
                    "image/class/label": tfrec.FixedLenFeature([1], tfrec.int64, -1)}
        # Shard across GPUs like MXNetReaderPipeline. Previously num_gpus was
        # accepted but ignored.
        self.input = ops.TFRecordReader(path=tfrecord,
                                        index_path=tfrecord_idx,
                                        features=features,
                                        shard_id=device_id,
                                        num_shards=num_gpus)

    def define_graph(self):
        inputs = self.input()
        images = inputs["image/encoded"]
        labels = inputs["image/class/label"]
        return self.base_define_graph(images, labels)

Now let us create a function that builds serialized pipelines on demand:

In [7]:
import tensorflow as tf
import nvidia.dali.plugin.tf as dali_tf

def get_batch_test_dali(batch_size, pipe_type):
    """Build one DALI pipeline per GPU and expose each through a DALIIterator op.

    Args:
        batch_size: Per-GPU batch size. It is used both to construct the
            pipelines and as the leading dimension of the image shape passed
            to DALIIterator.
        pipe_type: ``[pipeline_class, label_type, label_range]``. Only the
            first two entries are used here.

    Returns:
        ``[images, labels]``: two lists holding one TF tensor per GPU
        (``N`` entries each).
    """
    pipe_class, label_type, _ = pipe_type
    pipes = [pipe_class(batch_size=batch_size, num_threads=2,
                        device_id=device_id, num_gpus=N)
             for device_id in range(N)]

    serialized_pipes = [pipe.serialize() for pipe in pipes]
    # Only the serialized form is handed to TF; drop the Python objects.
    del pipes
    daliop = dali_tf.DALIIterator()
    images = []
    labels = []
    for d, serialized in enumerate(serialized_pipes):
        with tf.device('/gpu:%i' % d):
            # Shape is 3 channels followed by the 227x227 crop configured in
            # CommonPipeline. The leading dimension uses the batch_size
            # argument (not the BATCH_SIZE global), so it always matches the
            # pipelines built above.
            # NOTE(review): CommonPipeline emits types.FLOAT but image_type is
            # tf.int32 here -- confirm whether the plugin expects tf.float32.
            image, label = daliop(serialized_pipeline=serialized,
                                  shape=[batch_size, 3, 227, 227],
                                  image_type=tf.int32,
                                  label_type=label_type,
                                  device_id=d)
            images.append(image)
            labels.append(label)

    return [images, labels]

Finally, let us test whether all pipelines can be correctly built, serialized, and run within a TF session.

In [8]:
import numpy as np

# Each entry: [pipeline class, TF dtype of its labels, inclusive (min, max) label range].
pipe_types = [[MXNetReaderPipeline, tf.float32, (0, 999)],
              [FileReadPipeline, tf.int32, (0, 1)],
              [TFRecordPipeline, tf.int64, (1, 1000)]]
for pipe_type in pipe_types:
    pipe_class, _, (label_min, label_max) = pipe_type
    print("RUN: " + pipe_class.__name__)
    test_batch = get_batch_test_dali(BATCH_SIZE, pipe_type)
    # The former unused `x = tf.placeholder(...)` (with a bogus
    # IMAGE_SIZE-based shape) was dead code and has been dropped.
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.8)
    config = tf.ConfigProto(gpu_options=gpu_options)

    with tf.Session(config=config) as sess:
        for _ in range(ITERATIONS):
            _, labels = sess.run(test_batch)
            # Check the label batch coming from every GPU.
            for label in labels:
                # Labels must be whole numbers, even when stored as floats.
                assert np.equal(np.mod(label, 1), 0).all()
                # Labels must lie inside the expected inclusive range.
                assert (label >= label_min).all()
                assert (label <= label_max).all()
    print("OK : " + pipe_class.__name__)
RUN: MXNetReaderPipeline
OK : MXNetReaderPipeline
RUN: FileReadPipeline
OK : FileReadPipeline
RUN: TFRecordPipeline
OK : TFRecordPipeline