ExternalSource operator

In this example, we will see how to use ExternalSource operator, that allows us to use an external data source as an input to the Pipeline.

In order to achieve that, we have to define a Iterator or Generator class which next function will return one or several numpy arrays.

import types
import collections
import numpy as np
from random import shuffle
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

batch_size = 16

Defining the iterator

class ExternalInputIterator(object):
    def __init__(self, batch_size):
        self.images_dir = "images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", 'r') as f:
            self.files = [line.rstrip() for line in f if line is not '']

    def __iter__(self):
        self.i = 0
        self.n = len(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []
        for _ in range(self.batch_size):
            jpeg_filename, label = self.files[self.i].split(' ')
            f = open(self.images_dir + jpeg_filename, 'rb')
            batch.append(np.frombuffer(f.read(), dtype = np.uint8))
            labels.append(np.array([label], dtype = np.uint8))
            self.i = (self.i + 1) % self.n
        return (batch, labels)

    next = __next__

Defining the pipeline

The next step is to define the Pipeline.

We override Pipeline.iter_setup, a method called by the pipeline before every Pipeline.run, to call the iterator and feed the result to ExternalSource() operator, referenced by self.jpeg, by using feed_input. DALI makes sure that the iter_setup is called an appropriate number of times ahead when DALI does prefetching. Calling feed_input outside iter_setup will lead to undefined results.

eii = ExternalInputIterator(batch_size)
iterator = iter(eii)
 class ExternalSourcePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(ExternalSourcePipeline, self).__init__(batch_size,
        self.input = ops.ExternalSource()
        self.input_label = ops.ExternalSource()
        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)
        self.cast = ops.Cast(device = "gpu",
                             dtype = types.INT32)

    def define_graph(self):
        self.jpegs = self.input()
        self.labels = self.input_label()
        images = self.decode(self.jpegs)
        output = self.cast(images)
        return (output, self.labels)

    def iter_setup(self):
        (images, labels) = iterator.next()
        self.feed_input(self.jpegs, images, layout="HWC")
        self.feed_input(self.labels, labels)

Using the pipeline

pipe = ExternalSourcePipeline(batch_size=batch_size, num_threads=2, device_id = 0)
pipe_out = pipe.run()

Notice that labels are still on CPU and no as_cpu call is needed to show them.

batch_cpu = pipe_out[0].as_cpu()
labels_cpu = pipe_out[1]
from __future__ import print_function
import matplotlib.pyplot as plt
img = batch_cpu.at(2)
(427, 640, 3)
<matplotlib.image.AxesImage at 0x7fbc1130a2d0>