Using Tensorflow DALI plugin: using various readers


This example shows how different DALI readers can be used to feed data into TensorFlow, illustrating how flexible DALI is.

The following readers are used in this example:

  • MXNetReader

  • CaffeReader

  • FileReader

  • TFRecordReader

For details on how to use them please see other examples.

Let us start with defining some global constants

DALI_EXTRA_PATH environment variable should point to the place where data from DALI extra repository is downloaded. Please make sure that the proper release tag is checked out.

import os.path

# Root of the DALI_extra checkout. The lookup raises KeyError when the
# DALI_EXTRA_PATH environment variable is not set.
test_data_root = os.environ['DALI_EXTRA_PATH']

# MXNet RecordIO (trailing separator kept so file names can be appended directly)
db_folder = os.path.join(test_data_root, 'db', 'recordio/')

# Caffe LMDB
lmdb_folder = os.path.join(test_data_root, 'db', 'lmdb')

# image dir with plain jpeg files (relative to the current working directory)
image_dir = "../../data/images"

# TFRecord
tfrecord = os.path.join(test_data_root, 'db', 'tfrecord', 'train')
tfrecord_idx = "idx_files/train.idx"
tfrecord2idx_script = "tfrecord2idx"

N = 8             # number of GPUs
BATCH_SIZE = 128  # batch size per GPU

# The two constants below are read by the final check loop but were never
# defined, which made it fail with NameError.
# NOTE(review): values chosen by the reviewer - confirm against the intended setup.
ITERATIONS = 32   # number of batches pulled from every pipeline in the final check
IMAGE_SIZE = 227  # side of the placeholder image; matches the 227x227 crop of CommonPipeline

Create idx file by calling tfrecord2idx script

from subprocess import call
import os.path

# Make sure the directory that will hold the index file exists. The directory
# is derived from tfrecord_idx so the two can never get out of sync.
idx_dir = os.path.dirname(tfrecord_idx)
if idx_dir and not os.path.exists(idx_dir):
    os.makedirs(idx_dir)

# Build the index only when it is missing; an existing file is reused as-is.
if not os.path.isfile(tfrecord_idx):
    exit_code = call([tfrecord2idx_script, tfrecord, tfrecord_idx])
    # Fail here rather than later inside the TFRecord reader, where a missing
    # or incomplete index is much harder to diagnose.
    if exit_code != 0:
        raise RuntimeError("%s failed with exit code %d while indexing %s"
                           % (tfrecord2idx_script, exit_code, tfrecord))

Let us define:

  • the common part of the pipeline, which the other pipelines will inherit from

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class CommonPipeline(Pipeline):
    """Processing stage shared by all reader pipelines.

    Subclasses create a reader in ``self.input`` and, in ``define_graph``,
    pass the reader outputs to :meth:`base_define_graph`.
    """

    def __init__(self, batch_size, num_threads, device_id):
        """Create the decode, resize and crop/normalize operators.

        All three arguments are forwarded unchanged to ``Pipeline.__init__``.
        """
        super(CommonPipeline, self).__init__(batch_size, num_threads, device_id)

        # "mixed" decoder takes encoded input and is followed by "gpu" operators.
        self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)
        self.resize = ops.Resize(device="gpu", interp_type=types.INTERP_LINEAR)
        self.cmn = ops.CropMirrorNormalize(
            device="gpu",
            dtype=types.FLOAT,
            crop=(227, 227),
            mean=[128., 128., 128.],
            std=[1., 1., 1.],
        )
        # Random sources: crop position (both axes) and the resize_shorter value.
        self.uniform = ops.Uniform(range=(0.0, 1.0))
        self.resize_rng = ops.Uniform(range=(256, 480))

    def base_define_graph(self, inputs, labels):
        """Wire decode -> resize -> crop/normalize behind a reader.

        Args:
            inputs: encoded images produced by the reader.
            labels: labels produced by the reader.

        Returns:
            Tuple ``(images, labels)`` where images is the output of the
            crop/normalize operator and labels is ``labels.gpu()``.
        """
        decoded = self.decode(inputs)
        shorter_side = self.resize_rng()
        resized = self.resize(decoded, resize_shorter=shorter_side)
        pos_x = self.uniform()
        pos_y = self.uniform()
        normalized = self.cmn(resized, crop_pos_x=pos_x, crop_pos_y=pos_y)
        return normalized, labels.gpu()
  • MXNetReaderPipeline

from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

class MXNetReaderPipeline(CommonPipeline):
    """Pipeline reading ``train.rec`` / ``train.idx`` from ``db_folder`` with MXNetReader."""

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        """Create the sharded, shuffled RecordIO reader.

        Args:
            batch_size, num_threads, device_id: forwarded to CommonPipeline.
            num_gpus: total number of shards; ``device_id`` selects this
                pipeline's shard.
        """
        super(MXNetReaderPipeline, self).__init__(batch_size, num_threads, device_id)
        # os.path.join instead of string concatenation: the paths stay valid
        # whether or not db_folder ends with a path separator.
        self.input = ops.MXNetReader(
            path=[os.path.join(db_folder, "train.rec")],
            index_path=[os.path.join(db_folder, "train.idx")],
            random_shuffle=True,
            shard_id=device_id,
            num_shards=num_gpus,
        )

    def define_graph(self):
        """Read (images, labels) and pass them through the common stage."""
        images, labels = self.input(name="Reader")
        return self.base_define_graph(images, labels)
  • CaffeReadPipeline

class CaffeReadPipeline(CommonPipeline):
    """Pipeline reading the LMDB database in ``lmdb_folder`` with CaffeReader."""

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        """Create the sharded, shuffled LMDB reader.

        ``device_id`` is used as the shard id and ``num_gpus`` as the shard
        count; the first three arguments also go to CommonPipeline.
        """
        super(CaffeReadPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.CaffeReader(
            path=lmdb_folder,
            random_shuffle=True,
            shard_id=device_id,
            num_shards=num_gpus,
        )

    def define_graph(self):
        """Read (images, labels) and pass them through the common stage."""
        encoded, targets = self.input()
        return self.base_define_graph(encoded, targets)
  • FileReadPipeline

class FileReadPipeline(CommonPipeline):
    """Pipeline reading plain image files from ``image_dir`` with FileReader."""

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        """Create the file reader.

        ``num_gpus`` is accepted so the constructor has the same signature as
        the other reader pipelines, but it is not used: the reader is created
        without shard arguments.
        """
        super(FileReadPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.FileReader(file_root=image_dir)

    def define_graph(self):
        """Read (images, labels) and pass them through the common stage."""
        encoded, targets = self.input()
        return self.base_define_graph(encoded, targets)
  • TFRecordPipeline

import nvidia.dali.tfrecord as tfrec

class TFRecordPipeline(CommonPipeline):
    """Pipeline reading the ``tfrecord`` file (indexed by ``tfrecord_idx``) with TFRecordReader."""

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        """Create the TFRecord reader.

        ``num_gpus`` is accepted so the constructor has the same signature as
        the other reader pipelines, but it is not used: the reader is created
        without shard arguments.
        """
        super(TFRecordPipeline, self).__init__(batch_size, num_threads, device_id)
        # Only the two features consumed by define_graph are requested.
        self.input = ops.TFRecordReader(
            path=tfrecord,
            index_path=tfrecord_idx,
            features={
                "image/encoded": tfrec.FixedLenFeature((), tfrec.string, ""),
                "image/class/label": tfrec.FixedLenFeature([1], tfrec.int64, -1),
            },
        )

    def define_graph(self):
        """Pick images and labels out of the feature dict and run the common stage."""
        inputs = self.input()
        images = inputs["image/encoded"]
        labels = inputs["image/class/label"]
        return self.base_define_graph(images, labels)

Now let us create a function which builds the pipelines on demand:

import tensorflow as tf
import nvidia.dali.plugin.tf as dali_tf

# The session-style API is imported from tf.compat.v1 when that layer exists
# and from the top-level package otherwise.
try:
    from tensorflow.compat.v1 import GPUOptions
    from tensorflow.compat.v1 import ConfigProto
    from tensorflow.compat.v1 import Session
    from tensorflow.compat.v1 import placeholder
except ImportError:
    # Older TF versions don't have compat.v1 layer
    from tensorflow import GPUOptions
    from tensorflow import ConfigProto
    from tensorflow import Session
    from tensorflow import placeholder


def get_batch_test_dali(batch_size, pipe_type):
    """Build one pipeline per GPU and return the TF tensors that read from them.

    Args:
        batch_size: batch size given to every pipeline and used as the first
            dimension of the declared image shape.
        pipe_type: sequence ``(pipeline_class, label_dtype, label_range)``;
            only the first two entries are used here.

    Returns:
        ``[images, labels]`` - two lists with one TF tensor per GPU
        (``N`` entries each).
    """
    pipe_class, label_type, _ = pipe_type
    pipes = [pipe_class(batch_size=batch_size, num_threads=2,
                        device_id=device_id, num_gpus=N)
             for device_id in range(N)]

    daliop = dali_tf.DALIIterator()
    images = []
    labels = []
    for d in range(N):
        with tf.device('/gpu:%i' % d):
            # NOTE(review): images are declared as tf.int32 although
            # CommonPipeline configures its last operator with
            # dtype=types.FLOAT - confirm this is intended.
            image, label = daliop(pipeline=pipes[d],
                                  shapes=[(batch_size, 3, 227, 227), ()],
                                  dtypes=[tf.int32, label_type],
                                  device_id=d)
            # Collect the per-GPU outputs; without this the function would
            # return two empty lists.
            images.append(image)
            labels.append(label)

    return [images, labels]

Finally, let us check that all pipelines have been built correctly and can be run within a TF session.

import numpy as np

# Each entry: [pipeline class, TF dtype of its labels, inclusive (min, max) label range]
pipe_types = [[MXNetReaderPipeline, tf.float32, (0, 999)],
              [CaffeReadPipeline, tf.int32, (0, 999)],
              [FileReadPipeline, tf.int32, (0, 1)],
              [TFRecordPipeline, tf.int64, (1, 1000)]]
# NOTE: despite its name, pipe_name holds the whole entry, not only the class.
for pipe_name in pipe_types:
    print("RUN: " + pipe_name[0].__name__)
    test_batch = get_batch_test_dali(BATCH_SIZE, pipe_name)
    x = placeholder(tf.float32, shape=[BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, 3], name='x')
    gpu_options = GPUOptions(per_process_gpu_memory_fraction=0.8)
    config = ConfigProto(gpu_options=gpu_options)

    with Session(config=config) as sess:
        for i in range(ITERATIONS):
            # Evaluate the DALI outputs of all GPUs for one iteration.
            imgs, labels = sess.run(test_batch)
            # Testing correctness of labels
            for label in labels:
                ## labels need to be integers
                assert(np.equal(np.mod(label, 1), 0).all())
                ## labels need to be in range pipe_name[2]
                assert((label >= pipe_name[2][0]).all())
                assert((label <= pipe_name[2][1]).all())
    print("OK : " + pipe_name[0].__name__)
RUN: MXNetReaderPipeline
OK : MXNetReaderPipeline
RUN: CaffeReadPipeline
OK : CaffeReadPipeline
RUN: FileReadPipeline
OK : FileReadPipeline
RUN: TFRecordPipeline
OK : TFRecordPipeline