Reduction Operators
This section shows how to use the operators from the reductions module.
Start with a simple pipeline based on ExternalSource. The input has two samples per batch, each with shape (3, 3). The first sample contains consecutive numbers, the second consecutive even numbers. This makes the results of the possible reductions easy to visualize.
[1]:
import nvidia.dali.fn as fn
import nvidia.dali.types as types
import nvidia.dali.backend as backend
from nvidia.dali.pipeline import Pipeline

import numpy as np

batch_size = 2


def get_batch():
    return [
        np.reshape(np.arange(9), (3, 3)) * (i + 1) for i in range(batch_size)
    ]


def run_and_print(pipe):
    pipe.build()
    output = pipe.run()
    for i, out in enumerate(output):
        # GPU outputs need to be copied to the host before reading.
        if type(out) == backend.TensorListGPU:
            out = out.as_cpu()
        output_array = out.as_array()
        print("Output {}:\n{} \n".format(i, output_array))


pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT64)
    pipe.set_outputs(input)

run_and_print(pipe)
Output 0:
[[[ 0  1  2]
  [ 3  4  5]
  [ 6  7  8]]

 [[ 0  2  4]
  [ 6  8 10]
  [12 14 16]]]
Add some reductions to the pipeline above. Begin with the Max operator.
[2]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT64)
    max = fn.reductions.max(input)
    pipe.set_outputs(max)

run_and_print(pipe)
Output 0:
[ 8 16]
As you can see, it returned the biggest value from each sample.
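For reference, the same per-sample result can be reproduced with plain NumPy (a quick cross-check outside the pipeline, reusing the get_batch helper defined above):

print([int(np.max(sample)) for sample in get_batch()])  # -> [8, 16]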
You can perform other reductions in the same way, for example Min or Sum.
[3]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT64)
    min = fn.reductions.min(input)
    sum = fn.reductions.sum(input)
    pipe.set_outputs(min, sum)

run_and_print(pipe)
Output 0:
[0 0]
Output 1:
[36 72]
In the code samples above, the reductions were performed over all elements of each sample. Reductions can also be performed along an arbitrary set of axes; to control this behavior, use the axes argument.
[4]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT64)
    min_axis_0 = fn.reductions.min(input, axes=0)
    min_axis_1 = fn.reductions.min(input, axes=1)
    pipe.set_outputs(min_axis_0, min_axis_1)

run_and_print(pipe)
Output 0:
[[0 1 2]
 [0 2 4]]
Output 1:
[[ 0  3  6]
 [ 0  6 12]]
The Min reduction was performed along axes 0 and 1, returning the minimum element per column and per row, respectively.
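For reference, the equivalent NumPy calls on the first sample look like this (a cross-check outside the pipeline; get_batch still returns the (3, 3) samples at this point):

sample = get_batch()[0]
print(np.min(sample, axis=0))  # per-column minima -> [0 1 2]
print(np.min(sample, axis=1))  # per-row minima    -> [0 3 6]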
To make this easier, reductions support the axis_names argument, which lets you pass axis names rather than indices. The names are matched against the layout of the input, so you need to provide the layout argument to ExternalSource.
[5]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, layout="AB", dtype=types.INT64)
    min_axis_0 = fn.reductions.min(input, axis_names="A")
    min_axis_1 = fn.reductions.min(input, axis_names="B")
    pipe.set_outputs(min_axis_0, min_axis_1)

run_and_print(pipe)
Output 0:
[[0 1 2]
 [0 2 4]]
Output 1:
[[ 0  3  6]
 [ 0  6 12]]
Note: Passing all axes results in a full reduction, while passing an empty axis list results in no reduction. This is true for both indices and axis names.
[6]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, layout="AB", dtype=types.INT64)
    min_axes_full = fn.reductions.min(input, axes=(0, 1))
    min_axes_empty = fn.reductions.min(input, axes=())
    min_layout_full = fn.reductions.min(input, axis_names="AB")
    min_layout_empty = fn.reductions.min(input, axis_names="")
    pipe.set_outputs(
        min_axes_full, min_axes_empty, min_layout_full, min_layout_empty
    )

run_and_print(pipe)
Output 0:
[0 0]
Output 1:
[[[ 0  1  2]
  [ 3  4  5]
  [ 6  7  8]]

 [[ 0  2  4]
  [ 6  8 10]
  [12 14 16]]]
Output 2:
[0 0]
Output 3:
[[[ 0  1  2]
  [ 3  4  5]
  [ 6  7  8]]

 [[ 0  2  4]
  [ 6  8 10]
  [12 14 16]]]
For inputs with higher dimensionality, you can pass any combination of axes.
[7]:
def get_batch():
    return [
        np.reshape(np.arange(8, dtype=np.int32), (2, 2, 2)) * (i + 1)
        for i in range(batch_size)
    ]
[8]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(
        source=get_batch, layout="ABC", dtype=types.INT32
    )
    min_axes_empty = fn.reductions.min(input, axes=())
    min_axes_0_1 = fn.reductions.min(input, axes=(0, 1))
    min_layout_A_C = fn.reductions.min(input, axis_names="AC")
    pipe.set_outputs(min_axes_empty, min_axes_0_1, min_layout_A_C)

run_and_print(pipe)
Output 0:
[[[[ 0  1]
   [ 2  3]]

  [[ 4  5]
   [ 6  7]]]


 [[[ 0  2]
   [ 4  6]]

  [[ 8 10]
   [12 14]]]]
Output 1:
[[0 1]
 [0 2]]
Output 2:
[[0 2]
 [0 4]]
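Axis names are resolved to indices based on the input layout, so axis_names="AC" with layout "ABC" selects axes (0, 2). A quick NumPy cross-check on the first sample:

sample = get_batch()[0]  # shape (2, 2, 2), interpreted with layout "ABC"
print(np.min(sample, axis=(0, 2)))  # reduce over "A" and "C" -> [0 2]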
Some reductions require additional inputs. StdDev and Variance rely on an externally provided mean, which can be calculated with the Mean reduction operator.
[9]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT32)
    mean = fn.reductions.mean(input)
    std_dev = fn.reductions.std_dev(input, mean)
    variance = fn.reductions.variance(input, mean)
    pipe.set_outputs(mean, std_dev, variance)

run_and_print(pipe)
Output 0:
[3.5 7. ]
Output 1:
[2.291288 4.582576]
Output 2:
[ 5.25 21. ]
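As a sanity check, the same statistics can be computed with NumPy; the standard deviation is the square root of the variance computed around the provided mean:

sample = get_batch()[0]
print(np.mean(sample), np.var(sample), np.sqrt(np.var(sample)))
# -> 3.5 5.25 2.2912878... (matching the pipeline outputs above)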
By default, reductions remove the reduced dimensions. This behavior can be controlled with the keep_dims argument.
[10]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT32)
    mean = fn.reductions.mean(input)
    std_dev = fn.reductions.std_dev(input, mean, keep_dims=True)
    variance = fn.reductions.variance(input, mean)
    pipe.set_outputs(mean, std_dev, variance)

run_and_print(pipe)
Output 0:
[3.5 7. ]
Output 1:
[[[[2.291288]]]


 [[[4.582576]]]]
Output 2:
[ 5.25 21.  ]
With keep_dims=True, std_dev retained the reduced dimensions, now with size 1. Note also that applying the reductions changed the output type: the input was INT32, but the results are FLOAT. The dtype argument can be used to specify the desired output data type.
[11]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT32)
    sum_int_64 = fn.reductions.sum(input, dtype=types.INT64)
    sum_float = fn.reductions.sum(input, dtype=types.FLOAT)
    pipe.set_outputs(sum_int_64, sum_float)

run_and_print(pipe)
Output 0:
[28 56]
Output 1:
[28. 56.]
Note: Not all data type combinations are supported. The default behavior varies from operator to operator. The general rule is that the output type must be able to accommodate the result, depending on the input type. For example, for the INT32 input type, the default output type of a sum is INT32, while the default output type of a mean is FLOAT.
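A minimal sketch to verify these defaults (reusing the helpers defined above and inspecting the dtype of the outputs on the host):

pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(source=get_batch, dtype=types.INT32)
    pipe.set_outputs(fn.reductions.sum(input), fn.reductions.mean(input))
pipe.build()
sum_out, mean_out = pipe.run()
print(sum_out.as_array().dtype)   # int32   -- sum keeps the integer input type
print(mean_out.as_array().dtype)  # float32 -- mean defaults to FLOAT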
All reductions can be offloaded to the GPU. The GPU variants work the same way as their CPU counterparts. The code sample below shows all of the reductions offloaded to the GPU with various combinations of parameters.
[12]:
pipe = Pipeline(batch_size=batch_size, num_threads=4, device_id=0)
with pipe:
    input = fn.external_source(
        source=get_batch, layout="ABC", dtype=types.INT32
    )
    min = fn.reductions.min(input.gpu(), axis_names="AC", keep_dims=True)
    max = fn.reductions.max(input.gpu(), keep_dims=True)
    sum = fn.reductions.sum(input.gpu(), dtype=types.INT64)
    mean = fn.reductions.mean(input.gpu(), axes=0)
    mean_square = fn.reductions.mean_square(input.gpu())
    rms = fn.reductions.rms(input.gpu(), axes=(), dtype=types.FLOAT)
    std_dev = fn.reductions.std_dev(input.gpu(), mean, axes=0)
    variance = fn.reductions.variance(
        input.gpu(), mean.gpu(), axes=0, keep_dims=True
    )
    pipe.set_outputs(min, max, sum, mean, mean_square, rms, std_dev, variance)

run_and_print(pipe)
Output 0:
[[[[0]
   [2]]]


 [[[0]
   [4]]]]
Output 1:
[[[[ 7]]]


 [[[14]]]]
Output 2:
[28 56]
Output 3:
[[[ 2.  3.]
  [ 4.  5.]]

 [[ 4.  6.]
  [ 8. 10.]]]
Output 4:
[17.5 70. ]
Output 5:
[[[[ 0.  1.]
   [ 2.  3.]]

  [[ 4.  5.]
   [ 6.  7.]]]


 [[[ 0.  2.]
   [ 4.  6.]]

  [[ 8. 10.]
   [12. 14.]]]]
Output 6:
[[[2. 2.]
  [2. 2.]]

 [[4. 4.]
  [4. 4.]]]
Output 7:
[[[[ 4.  4.]
   [ 4.  4.]]]


 [[[16. 16.]
   [16. 16.]]]]