Using the TensorFlow DALI plugin: using various readers
Overview
This example shows how different DALI readers can be used to feed data into TensorFlow, and demonstrates how flexible DALI is.

The following readers are used in this example:

- readers.mxnet
- readers.caffe
- readers.file
- readers.tfrecord

For details on how to use them, please see the other examples.
Let us start by defining some global constants.

The DALI_EXTRA_PATH environment variable should point to the place where the data from the DALI extra repository was downloaded. Please make sure that the proper release tag is checked out.
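Before going further, a quick sanity check (an illustrative addition, not part of the original tutorial) can make the failure mode obvious if the variable is missing:

import os

# Illustrative check: fail early with a clear message when DALI_EXTRA_PATH is unset.
if "DALI_EXTRA_PATH" not in os.environ:
    raise RuntimeError(
        "DALI_EXTRA_PATH is not set; clone the DALI_extra repository "
        "and point the variable at the checkout."
    )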
[1]:
import os.path
test_data_root = os.environ["DALI_EXTRA_PATH"]
# MXNet RecordIO
db_folder = os.path.join(test_data_root, "db", "recordio/")
# Caffe LMDB
lmdb_folder = os.path.join(test_data_root, "db", "lmdb")
# image dir with plain jpeg files
image_dir = "../../data/images"
# TFRecord
tfrecord = os.path.join(test_data_root, "db", "tfrecord", "train")
tfrecord_idx = "idx_files/train.idx"
tfrecord2idx_script = "tfrecord2idx"
N = 8 # number of GPUs
BATCH_SIZE = 128 # batch size per GPU
ITERATIONS = 32
IMAGE_SIZE = 3
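With these constants, each iteration processes N * BATCH_SIZE = 8 * 128 = 1024 images in total across all GPUs. The constant below is hypothetical, added only to make that arithmetic explicit:

# Hypothetical helper constant: the global batch processed per iteration.
GLOBAL_BATCH_SIZE = N * BATCH_SIZE  # 8 * 128 = 1024 images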
Create the index file by calling the tfrecord2idx script:
[2]:
from subprocess import call
import os.path

if not os.path.exists("idx_files"):
    os.mkdir("idx_files")

if not os.path.isfile(tfrecord_idx):
    call([tfrecord2idx_script, tfrecord, tfrecord_idx])
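To see what the script produced, a minimal sketch (an illustrative addition; it assumes the index file was created by the call above) is to print its first line, which describes where the first record in the TFRecord file is located:

# Illustrative only: peek at the first entry of the generated index file.
with open(tfrecord_idx) as f:
    print(f.readline().strip())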
Let us define the common part of the processing graph, used by all pipelines:
[3]:
from nvidia.dali import pipeline_def, Pipeline
import nvidia.dali.fn as fn
import nvidia.dali.types as types


def common_pipeline(jpegs, labels):
    # Decode on the GPU ("mixed"), randomly resize, then randomly crop and normalize.
    images = fn.decoders.image(jpegs, device="mixed")
    images = fn.resize(
        images,
        resize_shorter=fn.random.uniform(range=(256, 480)),
        interp_type=types.INTERP_LINEAR,
    )
    images = fn.crop_mirror_normalize(
        images,
        crop_pos_x=fn.random.uniform(range=(0.0, 1.0)),
        crop_pos_y=fn.random.uniform(range=(0.0, 1.0)),
        dtype=types.FLOAT,
        crop=(227, 227),
        mean=[128.0, 128.0, 128.0],
        std=[1.0, 1.0, 1.0],
    )
    return images, labels
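Note that the fn.random.uniform outputs are used as arguments to other operators, so DALI draws a fresh value for every sample: each image gets its own resize scale and crop position. The sketch below (an illustrative addition, not part of the original tutorial) shows this per-sample behavior in isolation:

# Illustrative sketch: a tiny pipeline that returns one random value per sample.
@pipeline_def(batch_size=4, num_threads=1, device_id=0)
def random_demo_pipeline():
    return fn.random.uniform(range=(0.0, 1.0))

demo_pipe = random_demo_pipeline()
demo_pipe.build()
(values,) = demo_pipe.run()
print(values)  # four different values, one per sample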
MXNet reader pipeline
[4]:
@pipeline_def
def mxnet_reader_pipeline(num_gpus):
    jpegs, labels = fn.readers.mxnet(
        path=[db_folder + "train.rec"],
        index_path=[db_folder + "train.idx"],
        random_shuffle=True,
        shard_id=Pipeline.current().device_id,
        num_shards=num_gpus,
        name="Reader",
    )
    return common_pipeline(jpegs, labels)
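Before wiring pipelines into TensorFlow, each one can also be built and run standalone. The following sketch (an illustrative addition; it assumes a single-GPU setup, hence device_id=0 and num_gpus=1) pulls one batch straight from DALI:

# Illustrative only: build the pipeline and fetch a single batch without TF.
pipe = mxnet_reader_pipeline(
    batch_size=BATCH_SIZE, num_threads=2, device_id=0, num_gpus=1
)
pipe.build()
images_out, labels_out = pipe.run()
print(images_out.as_tensor().shape())  # (BATCH_SIZE, 3, 227, 227)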
Caffe reader pipeline
[5]:
@pipeline_def
def caffe_reader_pipeline(num_gpus):
    jpegs, labels = fn.readers.caffe(
        path=lmdb_folder,
        random_shuffle=True,
        shard_id=Pipeline.current().device_id,
        num_shards=num_gpus,
        name="Reader",
    )
    return common_pipeline(jpegs, labels)
File reader pipeline
[6]:
@pipeline_def
def file_reader_pipeline(num_gpus):
    jpegs, labels = fn.readers.file(
        file_root=image_dir,
        random_shuffle=True,
        shard_id=Pipeline.current().device_id,
        num_shards=num_gpus,
        name="Reader",
    )
    return common_pipeline(jpegs, labels)
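Because every pipeline passes shard_id and num_shards to its reader, each GPU sees a distinct 1/N slice of the dataset. As a sketch (an illustrative addition; reader_meta is a standard DALI Pipeline method, and "Reader" is the name given to the readers above), the sharding can be inspected like this:

# Illustrative only: inspect the sharding metadata of the named reader.
probe_pipe = file_reader_pipeline(
    batch_size=BATCH_SIZE, num_threads=2, device_id=0, num_gpus=N
)
probe_pipe.build()
meta = probe_pipe.reader_meta("Reader")
print(meta["epoch_size"], meta["number_of_shards"], meta["shard_id"])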
TFRecord reader pipeline
[7]:
import nvidia.dali.tfrecord as tfrec


@pipeline_def
def tfrecord_reader_pipeline(num_gpus):
    inputs = fn.readers.tfrecord(
        path=tfrecord,
        index_path=tfrecord_idx,
        features={
            "image/encoded": tfrec.FixedLenFeature((), tfrec.string, ""),
            "image/class/label": tfrec.FixedLenFeature([1], tfrec.int64, -1),
        },
        random_shuffle=True,
        shard_id=Pipeline.current().device_id,
        num_shards=num_gpus,
        name="Reader",
    )
    return common_pipeline(inputs["image/encoded"], inputs["image/class/label"])
Now let us create a function that builds a pipeline on demand and wraps it with the DALI TensorFlow plugin:
[8]:
import tensorflow as tf
import nvidia.dali.plugin.tf as dali_tf
from tensorflow.compat.v1 import GPUOptions
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import Session
from tensorflow.compat.v1 import placeholder

tf.compat.v1.disable_eager_execution()


def get_batch_test_dali(batch_size, pipe_type):
    pipe_name, label_type, _ = pipe_type
    # One pipeline instance per GPU, each reading its own shard of the data.
    pipes = [
        pipe_name(
            batch_size=BATCH_SIZE,
            num_threads=2,
            device_id=device_id,
            num_gpus=N,
        )
        for device_id in range(N)
    ]

    daliop = dali_tf.DALIIterator()
    images = []
    labels = []
    for d in range(N):
        with tf.device("/gpu:%i" % d):
            image, label = daliop(
                pipeline=pipes[d],
                shapes=[(BATCH_SIZE, 3, 227, 227), ()],
                # common_pipeline outputs FLOAT images (see crop_mirror_normalize),
                # so the declared TF dtype has to be tf.float32.
                dtypes=[tf.float32, label_type],
                device_id=d,
            )
            images.append(image)
            labels.append(label)

    return [images, labels]
Finally, let us check that all the pipelines have been built correctly by running them within a TF session:
[9]:
import numpy as np

pipe_types = [
    [mxnet_reader_pipeline, tf.float32, (0, 999)],
    [caffe_reader_pipeline, tf.int32, (0, 999)],
    [file_reader_pipeline, tf.int32, (0, 1)],
    [tfrecord_reader_pipeline, tf.int64, (1, 1000)],
]

for pipe_name in pipe_types:
    print("RUN: " + pipe_name[0].__name__)
    test_batch = get_batch_test_dali(BATCH_SIZE, pipe_name)
    x = placeholder(
        tf.float32, shape=[BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, 3], name="x"
    )
    gpu_options = GPUOptions(per_process_gpu_memory_fraction=0.8)
    config = ConfigProto(gpu_options=gpu_options)

    with Session(config=config) as sess:
        for i in range(ITERATIONS):
            imgs, labels = sess.run(test_batch)
            # Testing correctness of labels
            for label in labels:
                # labels need to be integers
                assert np.equal(np.mod(label, 1), 0).all()
                # labels need to be in range pipe_name[2]
                assert (label >= pipe_name[2][0]).all()
                assert (label <= pipe_name[2][1]).all()
    print("OK : " + pipe_name[0].__name__)
RUN: mxnet_reader_pipeline
OK : mxnet_reader_pipeline
RUN: caffe_reader_pipeline
OK : caffe_reader_pipeline
RUN: file_reader_pipeline
OK : file_reader_pipeline
RUN: tfrecord_reader_pipeline
OK : tfrecord_reader_pipeline