Creating a Distributed Application

Distributed applications refer to those where the workflow is divided into multiple fragments that may be run on separate nodes. For example, data might be collected via a sensor at the edge, sent to a separate workstation for processing, and then the processed data could be sent back to the edge node for visualization. Each node would run a single fragment consisting of a computation graph built up of operators. Thus one fragment is the equivalent of a non-distributed application. In the distributed context, the Application initializes the different fragments and then defines the connections between them to build up the full distributed application workflow.

In this section we’ll describe:

Tip

Defining distributed applications is also illustrated in the video_replayer_distributed and ping_distributed examples.

Defining a single Fragment (C++/Python) involves adding operators using make_operator() (C++) or the operator constructor (Python), and defining the connections between them using the add_flow() method (C++/Python) in the compose() method. Thus, defining a Fragment is just like defining a non-distributed Application except that the class should inherit from Fragment instead of Application.

The application will then be defined by initializing fragments within the application’s compose() method. The add_flow() method (C++/Python) can be used to define the connections across fragments.

  • We define the Fragment1 and Fragment2 classes that inherit from the Fragment base class.

  • We define the App class that inherits from the Application base class.

  • The App class initializes any fragments used and defines the connections between them. Here we have used dummy port and operator names in the example add_flow call connecting the fragments since no specific operators are shown in this example.

  • We create an instance of the App class in main() using the make_application() function.

  • The run() method starts the application which will execute its compose() method where the custom workflow will be defined.

Copy
Copied!
            

#include <holoscan/holoscan.hpp> class Fragment1 : public holoscan::Fragment { public: void compose() override { // Define Operators and workflow for Fragment1 // ... } }; class Fragment2 : public holoscan::Fragment { public: void compose() override { // Define Operators and workflow for Fragment2 // ... } }; class App : public holoscan::Application { public: void compose() override { using namespace holoscan; auto fragment1 = make_fragment<Fragment1>("fragment1"); auto fragment2 = make_fragment<Fragment2>("fragment2"); // Define the workflow: replayer -> holoviz add_flow(fragment1, fragment2, {{"fragment1_operator_name.output_port_name", "fragment2_operator_name.input_port_name"}}); } }; int main() { auto app = holoscan::make_application<App>(); app->run(); return 0; }

  • We define the Fragment1 and Fragment2 classes that inherit from the Fragment base class.

  • We define the App class that inherits from the Application base class.

  • The App class initializes any fragments used and defines the connections between them. Here we have used dummy port and operator names in the example add_flow call connecting the fragments since no specific operators are shown in this example.

  • We create an instance of the App class in __main__.

  • The run() method starts the application which will execute its compose() method where the custom workflow will be defined.

Copy
Copied!
            

from holoscan.core import Application, Fragment class Fragment1(Fragment): def compose(self): # Define Operators and workflow # ... class Fragment2(Fragment): def compose(self): # Define Operators and workflow # ... class App(Application): def compose(self): fragment1 = Fragment1(self, name="fragment1") fragment2 = Fragment2(self, name="fragment2") self.add_flow(fragment1, fragment2, {("fragment1_operator_name.output_port_name", "fragment2_operator_name.input_port_name")}) if __name__ == "__main__": app = App() app.run()

Serialization of Custom Data Types for Distributed Applications

Transmission of data between fragments of a multi-fragment application is done via the Unified Communications X (UCX) library. In order to transmit data, it must be serialized into a binary form suitable for transmission over a network. For Tensors ({ref}C++/Python), strings and various scalar and vector numeric types, serialization is already built in. For more details on concrete examples of how to extend the data serialization support to additional user-defined classes, see the separate page on serialization.

Building a distributed application works in the same way as for a non-distributed one. See Building and running your Application

Python applications do not require building. See Building and running your Application.

Running an application in a distributed setting requires launching the application binary on all nodes involved in the distributed application. A single node must be selected to act as the application driver. This is achieved by using the --driver command-line option. Worker nodes are initiated by launching the application with the --worker command-line option. It’s possible for the driver node to also serve as a worker if both options are specified.

The address of the driver node must be specified for each process (both the driver and worker(s)) to identify the appropriate network interface for communication. This can be done via the --address command-line option, which takes a value in the form of [<IPv4 address or hostname>][:<port>] (e.g., --address 192.168.50.68:10000):

  • The driver’s IP (or hostname) MUST be set for each process (driver and worker(s)) when running distributed applications on multiple nodes (default: 0.0.0.0). It can be set without the port (e.g., --address 192.168.50.68).

  • In a single-node application, the driver’s IP (or hostname) can be omitted, allowing any network interface (0.0.0.0) to be selected by the UCX library.

  • The port is always optional (default: 8765). It can be set without the IP (e.g., --address :10000).

The worker node’s address can be defined using the --worker-address command-line option ([<IPv4 address or hostname>][:<port>]). If it’s not specified, the application worker will default to the host address (0.0.0.0) and select an available port randomly from the range 10000 to 32767.

The --fragments command-line option is used in combination with --worker to specify a comma-separated list of fragment names to be run by a worker. If not specified, the application driver will assign a single fragment to the worker. To indicate that a worker should run all fragments, you can specify --fragments all.

The --config command-line option can be used to designate a path to a configuration file to be used by the application.

Below is an example launching a three fragment application named my_app on two separate nodes:

  • The application driver is launched at 192.168.50.68:10000 on the first node (A), with a worker running two fragments, “fragment1” and “fragment3”.

  • On a separate node (B), the application launches a worker for “fragment2”, which will connect to the driver at the address above.

Copy
Copied!
            

# Node A my_app --driver --worker --address 192.168.50.68:10000 --fragments fragment1,fragment3 # Node B my_app --worker --address 192.168.50.68:10000 --fragments fragment2

Copy
Copied!
            

# Node A python3 my_app.py --driver --worker --address 192.168.50.68:10000 --fragments fragment1,fragment3 # Node B python3 my_app.py --worker --address 192.168.50.68:10000 --fragments fragment2

Note

UCX Network Interface Selection

UCX is used in the Holoscan SDK for communication across fragments in distributed applications. It is designed to select the best network device based on performance characteristics (bandwidth, latency, NUMA locality, etc). In some scenarios (under investigation) UCX cannot find the correct network interface to use, and the application fails to run. In this case, you can manually specify the network interface to use by setting the UCX_NET_DEVICES environment variable.

For example, if the user wants to use the network interface eth0, you can set the environment variable as follows, before running the application:

Copy
Copied!
            

export UCX_NET_DEVICES=eth0

Or, if you are running a packaged distributed application with the Holoscan CLI, use the --nic eth0 option to manually specify the network interface to use.

The available network interface names can be found by running the following command:

Copy
Copied!
            

ucx_info -d | grep Device: | awk '{print $3}' | sort | uniq # or ip -o -4 addr show | awk '{print $2, $4}' # to show interface name and IP

Warning

Known limitations

The following are known limitations of the distributed application support in the SDK, which will be addressed in future updates:

1. The driver calls the compose() method of the fragments.

Although the driver doesn’t execute fragments, it still invokes the compose() method of the fragments to determine the number of connections between them.

2. A connection error message is displayed even when the distributed application is running correctly.

The message Connection dropped with status -25 (Connection reset by remote peer) appears in the console even when the application is functioning properly. This is a known issue and will be addressed in future updates, ensuring that this message will only be displayed in the event of an actual connection error.

3. An operator in one fragment cannot have output port(s) connected to the multiple input ports of the operator(s) in another fragment.

The distributed application will not function if there are multiple input/output port connections between two operators in different fragments, as illustrated in the figure below.

distributed_app_workflows.png

Fig. 15 Illustration of a non-working scenario: Multiple input/output ports are connected between two operators in different fragments.

The distributed application test cases shows examples of working and non-working scenarios. (ForwardedTwoMultiInputsOutputsFragmentsApp shows the workaround for this limitation.)

4. GPU tensors can only currently be sent/received by UCX from device 0.

Because the device ID associated with the network context is currently hardcoded in the executor code, GPU tensors can only be sent/received between fragments from device 0.

Tip

Given a CMake project, a pre-built executable, or a python application, you can also use the Holoscan CLI to package and run your Holoscan application in a OCI-compliant container image.

Environment Variables for Distributed Applications

Holoscan SDK environment variables.

Environment variables can be set to override the default behavior of the scheduler used when running a distributed application.

  • HOLOSCAN_DISTRIBUTED_APP_SCHEDULER : controls which scheduler is used for distributed applications. It can be set to either greedy or multithread. If unspecified, the default scheduler is greedy.

  • HOLOSCAN_STOP_ON_DEADLOCK : can be used in combination with HOLOSCAN_DISTRIBUTED_APP_SCHEDULER to control whether or not the application will automatically stop on deadlock. Values of “True”, “1” or “ON” will be interpreted as true (enable stop on deadlock). It is true if unspecified. This environment variable is only used when HOLOSCAN_DISTRIBUTED_APP_SCHEDULER is explicitly set.

  • HOLOSCAN_STOP_ON_DEADLOCK_TIMEOUT : controls the delay (in ms) without activity required before an application is considered to be in deadlock. It must be an integer value (units are ms).

  • HOLOSCAN_MAX_DURATION_MS : sets the application to automatically terminate after the requested maximum duration (in ms) has elapsed. It must be an integer value (units are ms). This environment variable is only used when HOLOSCAN_DISTRIBUTED_APP_SCHEDULER is explicitly set.

  • HOLOSCAN_CHECK_RECESSION_PERIOD_MS : controls how long (in ms) the scheduler waits before re-checking the status of operators in an application. It must be a floating point value (units are ms). This environment variable is only used when HOLOSCAN_DISTRIBUTED_APP_SCHEDULER is explicitly set.

  • HOLOSCAN_UCX_SERIALIZATION_BUFFER_SIZE : can be used to override the default 7 kB serialization buffer size. This should typically not be needed as tensor types store only a small header in this buffer to avoid explicitly making a copy of their data. However, other data types do get directly copied to the serialization buffer and in some cases it may be necessary to increase it.

UCX-specific environment variables

Transmission of data between fragments of a multi-fragment application is done via the Unified Communications X (UCX) library, a point-to-point communication framework designed to utilize the best available hardware resources (shared memory, TCP, GPUDirect RDMA, etc). UCX has many parameters that can be controlled via environment variables. A few that are particularly relevant to Holoscan SDK distributed applications are listed below:

  • The UCX_TLS environment variable can be used to control which transport layers are enabled. By default, UCX_TLS=all and UCX will attempt to choose the optimal transport layer automatically.

  • The UCX_NET_DEVICES environment variable is by default set to all meaning that UCX may choose to use any available network interface controller (NIC). In some cases it may be necessary to restrict UCX to a specific device or set of devices, which can be done by setting UCX_NET_DEVICES to a comma separated list of the device names (i.e. as obtained by linux command ifconfig -a or ip link show).

  • Setting UCX_TCP_CM_REUSEADDR=y is recommended to enable ports to be reused without having to wait the full socket TIME_WAIT period after a socket is closed.

  • The UCX_LOG_LEVEL environment variable can be used to control the logging level of UCX. The default is setting is WARN, but changing to a lower level such as INFO will provide more verbose output on which transports and devices are being used.

  • By default, Holoscan SDK will automatically set UCX_PROTO_ENABLE=y upon application launch to enable the newer “v2” UCX protocols. If for some reason, the older v1 protocols are needed, one can set UCX_PROTO_ENABLE=n in the environment to override this setting. When the v2 protocols are enabled, one can optionally set UCX_PROTO_INFO=y to enable detailed logging of what protocols are being used at runtime.

Tip

A list of all available UCX environment variables and a brief description of each can be obtained by running ucx_info -f from the Holoscan SDK container. Holoscan SDK uses UCX’s active message (AM) protocols, so environment variables related to other protocols such as tag-mat

Distributed applications must serialize any objects that are to be sent between the fragments of a multi-fragment application. Serialization involves binary serialization to a buffer that will be sent from one fragment to another via the Unified Communications X (UCX) library. For tensor types (e.g. holoscan::Tensor), no actual copy is made, but instead transmission is done directly from the original tensor’s data and only a small amount of header information is copied to the serialization buffer.

A table of the types that have codecs pre-registered so that they can be serialized between fragments using Holoscan SDK is given below.

Type Class

Specific Types

integers int8_t, int16_t, int32_t, int64_t, uint8_t, uint16_t, uint32_t, uint64_t
floating point float, double, complex , complex
boolean bool
strings std::string
std::vector T is std::string or any of the boolean, integer or floating point types above
std::vector> T is std::string or any of the boolean, integer or floating point types above
std::vector a vector of InputSpec objects that are specific to HolovizOp
std::shared_ptr<%> T is any of the scalar, vector or std::string types above
tensor types holoscan::Tensor, nvidia::gxf::Tensor, nvidia::gxf::VideoBuffer, nvidia::gxf::AudioBuffer
GXF-specific types nvidia::gxf::TimeStamp, nvidia::gxf::EndOfStream

Python

For the Python API, any array-like object supporting the DLPack interface, __array_interface__ or __cuda_array_interface__ will be transmitted using Tensor serialization. This is done to avoid data copies for performance reasons. Objects of type list[holoscan.HolovizOp.InputSpec] will be sent using the underlying C++ serializer for std::vector<HolovizOp::InputSpec>. All other Python objects will be serialized to/from a std::string using the cloudpickle library. One restriction imposed by the use of cloudpickle is that all fragments in a distributed application must be running the same Python version.

C++

For any additional C++ classes that need to be serialized for transmission between fragments in a distributed application, the user must create their own codec and register it with the Holoscan SDK framework. As a concrete example, suppose that we had the following simple Coordinate class that we wish to send between fragments.

Copy
Copied!
            

struct Coordinate { float x; float y; float z; };

To create a codec capable of serializing and deserializing this type one should define a holoscan::codec class for it as shown below.

Copy
Copied!
            

#include "holoscan/core/codec_registry.hpp" #include "holoscan/core/errors.hpp" #include "holoscan/core/expected.hpp" namespace holoscan { template <> struct codec<Coordinate> { static expected<size_t, RuntimeError> serialize(const Coordinate& value, Endpoint* endpoint) { return serialize_trivial_type<Coordinate>(value, endpoint); } static expected<Coordinate, RuntimeError> deserialize(Endpoint* endpoint) { return deserialize_trivial_type<Coordinate>(endpoint); } }; } // namespace holoscan

where the first argument to serialize is a const reference to the type to be serialized and the return value is an expected containing the number of bytes that were serialized. The deserialize method returns an expected containing the deserialized object. The Endpoint class is a base class representing the serialization endpoint (For distributed applications, the actual endpoint class used is UcxSerializationBuffer).

The helper functions serialize_trivial_type (deserialize_trivial_type) can be used to serialize (deserialize) any plain-old-data (POD) type. Specifically, POD types can be serialized by just copying sizeof(Type) bytes to/from the endpoint. The read_trivial_type() and ~holoscan::Endpoint::write_trivial_type methods could be used directly instead.

Copy
Copied!
            

template <> struct codec<Coordinate> { static expected<size_t, RuntimeError> serialize(const Coordinate& value, Endpoint* endpoint) { return endpoint->write_trivial_type(&value); } static expected<Coordinate, RuntimeError> deserialize(Endpoint* endpoint) { Coordinate encoded; auto maybe_value = endpoint->read_trivial_type(&encoded); if (!maybe_value) { return forward_error(maybe_value); } return encoded; } };

In practice, one would not actually need to define codec<Coordinate> at all since Coordinate is a trivially serializable type and the existing codec treats any types for which there is not a template specialization as a trivially serializable type. It is, however, still necessary to register the codec type with the CodecRegistry as described below.

For non-trivial types, one will likely also need to use the read() and write() methods to implement the codec. Example use of these for the built-in codecs can be found in holoscan/core/codecs.hpp.

Once such a codec has been defined, the remaining step is to register it with the static CodecRegistry class. This will make the UCX-based classes used by distributed applications aware of the existence of a codec for serialization of this object type. If the type is specific to a particular operator, then one can register it via the register_codec() class.

Copy
Copied!
            

#include "holoscan/core/codec_registry.hpp" namespace holoscan::ops { void MyCoordinateOperator::initialize() { register_codec<Coordinate>("Coordinate"); // ... // parent class initialize() call must be after the argument additions above Operator::initialize(); } } // namespace holoscan::ops

Here, the argument provided to register_codec is the name the registry will use for the codec. This name will be serialized in the message header so that the deserializer knows which deserialization function to use on the received data. In this example, we chose a name that matches the class name, but that is not a requirement. If the name matches one that is already present in the CodecRegistry class, then any existing codec under that name will be replaced by the newly registered one.

It is also possible to directly register the type outside of the context of initialize() by directly retrieving the static instance of the codec registry as follows.

Copy
Copied!
            

namespace holoscan { CodecRegistry::get_instance().add_codec<Coordinate>("Coordinate"); } // namespace holoscan

Previous Creating an Application
Next Packaging Holoscan Applications
© Copyright 2022-2023, NVIDIA. Last updated on Feb 9, 2024.