Using Tensorflow DALI plugin: DALI and tf.data¶
Overview¶
DALI offers integration with tf.data API. Using this approach you can easily connect DALI pipeline with various TensorFlow APIs and use it as a data source for your model. This tutorial shows how to do it using well known MNIST converted to LMDB format. You can find it in DALI_extra - DALI test data repository.
We start with creating a DALI pipeline to read, decode and normalize MNIST images and read corresponding labels.
DALI_EXTRA_PATH
environment variable should point to the place where data from DALI extra repository is downloaded. Please make sure that the proper release tag is checked out.
[1]:
import nvidia.dali as dali
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
import os
# Path to MNIST dataset
data_path = os.path.join(os.environ['DALI_EXTRA_PATH'], 'db/MNIST/training/')
class MnistPipeline(Pipeline):
def __init__(self, batch_size, device, device_id=0, num_threads=4, seed=0):
super(MnistPipeline, self).__init__(
batch_size, num_threads, device_id, seed)
self.device = device
self.reader = ops.Caffe2Reader(path=data_path, random_shuffle=True)
self.decode = ops.ImageDecoder(
device='mixed' if device == 'gpu' else 'cpu',
output_type=types.GRAY)
self.cmn = ops.CropMirrorNormalize(
device=device,
dtype=types.FLOAT,
std=[255.],
output_layout="CHW")
def define_graph(self):
inputs, labels = self.reader(name="Reader")
images = self.decode(inputs)
if self.device == 'gpu':
labels = labels.gpu()
images = self.cmn(images)
return (images, labels)
Now we define some parameters of the training:
[2]:
BATCH_SIZE = 64
DROPOUT = 0.2
IMAGE_SIZE = 28
NUM_CLASSES = 10
HIDDEN_SIZE = 128
EPOCHS = 5
ITERATIONS_PER_EPOCH = 100
Next step is to wrap an instance of MnistPipeline
with a DALIDataset
object from DALI TensorFlow plugin. This class is compatible with tf.data.Dataset
. Other parameters are shapes and types of the outputs of the pipeline. Here we return images and labels. It means we have two outputs one of type tf.float32
for images and on of type tf.int32
for labels.
[3]:
import nvidia.dali.plugin.tf as dali_tf
import tensorflow.compat.v1 as tf
tf.logging.set_verbosity(tf.logging.ERROR)
tf.disable_eager_execution()
# Create pipeline
mnist_pipeline = MnistPipeline(BATCH_SIZE, device='cpu', device_id=0)
# Define shapes and types of the outputs
shapes = (
(BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE),
(BATCH_SIZE))
dtypes = (
tf.float32,
tf.int32)
# Create dataset
mnist_set = dali_tf.DALIDataset(
pipeline=mnist_pipeline,
batch_size=BATCH_SIZE,
output_shapes=shapes,
output_dtypes=dtypes,
device_id=0)
We are ready to start the training. Following sections show how to do it with different APIs availible in TensorFlow.
Keras¶
First, we pass mnist_set
to model created with tf.keras
and use model.fit
method to train it.
[4]:
# Create the model
model = tf.keras.models.Sequential([
tf.keras.layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE), name='images'),
tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),
tf.keras.layers.Dense(HIDDEN_SIZE, activation='relu'),
tf.keras.layers.Dropout(DROPOUT),
tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')])
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
# Train using DALI dataset
model.fit(
mnist_set,
epochs=EPOCHS,
steps_per_epoch=ITERATIONS_PER_EPOCH)
Train on 100 steps
Epoch 1/5
100/100 [==============================] - 0s 2ms/step - loss: 0.9089 - accuracy: 0.7437
Epoch 2/5
100/100 [==============================] - 0s 2ms/step - loss: 0.4085 - accuracy: 0.8841
Epoch 3/5
100/100 [==============================] - 0s 2ms/step - loss: 0.3223 - accuracy: 0.9078
Epoch 4/5
100/100 [==============================] - 0s 2ms/step - loss: 0.2839 - accuracy: 0.9180
Epoch 5/5
100/100 [==============================] - 0s 2ms/step - loss: 0.2543 - accuracy: 0.9284
[4]:
<tensorflow.python.keras.callbacks.History at 0x7fe5a82e60d0>
As you can see, it was very easy to integrate DALI pipeline with tf.keras
API.
The code above performed the training using the CPU. Both the DALI pipeline and the model were using the CPU.
We can easily move the whole processing to the GPU. First, we create a pipeline that uses the GPU with ID = 0. Next we place both the DALI dataset and the model on the same GPU.
[5]:
# Create pipeline
mnist_pipeline = MnistPipeline(BATCH_SIZE, device='gpu', device_id=0)
# Define the model and place it on the GPU
with tf.device('/gpu:0'):
mnist_set = dali_tf.DALIDataset(
pipeline=mnist_pipeline,
batch_size=BATCH_SIZE,
output_shapes=shapes,
output_dtypes=dtypes,
device_id=0)
model = tf.keras.models.Sequential([
tf.keras.layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE), name='images'),
tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),
tf.keras.layers.Dense(HIDDEN_SIZE, activation='relu'),
tf.keras.layers.Dropout(DROPOUT),
tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')])
model.compile(
optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
We move the training to the GPU as well. This allows TensorFlow to pick up GPU instance of DALI dataset.
[6]:
# Train on the GPU
with tf.device('/gpu:0'):
model.fit(
mnist_set,
epochs=EPOCHS,
steps_per_epoch=ITERATIONS_PER_EPOCH)
Train on 100 steps
Epoch 1/5
100/100 [==============================] - 0s 3ms/step - loss: 0.8987 - accuracy: 0.7414
Epoch 2/5
100/100 [==============================] - 0s 3ms/step - loss: 0.4121 - accuracy: 0.8775
Epoch 3/5
100/100 [==============================] - 0s 3ms/step - loss: 0.3193 - accuracy: 0.9028
Epoch 4/5
100/100 [==============================] - 0s 3ms/step - loss: 0.2877 - accuracy: 0.9183
Epoch 5/5
100/100 [==============================] - 0s 3ms/step - loss: 0.2611 - accuracy: 0.9237
It is important to note here, that there is no intermediate CPU buffer between DALI and TensorFlow in the execution above. DALI GPU outputs are copied straight to TF GPU Tensors used by the model.
In this particular toy example performance of the GPU variant is lower than the CPU one. The MNIST images are small and nvJPEG decoder used in the GPU DALI pipeline to decode them is not well suited for such circumstance. We use it here to show how to integrate it properly in the real life case.
Estimators¶
Another popular TensorFlow API is tf.estimator
API. This section shows how to use DALI dataset as a data source for model based on this API.
First we create the model.
[7]:
# Define the feature columns for Estimator
feature_columns = [tf.feature_column.numeric_column(
"images", shape=[IMAGE_SIZE, IMAGE_SIZE])]
# And the run config
run_config = tf.estimator.RunConfig(
model_dir='/tmp/tensorflow-checkpoints',
device_fn=lambda op: '/gpu:0')
# Finally create the model based on `DNNClassifier`
model = tf.estimator.DNNClassifier(
feature_columns=feature_columns,
hidden_units=[HIDDEN_SIZE],
n_classes=NUM_CLASSES,
dropout=DROPOUT,
config=run_config,
optimizer='Adam')
In tf.estimator
API data is passed to the model with the function returning the dataset. We define this function to return DALI dataset placed on the GPU.
[8]:
def train_data_fn():
with tf.device('/gpu:0'):
mnist_pipeline = MnistPipeline(BATCH_SIZE, device='gpu', device_id=0)
mnist_set = dali_tf.DALIDataset(
fail_on_device_mismatch=False,
pipeline=mnist_pipeline,
batch_size=BATCH_SIZE,
output_shapes=shapes,
output_dtypes=dtypes,
device_id=0)
mnist_set = mnist_set.map(
lambda features, labels: ({'images': features}, labels))
return mnist_set
With everything set up we are ready to run the training.
[9]:
# Running the training on the GPU
model.train(input_fn=train_data_fn, steps=EPOCHS * ITERATIONS_PER_EPOCH)
[9]:
<tensorflow_estimator.python.estimator.canned.dnn.DNNClassifier at 0x7fe5a81950d0>
[10]:
def test_data_fn():
with tf.device('/cpu:0'):
mnist_pipeline = MnistPipeline(BATCH_SIZE, device='cpu', device_id=0)
mnist_set = dali_tf.DALIDataset(
fail_on_device_mismatch=False,
pipeline=mnist_pipeline,
batch_size=BATCH_SIZE,
output_shapes=shapes,
output_dtypes=dtypes,
device_id=0)
mnist_set = mnist_set.map(
lambda features, labels: ({'images': features}, labels))
return mnist_set
model.evaluate(input_fn=test_data_fn, steps=ITERATIONS_PER_EPOCH)
[10]:
{'accuracy': 0.90046877,
'average_loss': 0.46437886,
'loss': 29.720247,
'global_step': 2500}
Custom models and training loops¶
Finally, the last part of this tutorial focuses on integrating DALI dataset with custom models and training loops. A complete example below shows from start to finish how to use DALI dataset with native TensorFlow model and run training using tf.Session
.
First step is to define the model and the dataset and place both on the GPU.
[11]:
tf.reset_default_graph()
options = tf.data.Options()
options.experimental_optimization.apply_default_optimizations = False
options.experimental_optimization.autotune = False
with tf.device('/gpu:0'):
mnist_set = dali_tf.DALIDataset(
pipeline=MnistPipeline(BATCH_SIZE, device='gpu', device_id=0),
batch_size=BATCH_SIZE,
output_shapes=shapes,
output_dtypes=dtypes,
device_id=0).with_options(options)
iterator = tf.data.make_initializable_iterator(mnist_set)
images, labels = iterator.get_next()
labels = tf.reshape(
tf.one_hot(labels, NUM_CLASSES),
[BATCH_SIZE, NUM_CLASSES])
with tf.variable_scope('mnist_net', reuse=False):
images = tf.layers.flatten(images)
images = tf.layers.dense(images, HIDDEN_SIZE, activation=tf.nn.relu)
images = tf.layers.dropout(images, rate=DROPOUT, training=True)
images = tf.layers.dense(images, NUM_CLASSES, activation=tf.nn.softmax)
logits_train = images
loss_op = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(
logits=logits_train, labels=labels))
train_step = tf.train.AdamOptimizer().minimize(loss_op)
correct_pred = tf.equal(
tf.argmax(logits_train, 1), tf.argmax(labels, 1))
accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
With tf.Session
we can run this model and train it on the GPU.
[12]:
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
sess.run(iterator.initializer)
for i in range(EPOCHS * ITERATIONS_PER_EPOCH):
sess.run(train_step)
if i % ITERATIONS_PER_EPOCH == 0:
train_accuracy = sess.run(accuracy)
print("Step %d, accuracy: %g" % (i, train_accuracy))
final_accuracy = 0
for _ in range(ITERATIONS_PER_EPOCH):
final_accuracy = final_accuracy + sess.run(accuracy)
final_accuracy = final_accuracy / ITERATIONS_PER_EPOCH
print('Final accuracy: ', final_accuracy)
Step 0, accuracy: 0.15625
Step 100, accuracy: 0.90625
Step 200, accuracy: 0.84375
Step 300, accuracy: 0.953125
Step 400, accuracy: 0.90625
Final accuracy: 0.91171875