Distributed applications must serialize any objects that are sent between the fragments of a multi-fragment application. Objects are serialized into a binary buffer, which is then sent from one fragment to another via the Unified Communication X (UCX) library. For tensor types (e.g., holoscan::Tensor), no copy of the data is made. Instead, transmission is done directly from the original tensor's data, and only a small amount of header information is copied to the serialization buffer.
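To make the header-only idea concrete, here is a conceptual, standalone C++ sketch. The TensorHeader layout and the serialize_header function are hypothetical and do not reflect the SDK's actual wire format. The point is only that the number of bytes copied into the serialization buffer depends on the header fields, not on the size of the tensor's data, which would be handed to the transport separately.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical tensor description; NOT the Holoscan serialization format.
struct TensorHeader {
  int32_t ndim;
  int64_t shape[8];    // up to 8 dimensions in this sketch
  uint8_t dtype_code;  // illustrative encoding, e.g. 0 = float32
  uint8_t is_device;   // 1 if the data lives in GPU memory
  uint64_t nbytes;     // size of the data payload, which is NOT copied here
};

// Append only the fixed-size header to the serialization buffer and return the
// number of bytes written. The data pointer itself would be given to the
// transport (e.g. a UCX send) so the payload is never copied into `buffer`.
size_t serialize_header(const TensorHeader& header, std::vector<uint8_t>& buffer) {
  const auto* bytes = reinterpret_cast<const uint8_t*>(&header);
  buffer.insert(buffer.end(), bytes, bytes + sizeof(TensorHeader));
  return sizeof(TensorHeader);
}
```

For a 1080x1920x3 float32 image, the buffer grows by only sizeof(TensorHeader) bytes, while roughly 24 MB of pixel data stays where it is.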

The table below lists the types for which codecs are pre-registered, so that they can be serialized between fragments using the Holoscan SDK.

| Type Class | Specific Types |
| --- | --- |
| integers | `int8_t`, `int16_t`, `int32_t`, `int64_t`, `uint8_t`, `uint16_t`, `uint32_t`, `uint64_t` |
| floating point | `float`, `double`, `complex<float>`, `complex<double>` |
| boolean | `bool` |
| strings | `std::string` |
| `std::vector<T>` | `T` is `std::string` or any of the boolean, integer or floating point types above |
| `std::vector<std::vector<T>>` | `T` is `std::string` or any of the boolean, integer or floating point types above |
| `std::vector<HolovizOp::InputSpec>` | a vector of `InputSpec` objects that are specific to `HolovizOp` |
| `std::shared_ptr<T>` | `T` is any of the scalar, vector or `std::string` types above |
| tensor types | `holoscan::Tensor`, `nvidia::gxf::Tensor`, `nvidia::gxf::VideoBuffer`, `nvidia::gxf::AudioBuffer` |
| GXF-specific types | `nvidia::gxf::TimeStamp`, `nvidia::gxf::EndOfStream` |

Warning: If an operator that transmits both CPU and GPU tensors is to be used in a distributed application, the same output port cannot mix GPU and CPU tensors. CPU and GPU tensor outputs should be placed on separate output ports. This is a limitation of the underlying UCX library used for zero-copy tensor serialization between operators. As a concrete example, assume an operator, MyOperator, with a single output port named "out" defined in its setup method. If that output port only ever connects to other operators within the same fragment, and never across fragments, then it is okay to emit a TensorMap with a mixture of host and device arrays on that single port.

C++:

```cpp
void MyOperator::setup(OperatorSpec& spec) {
  spec.output<holoscan::TensorMap>("out");
}

void MyOperator::compute(InputContext& op_input, OutputContext& op_output,
                         ExecutionContext& context) {
  // omitted: some computation resulting in multiple holoscan::Tensors
  // (two on CPU ("cpu_coords_tensor" and "cpu_metrics_tensor") and one on device ("gpu_tensor")).

  TensorMap out_message;

  // insert all tensors in one TensorMap (mixing CPU and GPU tensors is okay when ports only
  // connect within a Fragment)
  out_message.insert({"coordinates", cpu_coords_tensor});
  out_message.insert({"metrics", cpu_metrics_tensor});
  out_message.insert({"mask", gpu_tensor});

  op_output.emit(out_message, "out");
}
```

Python:

```python
from holoscan.core import Operator, OperatorSpec


class MyOperator(Operator):
    def setup(self, spec: OperatorSpec):
        spec.output("out")

    def compute(self, op_input, op_output, context):
        # Omitted: assume some computation resulting in three holoscan.Tensor or tensor-like
        # objects. Two on CPU ("cpu_coords_tensor" and "cpu_metrics_tensor") and one on device
        # ("gpu_tensor").

        # mixing CPU and GPU tensors in a single dict is okay only for within-Fragment connections
        op_output.emit(
            dict(
                coordinates=cpu_coords_tensor,
                metrics=cpu_metrics_tensor,
                mask=gpu_tensor,
            ),
            "out",
        )
```

However, this mixing of CPU and GPU arrays on a single port will not work for distributed applications. If an operator needs to communicate across fragments, separate ports should be used instead.

C++:

```cpp
void MyOperator::setup(OperatorSpec& spec) {
  spec.output<holoscan::TensorMap>("out_host");
  spec.output<holoscan::TensorMap>("out_device");
}

void MyOperator::compute(InputContext& op_input, OutputContext& op_output,
                         ExecutionContext& context) {
  // omitted: some computation resulting in multiple holoscan::Tensors
  // (two on CPU ("cpu_coords_tensor" and "cpu_metrics_tensor") and one on device ("gpu_tensor")).

  TensorMap out_message_host;
  TensorMap out_message_device;

  // put all CPU tensors on one port
  out_message_host.insert({"coordinates", cpu_coords_tensor});
  out_message_host.insert({"metrics", cpu_metrics_tensor});
  op_output.emit(out_message_host, "out_host");

  // put all GPU tensors on another
  out_message_device.insert({"mask", gpu_tensor});
  op_output.emit(out_message_device, "out_device");
}
```

Python:

```python
from holoscan.core import Operator, OperatorSpec


class MyOperator(Operator):
    def setup(self, spec: OperatorSpec):
        spec.output("out_host")
        spec.output("out_device")

    def compute(self, op_input, op_output, context):
        # Omitted: assume some computation resulting in three holoscan.Tensor or tensor-like
        # objects. Two on CPU ("cpu_coords_tensor" and "cpu_metrics_tensor") and one on device
        # ("gpu_tensor").

        # split CPU and GPU tensors across ports for compatibility with inter-fragment communication
        op_output.emit(
            dict(coordinates=cpu_coords_tensor, metrics=cpu_metrics_tensor), "out_host"
        )
        op_output.emit(dict(mask=gpu_tensor), "out_device")
```
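The host/device split used in the two-port version can also be factored into a small helper. The following standalone sketch uses a hypothetical FakeTensor type with only an on_device flag in place of holoscan::Tensor. In a real operator, the decision would be based on where each tensor's memory actually resides.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-in for a tensor handle; only its memory location matters here.
struct FakeTensor {
  bool on_device;
};

using FakeTensorMap = std::map<std::string, FakeTensor>;

// Partition a mixed map into a host-only map and a device-only map, so that
// each one can be emitted on its own output port.
std::pair<FakeTensorMap, FakeTensorMap> split_by_device(const FakeTensorMap& mixed) {
  FakeTensorMap host;
  FakeTensorMap device;
  for (const auto& [name, tensor] : mixed) {
    (tensor.on_device ? device : host).emplace(name, tensor);
  }
  return {host, device};
}
```

With the names from the example above, "coordinates" and "metrics" end up in the host map and "mask" in the device map.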

For the Python API, any array-like object supporting the DLPack interface, __array_interface__, or __cuda_array_interface__ will be transmitted using Tensor serialization, which avoids data copies for performance reasons. Objects of type list[holoscan.HolovizOp.InputSpec] will be sent using the underlying C++ serializer for std::vector<HolovizOp::InputSpec>. All other Python objects will be serialized to/from a std::string using the cloudpickle library.

Warning: A restriction imposed by the use of cloudpickle is that all fragments in a distributed application must run the same Python version.

Warning: Distributed applications behave differently from single-fragment applications when op_output.emit() is called to emit a tensor-like Python object. Specifically, for array-like objects such as a PyTorch tensor, the same Python object will not be received by op_input.receive() in a downstream Python operator (even if the upstream and downstream operators are part of the same fragment). An object of type holoscan.Tensor will be received as a holoscan.Tensor. Any other array-like object with data stored on the device (GPU) will be received as a CuPy array, and any array-like object with data stored on the host (CPU) will be received as a NumPy array. The user must convert back to the original array-like type if needed (typically possible in a zero-copy fashion via DLPack or the array interfaces).

For any additional C++ classes that need to be serialized for transmission between fragments in a distributed application, the user must create their own codec and register it with the Holoscan SDK framework. As a concrete example, suppose that we had the following simple Coordinate class that we wish to send between fragments.

```cpp
struct Coordinate {
  float x;
  float y;
  float z;
};
```

To create a codec capable of serializing and deserializing this type, one should define a holoscan::codec class for it as shown below.

```cpp
#include "holoscan/core/codec_registry.hpp"
#include "holoscan/core/errors.hpp"
#include "holoscan/core/expected.hpp"

namespace holoscan {

template <>
struct codec<Coordinate> {
  static expected<size_t, RuntimeError> serialize(const Coordinate& value, Endpoint* endpoint) {
    return serialize_trivial_type<Coordinate>(value, endpoint);
  }
  static expected<Coordinate, RuntimeError> deserialize(Endpoint* endpoint) {
    return deserialize_trivial_type<Coordinate>(endpoint);
  }
};

}  // namespace holoscan
```

In this example, the first argument to serialize is a const reference to the value to be serialized, and the return value is an expected containing the number of bytes that were serialized. The deserialize method returns an expected containing the deserialized object. The Endpoint class is a base class representing the serialization endpoint (for distributed applications, the actual endpoint class used is UcxSerializationBuffer).

The helper functions serialize_trivial_type (deserialize_trivial_type) can be used to serialize (deserialize) any plain-old-data (POD) type. Specifically, POD types can be serialized by just copying sizeof(Type) bytes to/from the endpoint. Alternatively, the Endpoint methods read_trivial_type() and write_trivial_type() can be used directly:

```cpp
template <>
struct codec<Coordinate> {
  static expected<size_t, RuntimeError> serialize(const Coordinate& value, Endpoint* endpoint) {
    return endpoint->write_trivial_type(&value);
  }
  static expected<Coordinate, RuntimeError> deserialize(Endpoint* endpoint) {
    Coordinate encoded;
    auto maybe_value = endpoint->read_trivial_type(&encoded);
    if (!maybe_value) { return forward_error(maybe_value); }
    return encoded;
  }
};
```
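To show what the trivial-type helpers do conceptually, here is a self-contained sketch with a hypothetical ByteBuffer class standing in for Endpoint. It is not the SDK implementation, and it returns bool and std::optional instead of the SDK's expected type. Writing copies sizeof(T) bytes into the buffer; reading copies them back out and fails if too few bytes remain.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <type_traits>
#include <vector>

// Hypothetical stand-in for a serialization endpoint (not holoscan::Endpoint).
class ByteBuffer {
 public:
  // Copy sizeof(T) bytes of a trivially copyable value; return bytes written.
  template <typename T>
  size_t write_trivial_type(const T* value) {
    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
    const auto* bytes = reinterpret_cast<const uint8_t*>(value);
    data_.insert(data_.end(), bytes, bytes + sizeof(T));
    return sizeof(T);
  }

  // Copy sizeof(T) bytes from the read position into *value.
  // Returns false (and leaves *value untouched) if not enough bytes remain.
  template <typename T>
  bool read_trivial_type(T* value) {
    static_assert(std::is_trivially_copyable_v<T>, "T must be trivially copyable");
    if (read_pos_ + sizeof(T) > data_.size()) { return false; }
    std::memcpy(value, data_.data() + read_pos_, sizeof(T));
    read_pos_ += sizeof(T);
    return true;
  }

  size_t size() const { return data_.size(); }

 private:
  std::vector<uint8_t> data_;
  size_t read_pos_ = 0;
};

struct Coordinate {
  float x;
  float y;
  float z;
};

// Round trip a Coordinate through the buffer using only the trivial-copy helpers.
std::optional<Coordinate> round_trip(const Coordinate& in) {
  ByteBuffer buffer;
  buffer.write_trivial_type(&in);
  Coordinate out;
  if (!buffer.read_trivial_type(&out)) { return std::nullopt; }
  return out;
}
```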

In practice, one would not actually need to define codec<Coordinate> at all since Coordinate is a trivially serializable type and the existing codec treats any types for which there is not a template specialization as a trivially serializable type. It is, however, still necessary to register the codec type with the CodecRegistry as described below.
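The fallback behavior can be modeled with a primary class template that every type uses unless it has an explicit specialization. This standalone version lives in a hypothetical sketch namespace and uses a std::vector<uint8_t> in place of an Endpoint. It also adds a static_assert so that types which are not trivially copyable fail to compile; whether the SDK's own default codec performs such a check is not covered here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

namespace sketch {  // hypothetical namespace, not holoscan

// Primary template: used for any T without an explicit specialization.
// It treats T as trivially serializable and copies sizeof(T) bytes.
template <typename T>
struct codec {
  static_assert(std::is_trivially_copyable_v<T>,
                "no codec specialization for T, and T is not trivially copyable");

  static size_t serialize(const T& value, std::vector<uint8_t>& out) {
    const auto* bytes = reinterpret_cast<const uint8_t*>(&value);
    out.insert(out.end(), bytes, bytes + sizeof(T));
    return sizeof(T);
  }

  static T deserialize(const std::vector<uint8_t>& in, size_t& offset) {
    assert(offset + sizeof(T) <= in.size());  // caller must supply enough bytes
    T value;
    std::memcpy(&value, in.data() + offset, sizeof(T));
    offset += sizeof(T);
    return value;
  }
};

}  // namespace sketch

// No specialization is written for Coordinate, so the primary template is used.
struct Coordinate {
  float x;
  float y;
  float z;
};
```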

For non-trivial types, one will likely also need to use the Endpoint read() and write() methods to implement the codec. Examples of their use in the built-in codecs can be found in holoscan/core/codecs.hpp.
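For a type that owns heap memory, copying sizeof(Type) bytes would copy only pointers. A common approach is to write each variable-length member as a length prefix followed by its contents. The sketch below assumes a hypothetical Annotation type, and the free functions write_bytes and read_bytes stand in for an endpoint's write() and read(). It illustrates the technique and is not the code in holoscan/core/codecs.hpp.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <string>
#include <vector>

// Hypothetical non-trivial type: it owns heap memory.
struct Annotation {
  std::string label;
  std::vector<float> scores;
};

// Stand-in for an endpoint's write(): append raw bytes.
void write_bytes(std::vector<uint8_t>& out, const void* data, size_t size) {
  const auto* bytes = static_cast<const uint8_t*>(data);
  out.insert(out.end(), bytes, bytes + size);
}

// Stand-in for an endpoint's read(): false if the buffer is too short.
bool read_bytes(const std::vector<uint8_t>& in, size_t& offset, void* data, size_t size) {
  if (offset + size > in.size()) { return false; }
  if (size == 0) { return true; }  // avoid memcpy with a possibly null pointer
  std::memcpy(data, in.data() + offset, size);
  offset += size;
  return true;
}

// Layout: [uint64 label length][label bytes][uint64 count][count floats].
// Returns the number of bytes written.
size_t serialize_annotation(const Annotation& a, std::vector<uint8_t>& out) {
  const size_t start = out.size();
  const uint64_t label_size = a.label.size();
  write_bytes(out, &label_size, sizeof(label_size));
  write_bytes(out, a.label.data(), a.label.size());
  const uint64_t num_scores = a.scores.size();
  write_bytes(out, &num_scores, sizeof(num_scores));
  write_bytes(out, a.scores.data(), a.scores.size() * sizeof(float));
  return out.size() - start;
}

std::optional<Annotation> deserialize_annotation(const std::vector<uint8_t>& in, size_t& offset) {
  Annotation a;
  uint64_t label_size = 0;
  if (!read_bytes(in, offset, &label_size, sizeof(label_size))) { return std::nullopt; }
  if (label_size > in.size() - offset) { return std::nullopt; }  // check before allocating
  a.label.resize(label_size);
  if (!read_bytes(in, offset, a.label.data(), label_size)) { return std::nullopt; }
  uint64_t num_scores = 0;
  if (!read_bytes(in, offset, &num_scores, sizeof(num_scores))) { return std::nullopt; }
  if (num_scores > (in.size() - offset) / sizeof(float)) { return std::nullopt; }
  a.scores.resize(num_scores);
  if (!read_bytes(in, offset, a.scores.data(), num_scores * sizeof(float))) {
    return std::nullopt;
  }
  return a;
}
```

The length prefixes let the reader allocate exactly the right amount before copying, and the bounds checks reject truncated messages instead of reading past the end of the buffer.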

Once such a codec has been defined, the remaining step is to register it with the static CodecRegistry class. This makes the UCX-based classes used by distributed applications aware that a codec exists for serializing this object type. If the type is specific to a particular operator, one can register it via the register_codec() function.

```cpp
#include "holoscan/core/codec_registry.hpp"

namespace holoscan::ops {

void MyCoordinateOperator::initialize() {
  register_codec<Coordinate>("Coordinate");

  // ...

  // parent class initialize() call must be after the argument additions above
  Operator::initialize();
}

}  // namespace holoscan::ops
```

Here, the argument provided to register_codec is the name the registry will use for the codec. This name will be serialized in the message header so that the deserializer knows which deserialization function to use on the received data. In this example, we chose a name that matches the class name, but that is not a requirement. If the name matches one that is already present in the CodecRegistry class, then any existing codec under that name will be replaced by the newly registered one.

It is also possible to register the type outside of initialize() by retrieving the static instance of the codec registry directly, as follows.

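```cpp
#include "holoscan/core/codec_registry.hpp"

int main(int argc, char** argv) {
  // register the codec (e.g. at the start of main) before the application is run
  holoscan::CodecRegistry::get_instance().add_codec<Coordinate>("Coordinate");

  // ... create and run the application
  return 0;
}
```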
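The registry behavior described above (lookup by the name stored in the message header, replacement when a name is registered again, and a single static instance) can be modeled with a map from names to type-erased deserialization functions. SketchCodecRegistry below is a hypothetical, simplified stand-in and does not mirror the holoscan::CodecRegistry interface.

```cpp
#include <any>
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical, simplified registry (not the holoscan::CodecRegistry API).
class SketchCodecRegistry {
 public:
  // Type-erased deserializer: raw bytes in, decoded object out.
  using DeserializeFn = std::function<std::any(const std::vector<uint8_t>&)>;

  // Function-local static: one instance per process, created on first use.
  static SketchCodecRegistry& get_instance() {
    static SketchCodecRegistry instance;
    return instance;
  }

  SketchCodecRegistry(const SketchCodecRegistry&) = delete;
  SketchCodecRegistry& operator=(const SketchCodecRegistry&) = delete;

  // Registering under an existing name replaces the previous entry.
  void add_codec(const std::string& name, DeserializeFn fn) { codecs_[name] = std::move(fn); }

  // Look up the codec using the name read from a message header.
  const DeserializeFn* find(const std::string& name) const {
    auto it = codecs_.find(name);
    return it == codecs_.end() ? nullptr : &it->second;
  }

 private:
  SketchCodecRegistry() = default;
  std::unordered_map<std::string, DeserializeFn> codecs_;
};
```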

Tip: CLI arguments (such as --driver, --worker, --fragments) are parsed by the Application (C++/Python) class, and the remaining arguments are available as app.argv (C++/Python).

C++:

A concrete example of using app->argv() in the ping_distributed.cpp example is covered in the section on user-defined command line arguments. If you want to access the arguments before creating the C++ application instance, you can get them through holoscan::Application().argv(). The following example shows how to access the arguments in your application.

```cpp
#include <holoscan/holoscan.hpp>

class MyPingApp : public holoscan::Application {
  // ...
};

int main(int argc, char** argv) {
  auto my_argv =
      holoscan::Application({"myapp", "--driver", "my_arg1", "--address=10.0.0.1"}).argv();
  HOLOSCAN_LOG_INFO(" my_argv: {}", fmt::join(my_argv, " "));

  HOLOSCAN_LOG_INFO(
      "    argv: {} (argc: {}) ",
      fmt::join(std::vector<std::string>(argv, argv + argc), " "),
      argc);

  auto app_argv = holoscan::Application().argv();  // do not use reference ('auto&') here (lifetime issue)
  HOLOSCAN_LOG_INFO("app_argv: {} (size: {})", fmt::join(app_argv, " "), app_argv.size());

  auto app = holoscan::make_application<MyPingApp>();
  HOLOSCAN_LOG_INFO("app->argv() == app_argv: {}", app->argv() == app_argv);

  app->run();

  return 0;
}

// $ ./myapp --driver --input image.dat --address 10.0.0.20
//  my_argv: myapp my_arg1
//     argv: ./myapp --driver --input image.dat --address 10.0.0.20 (argc: 6)
// app_argv: ./myapp --input image.dat (size: 3)
// app->argv() == app_argv: true
```

Please see other examples in the Application unit tests in the Holoscan SDK repository.

Python:

A concrete example of using app.argv in the ping_distributed.py example is covered in the section on user-defined command line arguments. If you want to access the arguments before creating the Python application instance, you can get them through Application().argv. The following example shows how to access the arguments in your application.

```python
import argparse
import sys

from holoscan.core import Application


class MyApp(Application):
    def compose(self):
        pass


def main():
    app = MyApp()  # or alternatively, MyApp([sys.executable, *sys.argv])
    app.run()


if __name__ == "__main__":
    print("sys.argv:", sys.argv)
    app_argv = Application().argv
    print("Application().argv:", app_argv)
    parser = argparse.ArgumentParser()
    parser.add_argument("--input")
    args = parser.parse_args(app_argv[1:])
    print("args:", args)

    main()

# $ python cli_test.py --address 10.0.0.20 --input image.dat
# sys.argv: ['cli_test.py', '--address', '10.0.0.20', '--input', 'image.dat']
# Application().argv: ['cli_test.py', '--input', 'image.dat']
# args: Namespace(input='image.dat')
```

```python
>>> from holoscan.core import Application
>>> import sys
>>> Application().argv == sys.argv
True
>>> Application([]).argv == sys.argv
True
>>> Application([sys.executable, *sys.argv]).argv == sys.argv
True
>>> Application(["python3", "myapp.py", "--driver", "my_arg1", "--address=10.0.0.1"]).argv
['myapp.py', 'my_arg1']
```

Please see other examples in the Application unit tests (TestApplication class) in the Holoscan SDK repository.
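The filtering visible in the outputs above can be approximated as follows. strip_holoscan_args is a hypothetical function, not part of the SDK, and it handles only a subset of the reserved options: --driver and --worker as flags, and --address, --worker-address and --fragments as options assumed to take a value (either as the next argument or after '='). The real Application parser handles more cases.

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Hypothetical illustration only: drop a subset of the distributed-app options
// and keep every other argument (including the program name) in order.
std::vector<std::string> strip_holoscan_args(const std::vector<std::string>& argv) {
  static const std::set<std::string> flag_options = {"--driver", "--worker"};
  static const std::set<std::string> value_options = {"--address", "--worker-address",
                                                      "--fragments"};
  std::vector<std::string> remaining;
  for (size_t i = 0; i < argv.size(); ++i) {
    const std::string& arg = argv[i];
    if (i > 0 && flag_options.count(arg)) { continue; }  // e.g. --driver
    if (i > 0 && value_options.count(arg)) {              // e.g. --address 10.0.0.20
      ++i;                                                // also skip the value
      continue;
    }
    const auto eq = arg.find('=');                        // e.g. --address=10.0.0.1
    if (i > 0 && eq != std::string::npos && value_options.count(arg.substr(0, eq))) {
      continue;
    }
    remaining.push_back(arg);
  }
  return remaining;
}
```

Applied to the command lines from the examples, this yields the same lists as the documented output: "./myapp --input image.dat" and "myapp my_arg1".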

When adding user-defined command line arguments to an application, avoid using any of the default command line argument names, such as --help, --version, --config, --driver, --worker, --address, --worker-address, and --fragments, as covered in the section on running a distributed application. It is recommended to parse user-defined arguments from the application's argv method (C++) or property (Python), as covered in the tip above, instead of using the C++ char* argv[] or Python sys.argv directly. This way, only the new, user-defined arguments need to be parsed.

A concrete example of this for both C++ and Python can be seen in the existing ping_distributed example where an application-defined boolean argument ( --gpu ) is specified in addition to the default set of application arguments.
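Once the Holoscan options have been removed, parsing the user-defined ones can stay very small. The sketch below handles a single --gpu flag; UserOptions and parse_user_args are hypothetical names, and this is not the code from the ping_distributed example. It assumes its input is the application's argv list (program name first, Holoscan options already stripped), and it collects unrecognized arguments rather than rejecting them, leaving that policy to the application.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical user-defined options for an application.
struct UserOptions {
  bool gpu = false;
  std::vector<std::string> unrecognized;
};

// Parse only user-defined options from the already-filtered argument list.
UserOptions parse_user_args(const std::vector<std::string>& app_argv) {
  UserOptions opts;
  for (size_t i = 1; i < app_argv.size(); ++i) {  // index 0 is the program name
    if (app_argv[i] == "--gpu") {
      opts.gpu = true;
    } else {
      opts.unrecognized.push_back(app_argv[i]);
    }
  }
  return opts;
}
```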