MXNet with DALI - ResNet 50 example

Overview

This example shows, how to use DALI pipelines with Apache MXNet.

ResNet 50 pipeline

Let us first define a few global constants.

In [1]:
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

N = 8  # number of GPUs
batch_size = 128  # batch size per GPU

db_folder = "/data/imagenet/train-480-val-256-recordio/"

The training pipeline

The training pipeline consists of the following steps: * Data is first read from MXNet’s recordIO file (the reader op is given a name Reader for later use) * Then, images are decoded using nvJPEG * RGB images are then randomly cropped and resized to the final size of (224, 224) pixels * Finally, the batch is transposed from NHWC layout to NCHW layout, normalized and randomly mirrored.

DALIClassificationIterator, which we will use for interfacing with MXNet in this example, requires outputs of the pipeline to follow (image, label) structure.

In [2]:
class HybridTrainPipe(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(HybridTrainPipe, self).__init__(batch_size, num_threads, device_id, seed = 12 + device_id)
        self.input = ops.MXNetReader(path = [db_folder+"train.rec"], index_path=[db_folder+"train.idx"],
                                     random_shuffle = True, shard_id = device_id, num_shards = num_gpus)
        self.decode = ops.nvJPEGDecoder(device = "mixed", output_type = types.RGB)
        self.rrc = ops.RandomResizedCrop(device = "gpu", size = (224, 224))
        self.cmnp = ops.CropMirrorNormalize(device = "gpu",
                                            output_dtype = types.FLOAT,
                                            output_layout = types.NCHW,
                                            crop = (224, 224),
                                            image_type = types.RGB,
                                            mean = [0.485 * 255,0.456 * 255,0.406 * 255],
                                            std = [0.229 * 255,0.224 * 255,0.225 * 255])
        self.coin = ops.CoinFlip(probability = 0.5)

    def define_graph(self):
        rng = self.coin()
        self.jpegs, self.labels = self.input(name = "Reader")
        images = self.decode(self.jpegs)
        images = self.rrc(images)
        output = self.cmnp(images, mirror = rng)
        return [output, self.labels]

The validation pipeline

The validation pipeline is similar to the training pipeline, but omits the random resized crop and random mirroring steps, as well as shuffling the data coming from the reader.

In [3]:
class HybridValPipe(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, num_gpus):
        super(HybridValPipe, self).__init__(batch_size, num_threads, device_id, seed = 12 + device_id)
        self.input = ops.MXNetReader(path = [db_folder+"val.rec"], index_path=[db_folder+"val.idx"],
                                     random_shuffle = False, shard_id = device_id, num_shards = num_gpus)
        self.decode = ops.nvJPEGDecoder(device = "mixed", output_type = types.RGB)
        self.cmnp = ops.CropMirrorNormalize(device = "gpu",
                                            output_dtype = types.FLOAT,
                                            output_layout = types.NCHW,
                                            crop = (224, 224),
                                            image_type = types.RGB,
                                            mean = [0.485 * 255,0.456 * 255,0.406 * 255],
                                            std = [0.229 * 255,0.224 * 255,0.225 * 255])

    def define_graph(self):
        self.jpegs, self.labels = self.input(name = "Reader")
        images = self.decode(self.jpegs)
        output = self.cmnp(images)
        return [output, self.labels]

In [4]:
trainpipes = [HybridTrainPipe(batch_size=batch_size, num_threads=2, device_id = i, num_gpus = N) for i in range(N)]
valpipes = [HybridValPipe(batch_size=batch_size, num_threads=2, device_id = i, num_gpus = N) for i in range(N)]

Using the MXNet plugin

MXNet data iterators need to know what is the size of the dataset. Since DALI pipelines may consist of multiple readers, potentially with differently sized datasets, we need to specify the reader which we ask for the epoch size. That is why we gave a name to readers in both training and validation pipelines.

In order to get the epoch size out of the reader, we need to build one of the training and one of the validation pipelines.

In [5]:
trainpipes[0].build()
valpipes[0].build()
In [6]:
print("Training pipeline epoch size: {}".format(trainpipes[0].epoch_size("Reader")))
print("Validation pipeline epoch size: {}".format(valpipes[0].epoch_size("Reader")))
Training pipeline epoch size: 1281167
Validation pipeline epoch size: 50000

Now we can make MXNet iterators out of our pipelines, using DALIClassificationIterator class.

In [7]:
from nvidia.dali.plugin.mxnet import DALIClassificationIterator
dali_train_iter = DALIClassificationIterator(trainpipes, trainpipes[0].epoch_size("Reader"))
dali_val_iter = DALIClassificationIterator(valpipes, valpipes[0].epoch_size("Reader"))

Training with MXNet

Once we have MXNet data iterators from DALIClassificationIterator, we can use them instead of MXNet’smx.io.ImageRecordIter. Here we show modified train_imagenet.py example that uses our DALI pipelines.

In [8]:
import os
import argparse
import logging
logging.basicConfig(level=logging.DEBUG)
from demo.common import find_mxnet, data, fit
import mxnet as mx

gpus_string = "".join(str(list(range(N)))).replace('[','').replace(']','')

s = ['--gpu', gpus_string,
     '--batch-size', str(batch_size * N),
     '--num-epochs', '1',
     '--data-train', '/data/imagenet/train-480-val-256-recordio/train.rec',
     '--data-val', '/data/imagenet/train-480-val-256-recordio/val.rec',
     '--disp-batches', '100',
     '--network', 'resnet-v1',
     '--num-layers', '50',
     '--data-nthreads', '40',
     '--min-random-scale', '0.533',
     '--max-random-shear-ratio', '0',
     '--max-random-rotate-angle', '0',
     '--max-random-h', '0',
     '--max-random-l', '0',
     '--max-random-s', '0',
     '--dtype', 'float16']

# parse args
parser = argparse.ArgumentParser(description="train imagenet-1k",
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
fit.add_fit_args(parser)
data.add_data_args(parser)
data.add_data_aug_args(parser)
# use a large aug level
data.set_data_aug_level(parser, 3)
parser.set_defaults(
        # network
        network          = 'resnet',
        num_layers       = 50,
        # data
        num_classes      = 1000,
        num_examples     = 1281167,
        image_shape      = '3,224,224',
        min_random_scale = 1, # if input image has min size k, suggest to use
                              # 256.0/x, e.g. 0.533 for 480
        # train
        num_epochs       = 80,
        lr_step_epochs   = '30,60',
        dtype            = 'float32'
    )
args = parser.parse_args(s)


# load network
from importlib import import_module
net = import_module('demo.symbols.'+args.network)
sym = net.get_symbol(1000, 50, "3,224,224", dtype='float16')

def get_dali_iter(args, kv=None):
    return (dali_train_iter, dali_val_iter)

# train
#fit.fit(args, sym, data.get_rec_iter)
fit.fit(args, sym, get_dali_iter)
INFO:root:start with arguments Namespace(batch_size=1024, benchmark=0, data_nthreads=40, data_train='/data/imagenet/train-480-val-256-recordio/train.rec', data_train_idx='', data_val='/data/imagenet/train-480-val-256-recordio/val.rec', data_val_idx='', disp_batches=100, dtype='float16', gc_threshold=0.5, gc_type='none', gpus='0, 1, 2, 3, 4, 5, 6, 7', image_shape='3,224,224', initializer='default', kv_store='device', load_epoch=None, loss='', lr=0.1, lr_factor=0.1, lr_step_epochs='30,60', macrobatch_size=0, max_random_aspect_ratio=0.25, max_random_h=0, max_random_l=0, max_random_rotate_angle=0, max_random_s=0, max_random_scale=1, max_random_shear_ratio=0.0, min_random_scale=0.533, model_prefix=None, mom=0.9, monitor=0, network='resnet-v1', num_classes=1000, num_epochs=1, num_examples=1281167, num_layers=50, optimizer='sgd', pad_size=0, random_crop=1, random_mirror=1, rgb_mean='123.68,116.779,103.939', test_io=0, top_k=0, warmup_epochs=5, warmup_strategy='linear', wd=0.0001)
INFO:root:Epoch[0] Batch [100]  Speed: 5381.66 samples/sec      accuracy=0.001257
INFO:root:Epoch[0] Batch [200]  Speed: 6635.37 samples/sec      accuracy=0.001445
INFO:root:Epoch[0] Batch [300]  Speed: 6744.02 samples/sec      accuracy=0.003154
INFO:root:Epoch[0] Batch [400]  Speed: 6767.70 samples/sec      accuracy=0.006914
INFO:root:Epoch[0] Batch [500]  Speed: 6787.36 samples/sec      accuracy=0.010078
INFO:root:Epoch[0] Batch [600]  Speed: 6796.49 samples/sec      accuracy=0.016934
INFO:root:Epoch[0] Batch [700]  Speed: 6816.37 samples/sec      accuracy=0.023477
INFO:root:Epoch[0] Batch [800]  Speed: 6801.38 samples/sec      accuracy=0.032256
INFO:root:Epoch[0] Batch [900]  Speed: 6826.82 samples/sec      accuracy=0.042510
INFO:root:Epoch[0] Batch [1000] Speed: 6812.82 samples/sec      accuracy=0.052744
INFO:root:Epoch[0] Batch [1100] Speed: 6817.72 samples/sec      accuracy=0.063359
INFO:root:Epoch[0] Batch [1200] Speed: 6828.04 samples/sec      accuracy=0.069570
INFO:root:Epoch[0] Train-accuracy=0.081150
INFO:root:Epoch[0] Time cost=202.865
WARNING:root:DALI iterator does not support resetting while epoch is not finished. Ignoring...
INFO:root:Epoch[0] Validation-accuracy=0.092355