Using Tensorflow DALI plugin: using various readers¶
Overview¶
This example shows how different readers could be used to interact with Tensorflow. It shows how flexible DALI is.
The following readers are used in this example:
- MXNetReader
- FileReader
- TFRecordReader
For details on how to use them please see other examples.
Let's start by defining some global constants:
In [1]:
# MXNet RecordIO: directory containing train.rec / train.idx
# (consumed by MXNetReaderPipeline below)
db_folder = "/data/imagenet/train-480-val-256-recordio/"
# image dir with plain jpeg files (consumed by FileReadPipeline)
image_dir = "../images"
# TFRecord file and its DALI index file (consumed by TFRecordPipeline);
# the index is generated below by the tfrecord2idx helper script
tfrecord = "/data/imagenet/train-val-tfrecord-480/train-00001-of-01024"
tfrecord_idx = "idx_files/train-00001-of-01024.idx"
tfrecord2idx_script = "tfrecord2idx"
N = 4 # number of GPUs
BATCH_SIZE = 128 # batch size per GPU
ITERATIONS = 32 # batches pulled from each pipeline in the test below
# NOTE(review): named IMAGE_SIZE but set to 3; it is only used in a
# placeholder shape later on and looks misnamed/unused -- confirm intent.
IMAGE_SIZE = 3
Create idx file by calling tfrecord2idx
script
In [2]:
from subprocess import call
import os.path

# DALI's TFRecordReader needs an index file alongside the TFRecord;
# generate it with the tfrecord2idx helper unless it already exists.
idx_dir = "idx_files"
if not os.path.exists(idx_dir):
    os.mkdir(idx_dir)
if not os.path.isfile(tfrecord_idx):
    call([tfrecord2idx_script, tfrecord, tfrecord_idx])
Let us define the common part of the pipeline, which the other pipelines will inherit:
In [3]:
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
class CommonPipeline(Pipeline):
    """Base pipeline holding the processing stages shared by every reader.

    Decodes JPEGs with nvJPEG ("mixed" host+device stage), resizes so the
    shorter edge is a random value drawn from [256, 480), then takes a
    random 227x227 crop and normalizes with per-channel mean 128, std 1.
    """

    def __init__(self, batch_size, num_threads, device_id):
        super(CommonPipeline, self).__init__(batch_size, num_threads, device_id)
        self.decode = ops.nvJPEGDecoder(device="mixed", output_type=types.RGB)
        self.resize = ops.Resize(device="gpu",
                                 image_type=types.RGB,
                                 interp_type=types.INTERP_LINEAR)
        # Fused random-crop + normalize, producing float output.
        self.cmn = ops.CropMirrorNormalize(device="gpu",
                                           output_dtype=types.FLOAT,
                                           crop=(227, 227),
                                           image_type=types.RGB,
                                           mean=[128., 128., 128.],
                                           std=[1., 1., 1.])
        # RNG ops: crop position in [0, 1), resize target in [256, 480).
        self.uniform = ops.Uniform(range=(0.0, 1.0))
        self.resize_rng = ops.Uniform(range=(256, 480))

    def base_define_graph(self, inputs, labels):
        """Apply the shared decode/resize/crop stages; return (images, labels)."""
        decoded = self.decode(inputs)
        resized = self.resize(decoded, resize_shorter=self.resize_rng())
        normalized = self.cmn(resized,
                              crop_pos_x=self.uniform(),
                              crop_pos_y=self.uniform())
        return (normalized, labels.gpu())
- MXNetReaderPipeline
In [4]:
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
class MXNetReaderPipeline(CommonPipeline):
    """Pipeline fed from MXNet RecordIO files (train.rec / train.idx)."""

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(MXNetReaderPipeline, self).__init__(batch_size, num_threads, device_id)
        # Each GPU reads its own shard of the shuffled dataset.
        self.input = ops.MXNetReader(path=[db_folder + "train.rec"],
                                     index_path=[db_folder + "train.idx"],
                                     random_shuffle=True,
                                     shard_id=device_id,
                                     num_shards=num_gpus)

    def define_graph(self):
        jpegs, labels = self.input(name="Reader")
        return self.base_define_graph(jpegs, labels)
- FileReadPipeline
In [5]:
class FileReadPipeline(CommonPipeline):
    """Pipeline fed from plain JPEG files under a root directory."""

    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(FileReadPipeline, self).__init__(batch_size, num_threads, device_id)
        # num_gpus is accepted only for signature parity with the other
        # pipelines; this reader is not sharded here.
        self.input = ops.FileReader(file_root=image_dir)

    def define_graph(self):
        jpegs, labels = self.input()
        return self.base_define_graph(jpegs, labels)
- TFRecordPipeline
In [6]:
import nvidia.dali.tfrecord as tfrec
class TFRecordPipeline(CommonPipeline):
def __init__(self, batch_size, num_threads, device_id, num_gpus):
super(TFRecordPipeline, self).__init__(batch_size, num_threads, device_id)
self.input = ops.TFRecordReader(path = tfrecord,
index_path = tfrecord_idx,
features = {"image/encoded" : tfrec.FixedLenFeature((), tfrec.string, ""),
"image/class/label": tfrec.FixedLenFeature([1], tfrec.int64, -1)
})
def define_graph(self):
inputs = self.input()
images = inputs["image/encoded"]
labels = inputs["image/class/label"]
return self.base_define_graph(images, labels)
Now let us create a function which builds a serialized pipeline on demand:
In [7]:
import tensorflow as tf
import nvidia.dali.plugin.tf as dali_tf
def get_batch_test_dali(batch_size, pipe_type):
    """Build one serialized DALI pipeline per GPU and wire it into TF.

    Args:
        batch_size: per-GPU batch size used to construct each pipeline.
        pipe_type: triple (pipeline_class, tf_label_dtype, label_range);
            only the first two entries are used here.

    Returns:
        [images, labels]: two lists, one TF output tensor per GPU.
    """
    pipe_name, label_type, _ = pipe_type
    pipes = [pipe_name(batch_size=batch_size, num_threads=2,
                       device_id=device_id, num_gpus=N)
             for device_id in range(N)]
    serialized_pipes = [pipe.serialize() for pipe in pipes]
    # The Python-side pipeline objects are no longer needed once serialized.
    del pipes
    daliop = dali_tf.DALIIterator()
    images = []
    labels = []
    for d in range(N):
        with tf.device('/gpu:%i' % d):
            # BUGFIX: the declared output shape must use the batch_size
            # this function was called with, not the global BATCH_SIZE --
            # otherwise any caller with a different batch size gets a
            # mismatched tensor shape.
            image, label = daliop(serialized_pipeline=serialized_pipes[d],
                                  shape=[batch_size, 3, 227, 227],
                                  image_type=tf.int32,
                                  label_type=label_type,
                                  device_id=d)
            images.append(image)
            labels.append(label)
    return [images, labels]
Finally, let us test whether all the pipelines can be correctly built, serialized and run with a TF session
In [8]:
import numpy as np

# (pipeline class, TF label dtype, inclusive label range) per reader.
pipe_types = [[MXNetReaderPipeline, tf.float32, (0, 999)],
              [FileReadPipeline, tf.int32, (0, 1)],
              [TFRecordPipeline, tf.int64, (1, 1000)]]

for pipe_type in pipe_types:
    print ("RUN: " + pipe_type[0].__name__)
    test_batch = get_batch_test_dali(BATCH_SIZE, pipe_type)
    # FIX: removed an unused tf.placeholder that was recreated on every
    # iteration and only polluted the default graph.
    # Cap TF's GPU memory grab so DALI has headroom on the same devices.
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.8)
    config = tf.ConfigProto(gpu_options=gpu_options)
    with tf.Session(config=config) as sess:
        for _ in range(ITERATIONS):
            imgs, labels = sess.run(test_batch)
            # Testing correctness of labels from every GPU.
            low, high = pipe_type[2]
            for label in labels:
                ## labels need to be integers
                assert(np.equal(np.mod(label, 1), 0).all())
                ## labels need to be in the range expected for this reader
                assert((label >= low).all())
                assert((label <= high).all())
    print("OK : " + pipe_type[0].__name__)
RUN: MXNetReaderPipeline
OK : MXNetReaderPipeline
RUN: FileReadPipeline
OK : FileReadPipeline
RUN: TFRecordPipeline
OK : TFRecordPipeline