DALI expressions and arithmetic operators¶
In this example, we will show how to use arithmetic operators in the DALI Pipeline to perform element-wise operations on tensors. We will list the available operators and show examples of using constant and scalar inputs.
Supported operators¶
DALI currently supports the binary arithmetic operators +, -, *, / and //. They can be used between two tensors, between a tensor and a scalar, or between a tensor and a constant. By tensor we mean the output of a DALI operator (either a regular one or another arithmetic expression).
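For instance, inside the define_graph step the operators can be combined freely (a sketch, not a standalone cell; tensor_a and tensor_b stand for outputs of other DALI operators defined earlier in the graph):

expr = tensor_a * 2 + tensor_b // 3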
Prepare the test pipeline¶
First, we will prepare helper code so that we can easily manipulate the types and values that will appear as tensors in the DALI pipeline.
We use from __future__ import division so that / and // work as the true division and floor division operators. We will use numpy as the source of the custom data, and we also need to import several things from DALI to create the Pipeline and use the ExternalSource operator.
[1]:
from __future__ import division
import numpy as np
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
from nvidia.dali.types import Constant
Defining the data¶
As we are dealing with binary operators, we need two inputs. We will create a simple helper function that returns two batches of hardcoded data, stored as np.int32. In an actual scenario the data processed by DALI arithmetic operators would be tensors produced by other operators, containing images, video sequences or other data.
You can experiment by changing those values or adjusting the get_data() function to use different input data. Keep in mind that the shapes of both inputs need to match, as these are element-wise operations.
[2]:
left_magic_values = [
    [[42, 7, 0], [0, 0, 0]],
    [[5, 10, 15], [10, 100, 1000]]
]

right_magic_values = [
    [[3, 3, 3], [1, 3, 5]],
    [[1, 5, 5], [1, 1, 1]]
]

batch_size = len(left_magic_values)

def convert_batch(batch):
    return [np.int32(tensor) for tensor in batch]

def get_data():
    return (convert_batch(left_magic_values), convert_batch(right_magic_values))
Operating on tensors¶
Defining the pipeline¶
The next step is to define the pipeline. We override Pipeline.iter_setup, a method called by the pipeline before every Pipeline.run. It is meant to feed the data into the ExternalSource() operators indicated by self.left and self.right. The data will be obtained from the get_data function. We are using inputs of type np.int32 for now.
Note that we do not need to instantiate any additional operators; we can use regular Python arithmetic expressions on the results of other operators in the define_graph step.
Let's manipulate the source data by adding, multiplying and dividing it. define_graph will return both our data inputs and the results of applying arithmetic operations to them.
[3]:
class ArithmeticPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(ArithmeticPipeline, self).__init__(batch_size, num_threads, device_id, seed=12)
        self.left_source = ops.ExternalSource()
        self.right_source = ops.ExternalSource()

    def define_graph(self):
        self.left = self.left_source()
        self.right = self.right_source()
        sum_result = self.left + self.right
        mul_result = self.left * self.right
        div_result = self.left // self.right
        return self.left, self.right, sum_result, mul_result, div_result

    def iter_setup(self):
        # Get the data batches with selected types
        (l, r) = get_data()
        # Feed it to external source
        self.feed_input(self.left, l)
        self.feed_input(self.right, r)
Running the pipeline¶
Let's build and run our pipeline:
[4]:
pipe = ArithmeticPipeline(batch_size=batch_size, num_threads=2, device_id=0)
pipe.build()
out = pipe.run()
Now it’s time to display the results:
[5]:
def examine_output(pipe_out):
    l = pipe_out[0].as_array()
    r = pipe_out[1].as_array()
    sum_out = pipe_out[2].as_array()
    mul_out = pipe_out[3].as_array()
    div_out = pipe_out[4].as_array()
    print("{}\n+\n{}\n=\n{}\n\n".format(l, r, sum_out))
    print("{}\n*\n{}\n=\n{}\n\n".format(l, r, mul_out))
    print("{}\n//\n{}\n=\n{}\n\n".format(l, r, div_out))

examine_output(out)
[[[ 42 7 0]
[ 0 0 0]]
[[ 5 10 15]
[ 10 100 1000]]]
+
[[[3 3 3]
[1 3 5]]
[[1 5 5]
[1 1 1]]]
=
[[[ 45 10 3]
[ 1 3 5]]
[[ 6 15 20]
[ 11 101 1001]]]
[[[ 42 7 0]
[ 0 0 0]]
[[ 5 10 15]
[ 10 100 1000]]]
*
[[[3 3 3]
[1 3 5]]
[[1 5 5]
[1 1 1]]]
=
[[[ 126 21 0]
[ 0 0 0]]
[[ 5 50 75]
[ 10 100 1000]]]
[[[ 42 7 0]
[ 0 0 0]]
[[ 5 10 15]
[ 10 100 1000]]]
//
[[[3 3 3]
[1 3 5]]
[[1 5 5]
[1 1 1]]]
=
[[[ 14 2 0]
[ 0 0 0]]
[[ 5 2 3]
[ 10 100 1000]]]
As we can see, each resulting tensor is obtained by applying the arithmetic operation to the corresponding elements of its inputs.
The shapes of the arguments to arithmetic operators must match (with an exception for scalar tensor inputs, described in the next section); otherwise we will get an error.
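We can cross-check the addition with plain NumPy element-wise arithmetic (a quick sketch reusing the hardcoded values defined above; NumPy performs the same int32 element-wise addition):

np.int32(left_magic_values) + np.int32(right_magic_values)  # matches the printed sum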
Constant and scalar operands¶
So far we have considered only tensor inputs of matching shapes as the operands of arithmetic operators. DALI also allows one of the operands to be a constant or a tensor of scalars. Either can appear on either side of a binary expression.
Constants¶
In the define_graph step, the constant operand of an arithmetic operator can be a value of Python's int or float type used directly, or such a value wrapped in nvidia.dali.types.Constant. An operation between a tensor and a constant results in the constant being broadcast to all elements of the tensor. The same constant is used with all samples in the batch.
Note: Currently, all integral constants are passed internally to DALI as int32 and all floating point constants as float32.
Python int values will be treated as int32 and float values as float32 with regard to type promotions.
The DALI Constant can be used to indicate other types. It accepts a DALIDataType enum value as its second argument and has convenience member functions like .uint8() or .float32() that can be used for conversions.
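For example, both of the following forms should request a uint8 constant (a minimal sketch, assuming the imports from the first cell):

Constant(42, types.UINT8)   # DALIDataType enum passed as the second argument
Constant(42).uint8()        # equivalent convenience member function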
Using the Constants¶
Let’s adjust the Pipeline to utilize constants first.
[6]:
class ArithmeticConstantsPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(ArithmeticConstantsPipeline, self).__init__(batch_size, num_threads, device_id, seed=12)
        self.left_source = ops.ExternalSource()
        self.right_source = ops.ExternalSource()

    def define_graph(self):
        self.left = self.left_source()
        self.right = self.right_source()
        add_200 = self.left + 200
        mul_075 = self.left * 0.75
        sub_15 = Constant(15).float32() - self.right
        return self.left, self.right, add_200, mul_075, sub_15

    def iter_setup(self):
        # Get the data batches with selected types
        (l, r) = get_data()
        # Feed it to external source
        self.feed_input(self.left, l)
        self.feed_input(self.right, r)
[7]:
pipe = ArithmeticConstantsPipeline(batch_size=batch_size, num_threads=2, device_id=0)
pipe.build()
out = pipe.run()
Now it’s time to display the results:
[8]:
def examine_output(pipe_out):
    l = pipe_out[0].as_array()
    r = pipe_out[1].as_array()
    add_200 = pipe_out[2].as_array()
    mul_075 = pipe_out[3].as_array()
    sub_15 = pipe_out[4].as_array()
    print("{}\n+ 200 =\n{}\n\n".format(l, add_200))
    print("{}\n* 0.75 =\n{}\n\n".format(l, mul_075))
    print("15 -\n{}\n=\n{}\n\n".format(r, sub_15))

examine_output(out)
[[[ 42 7 0]
[ 0 0 0]]
[[ 5 10 15]
[ 10 100 1000]]]
+ 200 =
[[[ 242 207 200]
[ 200 200 200]]
[[ 205 210 215]
[ 210 300 1200]]]
[[[ 42 7 0]
[ 0 0 0]]
[[ 5 10 15]
[ 10 100 1000]]]
* 0.75 =
[[[ 31.5 5.25 0. ]
[ 0. 0. 0. ]]
[[ 3.75 7.5 11.25]
[ 7.5 75. 750. ]]]
15 -
[[[3 3 3]
[1 3 5]]
[[1 5 5]
[1 1 1]]]
=
[[[12. 12. 12.]
[14. 12. 10.]]
[[14. 10. 10.]
[14. 14. 14.]]]
As we can see, the constant operand is broadcast to all elements of every tensor in the batch.
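We can verify the integer case against NumPy (a sketch reusing the hardcoded values; the Python int 200 keeps the result in int32, matching the output above):

np.int32(left_magic_values) + 200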
Scalars¶
In addition to element-wise operations between tensors of the same size, DALI allows one of the operands to be a tensor of scalar values, that is, a batch of elements with shape {1}. A tensor of scalars behaves similarly to the constants described above: each scalar value is broadcast to every element of the other tensor operand. Note that, unlike a constant, it is still a batch of elements: each scalar value is broadcast only to the corresponding sample of the other operand.
Using scalar tensors¶
We will use the Uniform operator to generate a batch of random scalar values for our example. We already defined our inputs as a batch of tensors with shape = {2 x 3}, so we will be able to observe how the scalar values are propagated to all elements of the second operand.
[9]:
class ArithmeticScalarsPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(ArithmeticScalarsPipeline, self).__init__(batch_size, num_threads, device_id, seed=12)
        self.tensor_source = ops.ExternalSource()
        self.uniform = ops.Uniform(range=[-10, 10])

    def define_graph(self):
        self.tensor = self.tensor_source()
        uni = self.uniform()
        return self.tensor, uni, self.tensor * uni

    def iter_setup(self):
        # Get the data batches with selected types, we only use one input
        (t, _) = get_data()
        # Feed it to external source
        self.feed_input(self.tensor, t)
Now it's time to build and run the pipeline. It will scale our input by random numbers generated by the Uniform operator.
[10]:
pipe = ArithmeticScalarsPipeline(batch_size=batch_size, num_threads=2, device_id=0)
pipe.build()
out = pipe.run()
[11]:
def examine_output(pipe_out):
    t = pipe_out[0].as_array()
    uni = pipe_out[1].as_array()
    scaled = pipe_out[2].as_array()
    print("{}\n*\n{}\n=\n{}".format(t, uni, scaled))

examine_output(out)
[[[ 42 7 0]
[ 0 0 0]]
[[ 5 10 15]
[ 10 100 1000]]]
*
[[3.9766016]
[0.666193 ]]
=
[[[167.01727 27.836212 0. ]
[ 0. 0. 0. ]]
[[ 3.330965 6.66193 9.992895]
[ 6.66193 66.6193 666.193 ]]]
Notice how we multiply the corresponding samples of the two batches (each consisting of two samples here). Their shapes do not match, but since the second operand is a batch of two {1}-shaped tensors, it is treated as a scalar input.
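Conceptually, this matches NumPy broadcasting with one scalar per sample (a sketch using the values printed above; the uniform values are copied from this particular run):

t = np.int32(left_magic_values)              # shape (2, 2, 3): two samples of shape {2 x 3}
uni = np.float32([[3.9766016], [0.666193]])  # shape (2, 1): one scalar per sample
t * uni[:, :, np.newaxis]                    # each scalar scales its whole sample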