ExternalSource operator
In this example, we will see how to use the ExternalSource
operator with the DALI iterator for Paddle, which allows us to use an external data source as an input to the pipeline.
To achieve that, we have to define an Iterator or Generator class whose next
function returns one or more numpy
arrays.
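As a minimal sketch of that contract (the file names below are hypothetical), the external source can be as simple as a generator that yields one full batch, i.e. a list of numpy arrays, on every call to next:

import numpy as np
from random import choice

def external_batch_generator(file_names, batch_size):
    # every next() call must produce a whole batch of encoded images
    while True:
        batch = []
        for _ in range(batch_size):
            with open(choice(file_names), 'rb') as f:
                batch.append(np.frombuffer(f.read(), dtype=np.uint8))
        yield batch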
[1]:
import numpy as np
from random import shuffle
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types

batch_size = 3
epochs = 3
Defining the iterator
[2]:
class ExternalInputIterator(object):
    def __init__(self, batch_size, device_id, num_gpus):
        self.images_dir = "../../data/images/"
        self.batch_size = batch_size
        with open(self.images_dir + "file_list.txt", 'r') as f:
            self.files = [line.rstrip() for line in f if line != '']
        # whole data set size
        self.data_set_len = len(self.files)
        # based on the device_id and total number of GPUs - world size
        # get proper shard
        self.files = self.files[self.data_set_len * device_id // num_gpus:
                                self.data_set_len * (device_id + 1) // num_gpus]
        self.n = len(self.files)

    def __iter__(self):
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        batch = []
        labels = []
        if self.i >= self.n:
            raise StopIteration
        for _ in range(self.batch_size):
            jpeg_filename, label = self.files[self.i].split(' ')
            with open(self.images_dir + jpeg_filename, 'rb') as f:
                batch.append(np.frombuffer(f.read(), dtype=np.uint8))
            # labels in file_list.txt are numeric strings
            labels.append(np.array([int(label)], dtype=np.uint8))
            # wrap around so the last batch can be padded with data
            # from the beginning of the shard
            self.i = (self.i + 1) % self.n
        return (batch, labels)

    @property
    def size(self):
        return self.data_set_len

    next = __next__
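Before wiring the iterator into a pipeline, we can sanity-check it on its own. A quick sketch (the exact buffer sizes depend on the files listed in file_list.txt):

eii_check = ExternalInputIterator(batch_size=3, device_id=0, num_gpus=1)
images, labels = next(iter(eii_check))
# one encoded JPEG buffer and one label array per sample in the batch
print(len(images), images[0].dtype, labels[0])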
Defining the pipeline
Now the pipeline itself will be defined. Because a framework iterator will be used, we need to make sure that the images output by the pipeline are uniform in size, so the Resize operator is used. Also, iter_setup
will raise the StopIteration exception when the ExternalInputIterator runs out of data. Note that the iterator needs to be recreated there, so that the next time iter_setup
is called it has data ready to consume.
[3]:
class ExternalSourcePipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id, external_data):
        super(ExternalSourcePipeline, self).__init__(batch_size,
                                                     num_threads,
                                                     device_id,
                                                     seed=12)
        self.input = ops.ExternalSource()
        self.input_label = ops.ExternalSource()
        self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB)
        self.res = ops.Resize(device="gpu", resize_x=240, resize_y=240)
        self.cast = ops.Cast(device="gpu", dtype=types.UINT8)
        self.external_data = external_data
        self.iterator = iter(self.external_data)

    def define_graph(self):
        self.jpegs = self.input()
        self.labels = self.input_label()
        images = self.decode(self.jpegs)
        images = self.res(images)
        output = self.cast(images)
        return (output, self.labels)

    def iter_setup(self):
        try:
            images, labels = next(self.iterator)
            self.feed_input(self.jpegs, images)
            self.feed_input(self.labels, labels)
        except StopIteration:
            # recreate the iterator so the next epoch has data to consume
            self.iterator = iter(self.external_data)
            raise StopIteration
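The pipeline can also be built and run directly, without any framework iterator, to verify its outputs. A quick sketch, assuming the legacy TensorList accessors (as_cpu, at) that accompany the ops API:

pipe_check = ExternalSourcePipeline(batch_size=batch_size, num_threads=2,
                                    device_id=0,
                                    external_data=ExternalInputIterator(batch_size, 0, 1))
pipe_check.build()
images, labels = pipe_check.run()
# the decoder runs in "mixed" mode, so images live on the GPU
# and are copied back to the host before inspection
print(images.as_cpu().at(0).shape, labels.at(0))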
Using the pipeline
Finally, let us see how it works. Please also note the usage of last_batch_padded
, which tells the iterator that the difference between the data set size and the batch size alignment is padded with real data that can be skipped when it is provided to the framework (fill_last_batch
):
[4]:
from nvidia.dali.plugin.paddle import DALIClassificationIterator as PaddleIterator

eii = ExternalInputIterator(batch_size, 0, 1)
pipe = ExternalSourcePipeline(batch_size=batch_size, num_threads=2, device_id=0,
                              external_data=eii)
pii = PaddleIterator(pipe, size=eii.size, last_batch_padded=True, fill_last_batch=False)

for e in range(epochs):
    for i, data in enumerate(pii):
        print("epoch: {}, iter {}, real batch size: {}".format(
            e, i, len(np.array(data[0]["data"]))))
    pii.reset()
epoch: 0, iter 0, real batch size: 3
epoch: 0, iter 1, real batch size: 3
epoch: 0, iter 2, real batch size: 3
epoch: 0, iter 3, real batch size: 3
epoch: 0, iter 4, real batch size: 3
epoch: 0, iter 5, real batch size: 3
epoch: 0, iter 6, real batch size: 3
epoch: 1, iter 0, real batch size: 3
epoch: 1, iter 1, real batch size: 3
epoch: 1, iter 2, real batch size: 3
epoch: 1, iter 3, real batch size: 3
epoch: 1, iter 4, real batch size: 3
epoch: 1, iter 5, real batch size: 3
epoch: 1, iter 6, real batch size: 3
epoch: 2, iter 0, real batch size: 3
epoch: 2, iter 1, real batch size: 3
epoch: 2, iter 2, real batch size: 3
epoch: 2, iter 3, real batch size: 3
epoch: 2, iter 4, real batch size: 3
epoch: 2, iter 5, real batch size: 3
epoch: 2, iter 6, real batch size: 3
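The num_gpus argument of ExternalInputIterator makes the same code usable for sharded, multi-GPU runs. A hedged sketch, assuming two GPUs are available (the DALI framework iterators also accept a list of pipelines):

num_gpus = 2
pipes = []
for device_id in range(num_gpus):
    # each iterator reads only its own shard of file_list.txt
    it = ExternalInputIterator(batch_size, device_id, num_gpus)
    pipes.append(ExternalSourcePipeline(batch_size=batch_size, num_threads=2,
                                        device_id=device_id, external_data=it))
# size is the whole data set size; the iterators split it across pipelines
pii_sharded = PaddleIterator(pipes, size=it.size, last_batch_padded=True,
                             fill_last_batch=False)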