Using the TensorFlow DALI plugin with various readers

Overview

This example shows how DALI's different readers can be used to feed data to TensorFlow, and illustrates how flexible DALI is.

The following readers are used in this example:

  • MXNetReader
  • CaffeReader
  • FileReader
  • TFRecordReader

For details on how to use them, please see the other examples.

Let's start by defining some global constants:

In [1]:
# MXNet RecordIO
db_folder = "/data/imagenet/train-480-val-256-recordio/"

# Caffe LMDB
lmdb_folder = "/data/imagenet/train-lmdb-256x256"

# image dir with plain jpeg files
image_dir = "../images"

# TFRecord
tfrecord = "/data/imagenet/train-val-tfrecord-480/train-00001-of-01024"
tfrecord_idx = "idx_files/train-00001-of-01024.idx"
tfrecord2idx_script = "tfrecord2idx"

N = 4             # number of GPUs
BATCH_SIZE = 128  # batch size per GPU
ITERATIONS = 32   # batches to pull from each pipeline in the test below
IMAGE_SIZE = 3    # used only by the placeholder defined in the test cell

Create the index file by calling the tfrecord2idx script:

In [2]:
from subprocess import call
import os

if not os.path.exists("idx_files"):
    os.mkdir("idx_files")

if not os.path.isfile(tfrecord_idx):
    call([tfrecord2idx_script, tfrecord, tfrecord_idx])
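
Each line of the resulting index file stores the offset and size of one record inside the TFRecord file, which is what lets DALI seek into it and shard it. If you want to peek at it, a minimal check could look like this:

if os.path.isfile(tfrecord_idx):
    with open(tfrecord_idx) as f:
        for line in list(f)[:3]:  # first few "offset size" pairs
            print(line.strip())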

Let us define:

  • CommonPipeline: the part shared by all pipelines below. It defines no reader of its own; each pipeline that inherits from it supplies one.

In [3]:
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class CommonPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(CommonPipeline, self).__init__(batch_size, num_threads, device_id)

        # Decode JPEGs with the hybrid CPU/GPU decoder
        self.decode = ops.nvJPEGDecoder(device = "mixed", output_type = types.RGB)
        # Resize so that the shorter side lands in [256, 480] (see resize_rng)
        self.resize = ops.Resize(device = "gpu",
                                 image_type = types.RGB,
                                 interp_type = types.INTERP_LINEAR)
        # Random 227x227 crop with mean subtraction; output is float NCHW
        self.cmn = ops.CropMirrorNormalize(device = "gpu",
                                            output_dtype = types.FLOAT,
                                            crop = (227, 227),
                                            image_type = types.RGB,
                                            mean = [128., 128., 128.],
                                            std = [1., 1., 1.])
        # Random generators for the crop position and the resize target
        self.uniform = ops.Uniform(range = (0.0, 1.0))
        self.resize_rng = ops.Uniform(range = (256, 480))

    def base_define_graph(self, inputs, labels):
        images = self.decode(inputs)
        images = self.resize(images, resize_shorter = self.resize_rng())
        output = self.cmn(images, crop_pos_x = self.uniform(),
                          crop_pos_y = self.uniform())
        return (output, labels.gpu())

  • MXNetReaderPipeline

In [4]:
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class MXNetReaderPipeline(CommonPipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(MXNetReaderPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.MXNetReader(path = [db_folder+"train.rec"], index_path=[db_folder+"train.idx"],
                                     random_shuffle = True, shard_id = device_id, num_shards = num_gpus)

    def define_graph(self):
        images, labels = self.input(name="Reader")
        return self.base_define_graph(images, labels)

  • CaffeReadPipeline

In [5]:
class CaffeReadPipeline(CommonPipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(CaffeReadPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.CaffeReader(path = lmdb_folder,
                                     random_shuffle = True, shard_id = device_id, num_shards = num_gpus)

    def define_graph(self):
        images, labels = self.input()
        return self.base_define_graph(images, labels)

  • FileReadPipeline

In [6]:
class FileReadPipeline(CommonPipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(FileReadPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.FileReader(file_root = image_dir)

    def define_graph(self):
        images, labels = self.input()
        return self.base_define_graph(images, labels)
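
Since FileReadPipeline only needs the small ../images directory shipped with the examples, it is the easiest one to smoke-test on its own, before involving TensorFlow. A minimal sketch, assuming the pipe.build()/pipe.run() API used by this DALI version:

pipe = FileReadPipeline(batch_size=BATCH_SIZE, num_threads=2, device_id=0, num_gpus=1)
pipe.build()
images, labels = pipe.run()       # both outputs live on the GPU
print(labels.as_cpu().at(0))      # copy one label back to the host to inspect it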

  • TFRecordPipeline

In [7]:
import nvidia.dali.tfrecord as tfrec

class TFRecordPipeline(CommonPipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(TFRecordPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.TFRecordReader(path = tfrecord,
                                        index_path = tfrecord_idx,
                                        features = {"image/encoded" : tfrec.FixedLenFeature((), tfrec.string, ""),
                                                    "image/class/label": tfrec.FixedLenFeature([1], tfrec.int64,  -1)
                                        })

    def define_graph(self):
        inputs = self.input()
        images = inputs["image/encoded"]
        labels = inputs["image/class/label"]
        return self.base_define_graph(images, labels)

Now let us create a function which builds a serialized pipeline on demand:

In [8]:
import tensorflow as tf
import nvidia.dali.plugin.tf as dali_tf

def get_batch_test_dali(batch_size, pipe_type):
    pipe_name, label_type, _ = pipe_type
    # One pipeline per GPU, each reading a different shard of the dataset
    pipes = [pipe_name(batch_size=batch_size, num_threads=2, device_id=device_id, num_gpus=N)
             for device_id in range(N)]

    # The TF plugin consumes serialized pipelines, so the Python objects
    # can be discarded once serialized
    serialized_pipes = [pipe.serialize() for pipe in pipes]
    del pipes
    daliop = dali_tf.DALIIterator()
    images = []
    labels = []
    for d in range(N):
        with tf.device('/gpu:%i' % d):
            image, label = daliop(serialized_pipeline = serialized_pipes[d],
                shape = [BATCH_SIZE, 3, 227, 227],
                image_type = tf.int32,
                label_type = label_type,
                device_id = d)
            images.append(image)
            labels.append(label)

    return [images, labels]
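
The tensors this function returns are ordinary TensorFlow tensors, so a model graph can be attached to them directly. For example, a classifier head on the first GPU's batch might look like this (a minimal sketch; the dense layer and its size are illustrative, not part of this example):

images, labels = get_batch_test_dali(BATCH_SIZE, [FileReadPipeline, tf.int32, (0, 1)])
with tf.device('/gpu:0'):
    flat = tf.reshape(tf.cast(images[0], tf.float32), [BATCH_SIZE, -1])  # NCHW -> flat
    logits = tf.layers.dense(flat, 10)  # hypothetical 10-way classifier head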

Finally, let us test whether all the pipelines can be correctly built, serialized, and run within a TF session.

In [9]:
import numpy as np

# [pipeline class, label type, expected label range]
pipe_types = [[MXNetReaderPipeline, tf.float32, (0, 999)],
              [CaffeReadPipeline, tf.int32, (0, 999)],
              [FileReadPipeline, tf.int32, (0, 1)],
              [TFRecordPipeline, tf.int64, (1, 1000)]]
for pipe_type in pipe_types:
    pipe_class, _, label_range = pipe_type
    print("RUN: " + pipe_class.__name__)
    test_batch = get_batch_test_dali(BATCH_SIZE, pipe_type)
    x = tf.placeholder(tf.float32, shape=[BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, 3], name='x')
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.8)
    config = tf.ConfigProto(gpu_options=gpu_options)

    with tf.Session(config=config) as sess:
        for i in range(ITERATIONS):
            imgs, labels = sess.run(test_batch)
            # Testing correctness of labels
            for label in labels:
                ## labels need to be integers
                assert(np.equal(np.mod(label, 1), 0).all())
                ## labels need to fall into the expected range
                assert((label >= label_range[0]).all())
                assert((label <= label_range[1]).all())
    print("OK : " + pipe_class.__name__)
RUN: MXNetReaderPipeline
OK : MXNetReaderPipeline
RUN: CaffeReadPipeline
OK : CaffeReadPipeline
RUN: FileReadPipeline
OK : FileReadPipeline
RUN: TFRecordPipeline
OK : TFRecordPipeline