Simplified Python operator creation via the create_op decorator

Holoscan v2.2.0
Warning

The holoscan.decorator.create_op() decorator and the supporting holoscan.decorator.Input and holoscan.decorator.Output classes are new in Holoscan v2.2 and are still considered experimental. They are usable now, but it is possible that some backwards incompatible changes to the behavior or API may be made based on initial feedback.

For convenience, a holoscan.decorator.create_op() decorator is provided which can be used to automatically convert a simple Python function/generator or a class into a native Python holoscan.core.Operator. The wrapped function body (or the __call__ method if create_op is applied to a class) will correspond to the computation to be done in the holoscan.core.Operator.compute() method, but without any need to explicitly make any calls to holoscan.core.InputContext.receive() to receive inputs or holoscan.core.OutputContext.emit() to transmit the output. Any necessary input or output ports will have been automatically generated.

Consider first a simple Python function named mask_and_offset that takes image and mask tensors as input and multiplies them, followed by adding some scalar offset.

Copy
Copied!
            

def mask_and_offset(image, mask, offset=1.5): return image * mask + offset

To turn this into an function that returns a corresponding operator we can add the create_op decorator like this:

Copy
Copied!
            

from holoscan.decorator import create_op @create_op( inputs=("image", "mask"), outputs="out", ) def mask_and_offset(image, mask, offset=1.5): return image * mask + offset

By supplying the inputs argument we are specifying that there are two input ports, named “image” and “mask”. By setting outputs="out" we are indicating that the output will be transmitted on a port named “out”. When inputs are specified by simple strings in this way, the names used must map to variable names in the wrapped function’s signature. We will see later that it is possible to use the holoscan.decorator.Input class to provide more control over how inputs are mapped to function arguments. Similarly, we will see that the holoscan.decorator.Output class can be used to provide more control over how the function output is mapped to any output port(s).

There is also an optional, cast_tensors argument to create_op. For convenience, this defaults to True, which results in any tensor-like objects being automatically cast to a NumPy or CuPy array (for host or device tensors, respectively) before they are passed on to the function. If this is not desired (e.g. due to working with a different third party tensor framework than NumPy or CuPy), the user can set cast_tensors=False, and manually handle casting of any holoscan.Tensor objects to the desired form in the function body. This casting option applies to either single tensors or a tensor map (dict[Tensor]).

This decorated function can then be used within the compose method of an Application to create an operator corresponding to this computation:

Copy
Copied!
            

from holoscan.core import Application, Operator def MyApp(Application): def compose(self) mask_op = mask_and_offset(self, name="my_scaling_op", offset=0.0) # verify that an Operator was generated assert(isinstance(mask_op, Operator)) # now add any additional operators and create the computation graph using add_flow

Note that as for all other Operator classes, it is required to supply the application (or fragment) as the first argument (self here). The name kwarg is always supported and is the name that will be assigned to the operator. Due to the use of this kwarg to specify the operator name, the wrapped function (mask_and_offset in this case) should not use name as an argument name. In this case, we specified offset=0.0 which would override the default value of offset=1.5 in the function signature.

For completeness, the use of the create_op decorator on mask_and_offset is equivalent to if the user had defined the following MaskAndOffsetOp class and used it in MyApp.compose:

Copy
Copied!
            

def MaskAndOffsetOp(Operator): def setup(self, spec): spec.input("image") spec.input("mask") spec.output("out") def compute(self, op_input, op_output, context): # Simplified logic here assumes received values are GPU tensors # create_op would add additional logic to handle the inputs image = op_input.receive("image") image = cp.asarray(image) mask = op_input.receive("mask") mask = cp.asarray(mask) out = image * mask + offset op_output.emit(out, "out")

This section will cover additional use cases where using a str or Tuple[str] for the inputs argument is insufficient.

Scenario 1: Assume that the upstream operator sends a tensormap to a given input port and we need to specify which tensor(s) in the tensormap will map to which input port.

For a concrete example, suppose we want to print a tensor’s shape using a function like:

Copy
Copied!
            

def print_shape(tensor): print(f"{tensor.shape = }")

but the upstream operator outputs a dictionary containing two tensors named “image” and “labels”. We could use this operator by specifying which tensor name on a particular input port would map to the function’s “tensor” argument. For example:

Copy
Copied!
            

@create_op(inputs=Input("input_tensor", arg_map={"image": "tensor"})) def print_shape(tensor): print(f"{tensor.shape = }")

would create an operator with a single input port named “input_tensor” and no output port. The input port may receive a tensormap with any number of tensors, but will only use the tensor named “image”, mapping it to the “tensor” argument of the wrapped function. In general, the arg_map is a dictionary mapping tensor names found on the port to their corresponding function argument names.

Scenario 2: we want to override the scheduling condition present on a port. This can be done by specifying Input with the condition and optionally condition_kwargs arguments. For example, to override the MessageAvailableCondition that is added to the port by default and allow it to call compute even when no input message is available:

Copy
Copied!
            

@create_op(inputs=Input("input_tensor", condition=ConditionType.NONE, condition_kwargs={}))

Scenario 3: we want to override the parameters of the receiver present on a port. For example, we could specify a different policy for the double buffer receiver that is used by default (policy=1 corresponds to discarding incoming messages when the queue is already full)

Copy
Copied!
            

@create_op(inputs=Input("input_tensor", connector=ConditionType.DOUBLE_BUFFER, connector_kwargs=dict(capacity=1, policy=1)))

To support a case where multiple output ports should be present, the user must have the function return a dict. The holoscan.decorator.Output class then has a tensor_names keyword argument that can be specified to indicate which items in the dictionary are to be transmitted on a given output port.

For example, assume we have a function that generates three tensors, x, y and z and we want to transmit x and y on port “out1” while z will be transmitted on port “out2”. This can be done by specifying outputs as follows in the create_op call:

Copy
Copied!
            

@create_op( outputs=( Output("out1", tensor_names=("x", "y")), Output("out2", tensor_names=("z",)), ), ) def xyz_generator(nx=32, ny=32, nz=16): x = cp.arange(nx, dtype=np.uint32) y = cp.arange(ny, dtype=np.uint32) z = cp.arange(nz, dtype=np.uint32) # must return a dict type when Output arg(s) with `tensor_names` is used return dict(x=x, y=y, z=z)

This operator has no input ports and three optional keyword arguments. It splits the output tensors across two ports as described above. All names used in tensor_names must correspond to keys present in the dict emitted by the object. Often the dict values are tensors, but that is not a requirement.

The holoscan.decorator.Output class also supports condition, condition_kwargs, connector and connector_kwargs that work in the same way as shown for holoscan.decorator.Input above. For example, to override the transmitter queue policy for a single output port named “output_tensor”

Copy
Copied!
            

@create_op(inputs=Output("output_tensor", connector=ConditionType.DOUBLE_BUFFER, connector_kwargs=dict(capacity=1, policy=1))

note that tensor_names was not specified which means that the returned object does not need to be a dict. The object itself will be emitted on the “output_tensor” port.

Note

When specifying the inputs and outputs arguments to create_op, please make sure that all ports have unique names. As a concrete example, if an operator has a single input and output port that are used to send images, one should use unique port names like “image_in” and “image_out” rather than using “image” for both.

There SDK includes a python_decorator example showing interoperability of wrapped C++ operators (VideoStreamReplayerOp and HolovizOp) alongside native Python operators created via the create_op decorator.

The start of this application imports a couple of the built in C+±based operators with Python bindings (HolovizOp and VideoStreamReplayerOp). In addition to these, two new operators are created via the create_op decorator APIs.

Copy
Copied!
            

import os from holoscan.core import Application from holoscan.decorator import Input, Output, create_op from holoscan.operators import HolovizOp, VideoStreamReplayerOp sample_data_path = os.environ.get("HOLOSCAN_INPUT_PATH", "../data") @create_op( inputs="tensor", outputs="out_tensor", ) def invert(tensor): tensor = 255 - tensor return tensor @create_op(inputs=Input("in", arg_map="tensor"), outputs=Output("out", tensor_names=("frame",))) def tensor_info(tensor): print(f"tensor from 'in' port: shape ={tensor.shape}, " f"dtype ={tensor.dtype.name}") return tensor

The first is created by adding the decorator to a function named invert which just inverts the (8-bit RGB) color space values. A second operator, is created by adding the decorator to a function named tensor_info, which assumes that the input is a CuPy or NumPy tensor, and prints its shape and data type. Note that create_op’s default cast_tensors=True option ensures that any host or device tensors are cast to NumPy or CuPy arrays, respectively. This is why it is safe to use NumPy APIs in the function bodies. If the user wants to receive the holoscan.Tensor object directly and manually handle the casting to a different type of object in the function body, then cast_tensors=False should be specified in the keyword arguments to create_op.

Now that we have defined or imported all of the operators, we can build an application in the usual way by inheriting from the Application class and implementing the compose method. The remainder of the code for this example is shown below.

Copy
Copied!
            

class VideoReplayerApp(Application): """Example of an application that uses the operators defined above. This application has the following operators: - VideoStreamReplayerOp - HolovizOp - invert (created via decorator API) - tensor_info (created via decorator API) `VideoStreamReplayerOp` reads a video file and sends the frames to the HolovizOp. The `invert` operator inverts the color map (the 8-bit `value` in each color channel is set to `255 - value`). The `tensor_info` operator prints information about the tensors shape and data type. `HolovizOp` displays the frames. """ def compose(self): video_dir = os.path.join(sample_data_path, "racerx") if not os.path.exists(video_dir): raise ValueError(f"Could not find video data:{video_dir=}") # Define the replayer and holoviz operators replayer = VideoStreamReplayerOp( self, name="replayer", directory=video_dir, basename="racerx", frame_rate=0, # as specified in timestamps repeat=False, # default: false realtime=True, # default: true count=40, # default: 0 (no frame count restriction) ) invert_op = invert(self, name="image_invert") info_op = tensor_info(self, name="tensor_info") visualizer = HolovizOp( self, name="holoviz", width=854, height=480, # name="frame" to match Output argument to create_op for tensor_info tensors=[dict(name="frame", type="color", opacity=1.0, priority=0)], ) # Define the workflow self.add_flow(replayer, invert_op, {("output", "tensor")}) self.add_flow(invert_op, info_op, {("out_tensor", "in")}) self.add_flow(info_op, visualizer, {("out", "receivers")}) def main(): app = VideoReplayerApp() app.run() if __name__ == "__main__": main()

The highlighted lines show how Operators corresponding to the invert and tensor_info functions are created by passing the application itself as the first argument. The invert_op and info_op variables now correspond to a holsocan.core.Operator class and can be connected in the usual way using add_flow to define the computation. Note that a name was provided for these operators, via the optional name keyword argument. In this case each operator is only used once, but if the same operator is to be used more than once in an application, each should be given a unique name.

The create_op decorator can be applied to a generator in the same way as for a function. In this case, a BooleanCondition will automatically be added to the operator that will stop it from trying to call compute again once the generator is exhausted (has no more values to yield). The following is a basic example of decorating a generator for integers from 1 to count:

Copy
Copied!
            

@create_op(outputs="out") def source_generator(count): yield from range(1, count + 1)

The compose method can then create an operator from this decorated generator as follows

Copy
Copied!
            

count_op = source_generator(self, count=100, name="int_source")

The create_op decorator can also be applied to a class implementing the __call__ method, to turn it into an Operator(). One reason to choose a class vs. a function is if there is some internal state that needs to be maintained across calls. For example, the operator defined below casts the input data to 32-bit floating point and on even frames also negates the values.

Copy
Copied!
            

@create_op class NegateEven: def __init__(self, start_index=0): self.counter = start_index def __call__(self, x): # cast tensor to 32-bit floating point x = x.astype('float32') # negate the values if the frame is even if self.counter % 2 == 0: x = -x return x

In this case, since there is only a single input and output for the function, we can omit the inputs and outputs arguments in the call to create_op. In this case the input port will have name "x", as determined from the variable name in the function signature. The output port will be have an empty name "". To use different port names, the inputs and/or outputs arguments should be specified.

The compose method can then create an operator from this decorated generator as follows. Note that any positional or keyword arguments in the __init__ method would be supplied during the NegateEven call. This returns a function (not yet an operator) that can then be called to generate the operator. This is shown below

Copy
Copied!
            

negate_op_creation_func = NegateEven(start_index=0) # `negate_op_creation_func` is a function that returns an Operator negate_even_op = negate_op_creation_func(self, name="negate_even") # call the function to create an instance of NegateEvenOp

or more concisely as just

Copy
Copied!
            

negate_even_op = NegateEven(start_index=0)(self, name="negate_even")

Note that the operator class as defined above is approximately equivalent to the Python native operator defined below. We show it here explicitly for reference.

Copy
Copied!
            

import cupy as cp import numpy as np class NegateEvenOp(Operator): def __init__(self, fragment, *args, start_index=0, **kwargs): self.counter = start_index super().__init__(fragment, *args, **kwargs) def setup(self, spec): spec.input("x") spec.output("") def compute(op_input, op_output, context): x = op_input.receive("x") # cast to CuPy or NumPy array # (validation that `x` is a holoscan.Tensor is omitted for simplicity) if hasattr(x, '__cuda_array_interface__'): x = cupy.asarray(x) else: x = numpy.asarray(x) # cast tensor to 32-bit floating point x = x.astype('float32') # negate the values if the frame is even if self.counter % 2 == 0: x = -x op_output.emit(x, "")

The primary differences between this NegateEvenOp class and the decorated NegateEven above are:

  • NegateEven does not need to define a setup method

  • NegateEven does not inherit from Operator and so does not call its __init__ from the constructor.

  • The NegateEven::__call__ method is simpler than the NegateEvenOp::compute method as receive and emit methods do not need to be explicitly called and casting to a NumPy or CuPy array is automatically handled for NegateEven.

Previous Writing Python bindings for a C++ Operator
Next Built-in Operators and Extensions
© Copyright 2022-2024, NVIDIA. Last updated on Jul 17, 2024.