ExternalSource operator

In this example, we will see how to use the ExternalSource operator with the DALI PyTorch iterator. ExternalSource lets us use an external data source as an input to the Pipeline.

To achieve that, we have to define an iterator or generator whose __next__ method returns one batch per output, where each batch is a list of NumPy arrays (PyTorch tensors work too, as the labels below show).
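
As a minimal sketch of that contract, assuming only the standard library, the hypothetical generator make_source below yields one (data_batch, label_batch) tuple per call, i.e. one batch per output. Plain bytes objects stand in for the NumPy arrays a real source would return, so the sketch runs without NumPy or DALI. Unlike ExternalInputIterator, it emits a short final batch instead of padding it.

```python
import random


def make_source(samples, batch_size, seed=0):
    """Hypothetical generator yielding (data_batch, label_batch) tuples.

    samples is a list of (payload, label) pairs; payloads are bytes here,
    standing in for the NumPy arrays a real DALI source would return.
    """
    rng = random.Random(seed)
    order = list(samples)
    rng.shuffle(order)  # shuffle once per epoch, like __iter__ in the class
    for start in range(0, len(order), batch_size):
        chunk = order[start:start + batch_size]
        data = [payload for payload, _ in chunk]
        labels = [label for _, label in chunk]
        yield data, labels  # one batch per output -> num_outputs=2


samples = [(bytes([i]), i % 2) for i in range(7)]
batches = list(make_source(samples, batch_size=3))
print([len(d) for d, _ in batches])  # [3, 3, 1]
```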

import numpy as np
from random import shuffle
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.types as types
import nvidia.dali.fn as fn
import torch

batch_size = 3
epochs = 3

Defining the Iterator

class ExternalInputIterator(object):
    def __init__(self, batch_size, device_id, num_gpus):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", "r") as f:
            # skip blank lines; each line is "<jpeg_filename> <label>"
            self.files = [line.rstrip() for line in f if line.strip()]
        # whole data set size
        self.data_set_len = len(self.files)
        # based on the device_id and the total number of GPUs (world size),
        # select this device's shard of the file list
        shard_start = self.data_set_len * device_id // num_gpus
        shard_end = self.data_set_len * (device_id + 1) // num_gpus
        self.files = self.files[shard_start:shard_end]
        # number of samples in this shard
        self.n = len(self.files)

    def __iter__(self):
        # start a new epoch: rewind and reshuffle this shard
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []

        if self.i >= self.n:
            # epoch finished: reset for the next one and signal the end
            self.__iter__()
            raise StopIteration

        for _ in range(self.batch_size):
            # the modulo wraps around, padding the last batch with real samples
            jpeg_filename, label = self.files[self.i % self.n].split(" ")
            batch.append(
                np.fromfile(self.images_dir + jpeg_filename, dtype=np.uint8)
            )  # we can use NumPy arrays
            labels.append(
                torch.tensor([int(label)], dtype=torch.uint8)
            )  # or PyTorch's native tensors
            self.i += 1
        return (batch, labels)

    def __len__(self):
        # size of the whole data set, not just this shard
        return self.data_set_len

    next = __next__  # Python 2 style alias
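
The shard boundaries use integer arithmetic so that the shards for device_id = 0, 1, ..., num_gpus - 1 cover the whole file list without gaps or overlaps. Here is a small standalone check of that formula; the helper name shard_bounds and the 21-file, 4-GPU example are hypothetical and only illustrate the arithmetic.

```python
def shard_bounds(data_set_len, device_id, num_gpus):
    """Return the [start, end) slice used for one device's shard."""
    start = data_set_len * device_id // num_gpus
    end = data_set_len * (device_id + 1) // num_gpus
    return start, end


# Example: 21 files split across 4 GPUs
bounds = [shard_bounds(21, d, 4) for d in range(4)]
print(bounds)  # [(0, 5), (5, 10), (10, 15), (15, 21)]
```

Each shard's end equals the next shard's start, and shard sizes differ by at most one file, so no device is left with a much larger share.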

Defining the Pipeline

Now let’s define our pipeline. We need an instance of the Pipeline class and some operators that define the processing graph. Our external source provides two outputs, which we can conveniently unpack by specifying num_outputs=2 in the external source operator.

def ExternalSourcePipeline(batch_size, num_threads, device_id, external_data):
    pipe = Pipeline(batch_size, num_threads, device_id)
    with pipe:
        # the source returns a (jpegs, labels) tuple per iteration,
        # so num_outputs=2 splits it into two separate outputs
        jpegs, labels = fn.external_source(
            source=external_data, num_outputs=2, dtype=types.UINT8
        )
        # "mixed" decoding: CPU input, GPU output
        images = fn.decoders.image(jpegs, device="mixed")
        images = fn.resize(images, resize_x=240, resize_y=240)
        output = fn.cast(images, dtype=types.UINT8)
        pipe.set_outputs(output, labels)
    return pipe

Using the Pipeline

Finally, let us see how it works.

last_batch_padded and last_batch_policy are set here only for demonstration purposes. You may write any custom code and change the epoch size from epoch to epoch. In that case, it is recommended to set size to -1 and let the iterator simply wait for the StopIteration exception raised by the external source.
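
To illustrate an epoch size that changes from epoch to epoch, here is a stdlib-only sketch of a hypothetical source class, VaryingEpochSource. Like ExternalInputIterator, it raises StopIteration to end an epoch and resets itself so the next pass can start. The per-epoch batch counts and the dummy payloads are made up for the example.

```python
class VaryingEpochSource:
    """Hypothetical source whose epoch length changes every epoch."""

    def __init__(self, batches_per_epoch, batch_size):
        self.batches_per_epoch = batches_per_epoch  # e.g. [2, 4, 3]
        self.batch_size = batch_size
        self.epoch = 0

    def __iter__(self):
        self.i = 0
        return self

    def __next__(self):
        limit = self.batches_per_epoch[self.epoch % len(self.batches_per_epoch)]
        if self.i >= limit:
            self.epoch += 1  # the next pass uses the next epoch length
            self.__iter__()
            raise StopIteration  # the consumer sees the end of the epoch
        self.i += 1
        data = [bytes([self.i])] * self.batch_size  # dummy payloads
        labels = [self.epoch] * self.batch_size
        return data, labels


src = VaryingEpochSource([2, 4, 3], batch_size=3)
print([sum(1 for _ in src) for _ in range(3)])  # [2, 4, 3]
```

Because the source itself decides when an epoch ends, the consumer does not need to know the epoch length in advance, which is why size=-1 fits this pattern.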

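Here, last_batch_padded tells the iterator that, when the data set size is not a multiple of the batch size, the last batch is padded with real data (our iterator wraps around to the beginning of its shard). Whether those padded samples are skipped when the batch is handed to the framework is controlled by last_batch_policy: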

from nvidia.dali.plugin.pytorch import (
    DALIClassificationIterator as PyTorchIterator,
)
from nvidia.dali.plugin.pytorch import LastBatchPolicy

eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(
    batch_size=batch_size, num_threads=2, device_id=0, external_data=eii
)
pii = PyTorchIterator(
    pipe, last_batch_padded=True, last_batch_policy=LastBatchPolicy.PARTIAL
)

for e in range(epochs):
    for i, data in enumerate(pii):
        real_batch_size = len(data[0]["data"])
        print(f"epoch: {e}, iter {i}, real batch size: {real_batch_size}")
    pii.reset()
epoch: 0, iter 0, real batch size: 3
epoch: 0, iter 1, real batch size: 3
epoch: 0, iter 2, real batch size: 3
epoch: 0, iter 3, real batch size: 3
epoch: 0, iter 4, real batch size: 3
epoch: 0, iter 5, real batch size: 3
epoch: 0, iter 6, real batch size: 3
epoch: 1, iter 0, real batch size: 3
epoch: 1, iter 1, real batch size: 3
epoch: 1, iter 2, real batch size: 3
epoch: 1, iter 3, real batch size: 3
epoch: 1, iter 4, real batch size: 3
epoch: 1, iter 5, real batch size: 3
epoch: 1, iter 6, real batch size: 3
epoch: 2, iter 0, real batch size: 3
epoch: 2, iter 1, real batch size: 3
epoch: 2, iter 2, real batch size: 3
epoch: 2, iter 3, real batch size: 3
epoch: 2, iter 4, real batch size: 3
epoch: 2, iter 5, real batch size: 3
epoch: 2, iter 6, real batch size: 3
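
The padding described above comes from the self.i % self.n index in ExternalInputIterator.__next__: once the shard runs out mid-batch, the index wraps around and the batch is filled with samples from the start of the shard. The stdlib-only sketch below uses hypothetical helpers, padded_batches and trim_partial, to reproduce that indexing and then drop the padded samples. This roughly models what LastBatchPolicy.PARTIAL achieves on the framework side; it is an illustration of the idea, not DALI's implementation.

```python
def padded_batches(n, batch_size):
    """Yield lists of sample indices, wrapping around like self.i % self.n."""
    i = 0
    while i < n:
        yield [(i + k) % n for k in range(batch_size)]
        i += batch_size


def trim_partial(batches, n, batch_size):
    """Drop the wrapped-around (padded) samples from the last batch."""
    out = []
    for b, batch in enumerate(batches):
        real = min(batch_size, n - b * batch_size)  # samples not seen before
        out.append(batch[:real])
    return out


batches = list(padded_batches(n=8, batch_size=3))
print(batches)                      # [[0, 1, 2], [3, 4, 5], [6, 7, 0]]
print(trim_partial(batches, 8, 3))  # [[0, 1, 2], [3, 4, 5], [6, 7]]
```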