Using Tensorflow DALI plugin: DALI tf.data.Dataset with multiple GPUs

Overview

This notebook is a comprehensive example of how to use DALI tf.data.Dataset with multiple GPUs. It is recommended to look into the single-GPU example first to get up to speed with the DALI dataset and how it can be used to train a neural network. This example is an extension of the single-GPU version.

First, we define some training parameters and create a DALI pipeline that reads MNIST converted to the LMDB format. The data is available in the DALI_extra repository. This pipeline is able to partition the dataset into multiple shards.

The DALI_EXTRA_PATH environment variable should point to the location where the data from the DALI_extra repository is downloaded. Please make sure that the proper release tag is checked out.
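As an optional sanity check (not part of the original notebook), you can verify that DALI_EXTRA_PATH is set and that the MNIST LMDB directory is where the following cells expect it:

# Optional check, assuming a standard DALI_extra checkout.
import os

assert 'DALI_EXTRA_PATH' in os.environ, \
    'Set DALI_EXTRA_PATH to your DALI_extra checkout'
mnist_dir = os.path.join(os.environ['DALI_EXTRA_PATH'], 'db/MNIST/training/')
assert os.path.isdir(mnist_dir), 'MNIST LMDB data not found in ' + mnist_dir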

[1]:
import nvidia.dali as dali
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

import os

import nvidia.dali.plugin.tf as dali_tf
import tensorflow as tf

import logging
tf.get_logger().setLevel(logging.ERROR)
[2]:
# Path to MNIST dataset
data_path = os.path.join(os.environ['DALI_EXTRA_PATH'], 'db/MNIST/training/')

BATCH_SIZE = 64
DROPOUT = 0.2
IMAGE_SIZE = 28
NUM_CLASSES = 10
HIDDEN_SIZE = 128
EPOCHS = 5
ITERATIONS = 100
NUM_DEVICES = 2
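
This example hard-codes two devices (NUM_DEVICES here and the /gpu:0, /gpu:1 list used below). As an optional sketch, not part of the original notebook, you can confirm that TensorFlow actually sees enough GPUs:

# Optional check: TensorFlow should report at least NUM_DEVICES physical GPUs.
gpus = tf.config.list_physical_devices('GPU')
assert len(gpus) >= NUM_DEVICES, \
    'Expected at least {} GPUs, found {}'.format(NUM_DEVICES, len(gpus))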
[3]:
class MnistPipeline(Pipeline):
    def __init__(
        self, batch_size, device_id=0, shard_id=0, num_shards=1, num_threads=4, seed=0):
        super(MnistPipeline, self).__init__(
            batch_size, num_threads, device_id, seed)
        # Read MNIST from the Caffe2 LMDB; shard_id/num_shards select this pipeline's shard
        self.reader = ops.Caffe2Reader(
            path=data_path, random_shuffle=True, shard_id=shard_id, num_shards=num_shards)
        # Decode to grayscale; 'mixed' means the decoded images land on the GPU
        self.decode = ops.ImageDecoder(
            device='mixed',
            output_type=types.GRAY)
        # Normalize pixel values to [0, 1] and produce output in CHW layout
        self.cmn = ops.CropMirrorNormalize(
            device='gpu',
            dtype=types.FLOAT,
            std=[255.],
            output_layout="CHW")

    def define_graph(self):
        inputs, labels = self.reader(name="Reader")
        images = self.decode(inputs)
        labels = labels.gpu()
        images = self.cmn(images)

        return (images, labels)
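
The pipeline can also be built and run on its own, which is useful for verifying the sharding setup. The following optional sketch (not part of the original notebook) builds one shard and pulls a single batch:

# Optional check: build shard 0 of NUM_DEVICES and run one iteration directly.
pipe = MnistPipeline(BATCH_SIZE, device_id=0, shard_id=0, num_shards=NUM_DEVICES)
pipe.build()
images, labels = pipe.run()
print(len(images), len(labels))  # both TensorLists hold BATCH_SIZE samples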

Next, we define the output shapes and data types needed by the DALI dataset. For more details on what they mean, refer to the single-GPU example.

[4]:
shapes = (
    (BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE),
    (BATCH_SIZE))
dtypes = (
    tf.float32,
    tf.int32)

Now we are ready to define the model. To distribute the training across multiple GPUs, we use tf.distribute.MirroredStrategy.

[5]:
strategy = tf.distribute.MirroredStrategy(devices=['/gpu:0', '/gpu:1'])

with strategy.scope():
    model = tf.keras.models.Sequential([
        tf.keras.layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE), name='images'),
        tf.keras.layers.Flatten(input_shape=(IMAGE_SIZE, IMAGE_SIZE)),
        tf.keras.layers.Dense(HIDDEN_SIZE, activation='relu'),
        tf.keras.layers.Dropout(DROPOUT),
        tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')])

    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'])

The DALI dataset needs to be distributed as well. To do that, we use distribute_datasets_from_function. First, we define a function that returns a dataset bound to the device given by the input pipeline id. Additionally, specific tf.distribute.InputOptions are needed to make everything work: the dataset is placed directly on the device, prefetching to the device is disabled (DALI already produces its outputs on the GPU), and the replication mode is set to PER_REPLICA.

[6]:
def dataset_fn(input_context):
    # Bind the DALI pipeline and the resulting dataset to the GPU selected by
    # the input pipeline id, so each replica reads its own shard of the data.
    with tf.device("/gpu:{}".format(input_context.input_pipeline_id)):
        device_id = input_context.input_pipeline_id
        return dali_tf.DALIDataset(
            pipeline=MnistPipeline(
                BATCH_SIZE, device_id=device_id, shard_id=device_id, num_shards=NUM_DEVICES),
            batch_size=BATCH_SIZE,
            output_shapes=shapes,
            output_dtypes=dtypes,
            device_id=device_id)

input_options = tf.distribute.InputOptions(
    experimental_place_dataset_on_device = True,
    experimental_prefetch_to_device = False,
    experimental_replication_mode = tf.distribute.InputReplicationMode.PER_REPLICA)

train_dataset = strategy.distribute_datasets_from_function(dataset_fn, input_options)

With everything in place, we are ready to run the training and evaluate the model.

[7]:
model.fit(
    train_dataset,
    epochs=EPOCHS,
    steps_per_epoch=ITERATIONS)
Epoch 1/5
100/100 [==============================] - 4s 4ms/step - loss: 1.2798 - accuracy: 0.6179
Epoch 2/5
100/100 [==============================] - 0s 4ms/step - loss: 0.4025 - accuracy: 0.8838
Epoch 3/5
100/100 [==============================] - 0s 4ms/step - loss: 0.3110 - accuracy: 0.9103
Epoch 4/5
100/100 [==============================] - 0s 3ms/step - loss: 0.2730 - accuracy: 0.9237
Epoch 5/5
100/100 [==============================] - 0s 4ms/step - loss: 0.2323 - accuracy: 0.9340
[7]:
<tensorflow.python.keras.callbacks.History at 0x7fd35ce97fd0>
[8]:
model.evaluate(
    train_dataset,
    steps=ITERATIONS)
100/100 [==============================] - 2s 3ms/step - loss: 0.1943 - accuracy: 0.9454
[8]:
[0.19432015717029572, 0.9453906416893005]
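
After training, the model behaves like any other Keras model, so you can, for example, persist the trained weights for later inference. The file name below is only a placeholder, not part of the original notebook:

# Save the trained weights to an arbitrary, writable location.
model.save_weights('mnist_multi_gpu.h5')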