ExternalSource operator

In this example, we will see how to use the ExternalSource operator, which allows us to use an external data source as an input to the pipeline.

[1]:
import numpy as np
from random import shuffle
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

batch_size = 16

Defining the data source

In this example, we use an infinite iterator as a data source: once it reaches the end of the file list, it wraps around to the beginning.

[2]:
class ExternalInputIterator(object):
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", 'r') as f:
            self.files = [line.rstrip() for line in f if line.strip()]
        shuffle(self.files)

    def __iter__(self):
        self.i = 0
        self.n = len(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []
        for _ in range(self.batch_size):
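            jpeg_filename, label = self.files[self.i].split(' ')
            # Read the raw JPEG bytes; decoding happens inside the pipeline
            with open(self.images_dir + jpeg_filename, 'rb') as f:
                batch.append(np.frombuffer(f.read(), dtype = np.uint8))
            labels.append(np.array([int(label)], dtype = np.uint8))
            # Wrap around to the start so the iterator never ends
            self.i = (self.i + 1) % self.n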
        return (batch, labels)
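The following dependency-free sketch shows why this iterator never raises StopIteration. It assumes the same `<filename> <label>` line format that `ExternalInputIterator` parses. Instead of opening `file_list.txt`, it reads an in-memory string, and it yields file names rather than JPEG bytes. `parse_file_list` and `WrapAroundBatches` are hypothetical names used only for illustration.

```python
import io

def parse_file_list(text):
    """Parse '<filename> <label>' lines, skipping blank lines."""
    entries = []
    for line in io.StringIO(text):
        line = line.strip()
        if not line:  # skip blank lines by checking content, not identity
            continue
        filename, label = line.split(' ')
        entries.append((filename, int(label)))
    return entries

class WrapAroundBatches:
    """Hypothetical stand-in for ExternalInputIterator without file I/O."""

    def __init__(self, entries, batch_size):
        self.entries = entries
        self.batch_size = batch_size

    def __iter__(self):
        self.i = 0
        self.n = len(self.entries)
        return self

    def __next__(self):
        names, labels = [], []
        for _ in range(self.batch_size):
            name, label = self.entries[self.i]
            names.append(name)
            labels.append(label)
            # Wrap to the start of the list so iteration never ends
            self.i = (self.i + 1) % self.n
        return names, labels

entries = parse_file_list("a.jpg 0\nb.jpg 1\n\nc.jpg 1\n")
it = iter(WrapAroundBatches(entries, batch_size=2))
print(next(it))  # (['a.jpg', 'b.jpg'], [0, 1])
print(next(it))  # (['c.jpg', 'a.jpg'], [1, 0])
```

The second batch already contains the first file again. This is the wrap-around that lets the pipeline pull as many batches as it needs.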

Defining the pipeline

The next step is to define the Pipeline.

The ExternalSource operator accepts an iterable or a callable. If the source provides multiple outputs (for example, images and labels), the number of outputs must also be specified with the num_outputs argument.

Internally, the pipeline calls source (if it is callable) or runs next(source) (if it is iterable) whenever it needs more data to keep running.
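The calling convention described above can be illustrated with a small, framework-free sketch. `SourceAdapter` is hypothetical and is not DALI's actual implementation. It calls the source if it is callable; otherwise it obtains an iterator once and advances it with `next()`. It also checks that each result has `num_outputs` elements, which is what `num_outputs = 2` expresses for an (images, labels) source.

```python
import itertools

class SourceAdapter:
    """Hypothetical illustration of pulling data from a callable or iterable.

    Mirrors the behaviour described in the text; not DALI's implementation.
    """

    def __init__(self, source, num_outputs=None):
        self.num_outputs = num_outputs
        if callable(source):
            self._pull = source           # call it for every new batch
        else:
            it = iter(source)             # obtain the iterator once
            self._pull = lambda: next(it)

    def next_batch(self):
        data = self._pull()
        if self.num_outputs is not None and len(data) != self.num_outputs:
            # A multi-output source must yield one item per declared output
            raise ValueError(
                f"expected {self.num_outputs} outputs, got {len(data)}")
        return data

# Iterable source: an endless cycle of (images, labels) pairs
iterable_src = itertools.cycle([(["img0"], [0]), (["img1"], [1])])
a = SourceAdapter(iterable_src, num_outputs=2)
print(a.next_batch(), a.next_batch(), a.next_batch())

# Callable source: invoked again every time a batch is needed
counter = itertools.count()
b = SourceAdapter(lambda: ([f"img{next(counter)}"], [0]), num_outputs=2)
print(b.next_batch(), b.next_batch())
```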

[3]:
eii = ExternalInputIterator(batch_size)
[4]:
class ExternalSourcePipeline(Pipeline):
    def __init__(self, batch_size, eii, num_threads, device_id):
        super(ExternalSourcePipeline, self).__init__(
            batch_size, num_threads, device_id, seed=12)

        self.source = ops.ExternalSource(source = eii, num_outputs = 2)
        self.decode = ops.ImageDecoder(device = "mixed", output_type = types.RGB)
        self.enhance = ops.BrightnessContrast(device = "gpu", contrast = 2)

    def define_graph(self):
        jpegs, labels = self.source()
        images = self.decode(jpegs)
        output = self.enhance(images)
        return (output, labels)
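To build intuition for what `self.enhance` does to each pixel, the sketch below applies a linear contrast adjustment of the form `center + contrast * (value - center)` and clamps the result to the uint8 range. This is an assumption loosely based on DALI's documented BrightnessContrast formula, with brightness left at neutral values. The default center and the rounding mode that DALI actually uses should be checked in the operator documentation. For that reason, `center` is an explicit parameter here, and `adjust_contrast` is a hypothetical helper.

```python
def adjust_contrast(pixels, contrast, center=128):
    """Hypothetical per-pixel contrast adjustment for uint8 values.

    Assumes out = center + contrast * (in - center), then rounds and
    clamps to [0, 255]. Brightness-related parameters are ignored.
    """
    out = []
    for p in pixels:
        v = center + contrast * (p - center)
        out.append(min(255, max(0, round(v))))
    return out

# With contrast=2, values are pushed away from the center and saturate
print(adjust_contrast([50, 100, 128, 150, 200], contrast=2))
```

With `contrast = 2`, dark pixels are driven toward 0 and bright pixels toward 255. This is why the enhanced images look harsher than the decoded originals.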

Using the pipeline

[5]:
pipe = ExternalSourcePipeline(batch_size=batch_size, eii=eii, num_threads=2, device_id = 0)
pipe.build()
pipe_out = pipe.run()

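Notice that the labels are still on the CPU, so no as_cpu call is needed to display them. The images, however, are decoded by the "mixed" ImageDecoder and processed on the GPU, so they must be copied back with as_cpu first.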

[6]:
batch_cpu = pipe_out[0].as_cpu()
labels_cpu = pipe_out[1]
[7]:
import matplotlib.pyplot as plt
img = batch_cpu.at(2)
print(img.shape)
print(labels_cpu.at(2))
plt.axis('off')
plt.imshow(img)
(669, 640, 3)
[1]
(Figure: sample 2 of the output batch after decoding and contrast enhancement.)

Interacting with the GPU input

The ExternalSource operator can also accept GPU data from CuPy or from any other data source that supports the CUDA array interface. For the sake of this example, we create an ExternalInputGpuIterator that returns data that is already on the GPU. Because ImageDecoder does not accept data on the GPU, we decode the images outside of DALI on the CPU and then move them to the GPU. In typical use, the images or other data would already be on the GPU as the result of an operation performed by another library.
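The CUDA array interface is a duck-typed protocol. A producer exposes a `__cuda_array_interface__` attribute holding a dictionary that describes a device buffer, including at least `shape`, `typestr`, `data` and `version`. CuPy arrays, such as the ones returned by ExternalInputGpuIterator, expose this attribute. The plain-Python sketch below only illustrates the shape of the protocol, and `FakeDeviceArray` and `describe_gpu_source` are hypothetical. The fake object describes a zero-size buffer with a null data pointer. It owns no GPU memory and should never be passed to a real consumer such as DALI.

```python
class FakeDeviceArray:
    """Hypothetical object advertising the CUDA array interface.

    Describes an empty uint8 buffer with a null pointer; it allocates no
    GPU memory and only demonstrates the structure of the protocol.
    """

    @property
    def __cuda_array_interface__(self):
        return {
            "shape": (0,),        # zero-size array
            "typestr": "|u1",     # uint8; byte order not applicable
            "data": (0, False),   # (device pointer, read_only flag)
            "version": 2,
        }

def describe_gpu_source(obj):
    """Return basic metadata if obj advertises the CUDA array interface."""
    iface = getattr(obj, "__cuda_array_interface__", None)
    if iface is None:
        return None  # e.g. a plain list or a NumPy array in host memory
    return {"shape": tuple(iface["shape"]), "typestr": iface["typestr"]}

print(describe_gpu_source(FakeDeviceArray()))
print(describe_gpu_source([1, 2, 3]))
```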

[8]:
import cupy as cp
import imageio

class ExternalInputGpuIterator(object):
    def __init__(self, batch_size):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", 'r') as f:
            self.files = [line.rstrip() for line in f if line.strip()]
        shuffle(self.files)

    def __iter__(self):
        self.i = 0
        self.n = len(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []
        for _ in range(self.batch_size):
            jpeg_filename, label = self.files[self.i].split(' ')
            im = imageio.imread(self.images_dir + jpeg_filename)
            im = cp.asarray(im)
            im = im * 0.6  # scale intensities down (darken the image)
            batch.append(im.astype(cp.uint8))
            labels.append(cp.array([int(label)], dtype = np.uint8))
            self.i = (self.i + 1) % self.n
        return (batch, labels)
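The `im * 0.6` step turns the uint8 image into a floating-point array, and `astype(cp.uint8)` converts it back. For in-range values, that conversion truncates the fractional part, as a C-style cast does, instead of rounding. The pure-Python sketch below mirrors this arithmetic on a few sample pixel values. `dim_pixels` is a hypothetical helper and is not part of the example.

```python
def dim_pixels(pixels, factor=0.6):
    """Hypothetical pure-Python mirror of `(im * 0.6).astype(uint8)`.

    Multiplication yields floats; int() truncates toward zero, and the
    results stay within 0..255 because factor is below 1.
    """
    return [int(p * factor) for p in pixels]

print(dim_pixels([0, 101, 200, 255]))
```

Here 101 becomes 60 rather than 61, because the fractional part of 60.6 is dropped.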

Let us modify the previous pipeline to use the GPU version of the ExternalSource operator and remove the decoding step, since we assume that the raw image is already on the GPU.

[9]:
eii_gpu = ExternalInputGpuIterator(batch_size)

print(type(next(iter(eii_gpu))[0][0]))
<class 'cupy.core.core.ndarray'>
[10]:
class ExternalSourceGpuPipeline(Pipeline):
    def __init__(self, batch_size, eii, num_threads, device_id):
        super(ExternalSourceGpuPipeline, self).__init__(
            batch_size, num_threads, device_id, seed=12)

        self.source = ops.ExternalSource(device = "gpu", source = eii, num_outputs = 2)
        self.enhance = ops.BrightnessContrast(device = "gpu", contrast = 2)

    def define_graph(self):
        images, labels = self.source()
        output = self.enhance(images)
        return (output, labels)

pipe_gpu = ExternalSourceGpuPipeline(batch_size=batch_size, eii=eii_gpu, num_threads=2, device_id = 0)
pipe_gpu.build()

Now let us visualize the results:

[11]:
pipe_out_gpu = pipe_gpu.run()
batch_gpu = pipe_out_gpu[0].as_cpu()
labels_gpu = pipe_out_gpu[1].as_cpu()

img = batch_gpu.at(2)
print(img.shape)
print(labels_gpu.at(2))
plt.axis('off')
plt.imshow(img)
(425, 640, 3)
[1]
(Figure: sample 2 of the GPU pipeline output, darkened by the iterator and then contrast-enhanced.)