DALI expressions and arithmetic operators

In this example, we will show simple examples how to use arithmetic operators in DALI Pipeline that allow for element-wise operations on tensors inside a pipeline. We will show available operators and examples of using constant and scalar inputs.

Supported operators

DALI currently supports binary arithmetic operators, namely: +, -, *, / and //. They can be used as an operation between two tensors, between a tensor and a scalar or a tensor and a constant. By tensor we consider the output of DALI operators (either regular ones or other arithmetic operators).

Prepare the test pipeline

First, we will prepare the helper code, so we can easily manipulate the types and values that will appear as tensors in the DALI pipeline.

We use from __future__ import division to allow / and // as true division and floor division operators. We will be using numpy as source for the custom provided data and we also need to import several things from DALI needed to create Pipeline and use ExternalSource Operator.

[1]:
from __future__ import division
import numpy as np
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
from nvidia.dali.types import Constant

Defining the data

As we are dealing with binary operators, we need two inputs. We will create a simple helper function that returns two batches of hardcoded data, stored as np.int32. In an actual scenario the data processed by DALI arithmetic operators would be tensors produced by other Operator containing some images, video sequences or other data.

You can experiment by changing those values or adjusting the get_data() function to use different input data. Keep in mind that shapes of both inputs need to match as those will be element-wise operations.

[2]:
left_magic_values = [
    [[42, 7, 0], [0, 0, 0]],
    [[5, 10, 15], [10, 100, 1000]]
]

right_magic_values = [
    [[3, 3, 3], [1, 3, 5]],
    [[1, 5, 5], [1, 1, 1]]
]

batch_size = len(left_magic_values)

def convert_batch(batch):
    return [np.int32(tensor) for tensor in batch]

def get_data():
    return (convert_batch(left_magic_values), convert_batch(right_magic_values))

Operating on tensors

Defining the pipeline

The next step is to define the Pipeline. We override Pipeline.iter_setup, a method called by the pipeline before every Pipeline.run. It is meant to feed the data into ExternalSource() operators indicated by self.left and self.right. The data will be obtained from get_data function to which we pass the left and right types. We are using input of type np.int32 for now.

Note, that we do not need to instantiate any additional operators, we can use regular Python arithmetic expressions on the results of other operators in the define_graph step.

Let’s manipulate the source data by adding, multiplying and dividing it. define_graph will return both our data inputs and the result of applying arithmetic operations to them.

[3]:
 class ArithmeticPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(ArithmeticPipeline, self).__init__(batch_size, num_threads, device_id, seed=12)
        self.left_source = ops.ExternalSource()
        self.right_source = ops.ExternalSource()

    def define_graph(self):
        self.left = self.left_source()
        self.right = self.right_source()
        sum_result = self.left + self.right
        mul_result = self.left * self.right
        div_result = self.left // self.right
        return self.left, self.right, sum_result, mul_result, div_result

    def iter_setup(self):
        # Get the data batches with selected types
        (l, r) = get_data()
        # Feed it to external source
        self.feed_input(self.left, l)
        self.feed_input(self.right, r)

Running the pipeline

Lets build and run our pipeline

[4]:
pipe = ArithmeticPipeline(batch_size = batch_size, num_threads = 2, device_id = 0)
pipe.build()
out = pipe.run()

Now it’s time to display the results:

[5]:
def examine_output(pipe_out):
    l = pipe_out[0].as_array()
    r = pipe_out[1].as_array()
    sum_out = pipe_out[2].as_array()
    mul_out = pipe_out[3].as_array()
    div_out = pipe_out[4].as_array()
    print("{}\n+\n{}\n=\n{}\n\n".format(l, r, sum_out))
    print("{}\n*\n{}\n=\n{}\n\n".format(l, r, mul_out))
    print("{}\n//\n{}\n=\n{}\n\n".format(l, r, div_out))

examine_output(out)
[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
+
[[[3 3 3]
  [1 3 5]]

 [[1 5 5]
  [1 1 1]]]
=
[[[  45   10    3]
  [   1    3    5]]

 [[   6   15   20]
  [  11  101 1001]]]


[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
*
[[[3 3 3]
  [1 3 5]]

 [[1 5 5]
  [1 1 1]]]
=
[[[ 126   21    0]
  [   0    0    0]]

 [[   5   50   75]
  [  10  100 1000]]]


[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
//
[[[3 3 3]
  [1 3 5]]

 [[1 5 5]
  [1 1 1]]]
=
[[[  14    2    0]
  [   0    0    0]]

 [[   5    2    3]
  [  10  100 1000]]]


As we can see the resulting tensors are obtained by applying the arithmetic operation between corresponding elements of its inputs.

The shapes of the arguments to arithmetic operators should match (with an exception for scalar tensor inputs that we will describe in the next section), otherwise we will get an error.

Constant and scalar operands

Until now we considered only tensor inputs of matching shapes for inputs of arithmetic operators. DALI allows one of the operands to be a constant or a tensor consisting of scalars. They can appear on both sides of binary expression.

Constants

In define_graph step, constant operand for arithmetic operator can be: values of Python’s int and float types used directly, or those values wrapped in nvidia.dali.types.Constant. Operation between tensor and constant results in the constant being broadcasted to all elements of the tensor. The same costant is used with all samples in the batch.

Note: Currently all values of integral constants are passed internally to DALI as int32 and all values of floating point constants are passed to DALI as float32.

The Python int values will be treated as int32 and the float as float32 in regard to type promotions.

The DALI Constant can be used to indicate other types. It accepts DALIDataType enum values as second argument and has convenience member functions like .uint8() or .float32() that can be used for conversions.

Using the Constants

Let’s adjust the Pipeline to utilize constants first.

[6]:
 class ArithmeticConstantsPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(ArithmeticConstantsPipeline, self).__init__(batch_size, num_threads, device_id, seed=12)
        self.left_source = ops.ExternalSource()
        self.right_source = ops.ExternalSource()

    def define_graph(self):
        self.left = self.left_source()
        self.right = self.right_source()
        add_200 = self.left + 200
        mul_075 = self.left * 0.75
        sub_15 = Constant(15).float32() - self.right
        return self.left, self.right, add_200, mul_075, sub_15

    def iter_setup(self):
        # Get the data batches with selected types
        (l, r) = get_data()
        # Feed it to external source
        self.feed_input(self.left, l)
        self.feed_input(self.right, r)
[7]:
pipe = ArithmeticConstantsPipeline(batch_size = batch_size, num_threads = 2, device_id = 0)
pipe.build()
out = pipe.run()

Now it’s time to display the results:

[8]:
def examine_output(pipe_out):
    l = pipe_out[0].as_array()
    r = pipe_out[1].as_array()
    add_200 = pipe_out[2].as_array()
    mul_075 = pipe_out[3].as_array()
    sub_15 = pipe_out[4].as_array()
    print("{}\n+ 200 =\n{}\n\n".format(l, add_200))
    print("{}\n* 0.75 =\n{}\n\n".format(l, mul_075))
    print("15 -\n{}\n=\n{}\n\n".format(r, sub_15))

examine_output(out)
[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
+ 200 =
[[[ 242  207  200]
  [ 200  200  200]]

 [[ 205  210  215]
  [ 210  300 1200]]]


[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
* 0.75 =
[[[ 31.5    5.25   0.  ]
  [  0.     0.     0.  ]]

 [[  3.75   7.5   11.25]
  [  7.5   75.   750.  ]]]


15 -
[[[3 3 3]
  [1 3 5]]

 [[1 5 5]
  [1 1 1]]]
=
[[[12. 12. 12.]
  [14. 12. 10.]]

 [[14. 10. 10.]
  [14. 14. 14.]]]


As we can see the constant operand is broadcasted to all elements of all tensors in the batch.

Scalars

As an addition to element-wise operation between Tensors of the same sizes, DALI allows to use tensors containing scalar values - that is a batch of elements with shape {1} as one of the operands. The tensor of scalars will behave similarly to a constant described above - each scalar value will be broadcasted to every element of the other tensor operand. Note that contrary to constants, the tensor is still as a batch of elements - each scalar value is broadcasted to corresponding tensor.

Using scalar tensors

We will use Uniform Operator to generate a batch of random scalar values, that we will use in our example. We already defined our inputs to be a batch of tensors with shape = {2 x 3}, so we will be able to observe how the single constant or scalar values are propagated to all elements of the second operand.

[9]:
class ArithmeticScalarsPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(ArithmeticScalarsPipeline, self).__init__(batch_size, num_threads, device_id, seed=12)
        self.tensor_source = ops.ExternalSource()
        self.uniform = ops.Uniform(range=[-10, 10])

    def define_graph(self):
        self.tensor = self.tensor_source()
        uni = self.uniform()
        return self.tensor, uni, self.tensor * uni

    def iter_setup(self):
        # Get the data batches with selected types, we only use one input
        (t, _) = get_data()
        # Feed it to external source
        self.feed_input(self.tensor, t)

Now it’s time to build and run the Pipeline. It will allow to scale our input by some random numbers generated by the Uniform Operator.

[10]:
pipe = ArithmeticScalarsPipeline(batch_size = batch_size, num_threads = 2, device_id = 0)
pipe.build()
out = pipe.run()
[11]:
def examine_output(pipe_out):
    t = pipe_out[0].as_array()
    uni = pipe_out[1].as_array()
    scaled = pipe_out[2].as_array()
    print("{}\n*\n{}\n=\n{}".format(t, uni, scaled))

examine_output(out)
[[[  42    7    0]
  [   0    0    0]]

 [[   5   10   15]
  [  10  100 1000]]]
*
[[3.9766016]
 [0.666193 ]]
=
[[[167.01727   27.836212   0.      ]
  [  0.         0.         0.      ]]

 [[  3.330965   6.66193    9.992895]
  [  6.66193   66.6193   666.193   ]]]

Notice how we multiply corresponding elements of two batches (consisting of two elements here). Their shapes do not match, but as the second one operand is a batch two of {1}-shaped tensors, it is considered a scalar input.