This document provides a DOCA Allreduce collective operation implementation on top of NVIDIA® BlueField® DPU using UCX.
Allreduce is a collective operation which allows collecting data from different processing units to combine them into a global result by a chosen operator. In turn, the result is distributed back to all processing units.
Allreduce operates in stages. First, each participant scatters its vector. Second, each participant gathers the vectors of the other participants. Finally, each participant applies the chosen operation across all the gathered vectors. By chaining allreduce operations over different sets of participants, very complex computations can be spread across many computation units.
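The stages above can be illustrated without any networking. The following is a minimal in-memory sketch, not the application's code: the names reduce_op, reduce_into, and allreduce_in_memory are hypothetical, and all participants are assumed to live in one process with their vectors stored row by row.

```c
#include <stddef.h>

/* Operators named after the application's -o options ("sum", "prod"). */
enum reduce_op { OP_SUM, OP_PROD };

/* Combine src into dst element by element using the chosen operator. */
void reduce_into(double *dst, const double *src, size_t len, enum reduce_op op)
{
    for (size_t i = 0; i < len; i++)
        dst[i] = (op == OP_SUM) ? dst[i] + src[i] : dst[i] * src[i];
}

/*
 * Hypothetical in-memory allreduce over n participants. Each participant
 * owns one row of len elements in vectors (n * len doubles in total).
 * scratch must hold len doubles. On return, every row holds the result.
 */
void allreduce_in_memory(double *vectors, size_t n, size_t len,
                         enum reduce_op op, double *scratch)
{
    if (n == 0)
        return;

    /* Gather: start from participant 0 and fold in every other vector. */
    for (size_t i = 0; i < len; i++)
        scratch[i] = vectors[i];
    for (size_t p = 1; p < n; p++)
        reduce_into(scratch, vectors + p * len, len, op);

    /* Distribute: every participant ends up with the same global result. */
    for (size_t p = 0; p < n; p++)
        for (size_t i = 0; i < len; i++)
            vectors[p * len + i] = scratch[i];
}
```

With three participants holding {1, 2}, {3, 4}, and {5, 6} and the sum operator, every participant ends with {9, 12}.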
Allreduce is widely used by parallel applications in high-performance computing (HPC) related to scientific simulations and data analysis, including machine learning calculation and the training phase of neural networks in deep learning.
Due to the massive growth of deep learning models and the complexity of scientific simulation tasks that utilize a network, effective implementation of allreduce is essential for minimizing communication time.
This document describes how to implement allreduce using the UCX communication framework, which leverages NVIDIA® BlueField® DPU by providing low-latency and high-bandwidth utilization of its network engine.
This document describes the following types of allreduce clients:
- Offloaded client – processes running on the host that only submit allreduce operation requests to a daemon running on the DPU. The daemon performs the allreduce algorithm on behalf of its on-host clients (offloaded clients).
- Non-offloaded client – processes running on the host that execute the allreduce algorithm themselves.
The application is designed to measure three metrics:
- Communication time taken by offloaded and non-offloaded allreduce operations
- Computation time taken by matrix multiplications which are done by clients until the allreduce operation is completed
- The overlap of the two previous metrics: the percentage of the total runtime during which the allreduce and the matrix multiplications ran in parallel.
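One plausible way to compute the overlap metric is sketched below. This is an assumption about the formula, not the application's implementation: the struct window type and the overlap_percent function are hypothetical, and each activity is assumed to occupy a single contiguous time window.

```c
/* A time window in seconds: [start, end]. Hypothetical type for illustration. */
struct window {
    double start;
    double end;
};

/*
 * Percentage of the total runtime during which communication and
 * computation were both in progress. The total runtime is taken as the
 * span from the earliest start to the latest end of the two windows.
 */
double overlap_percent(struct window comm, struct window comp)
{
    double both_start = comm.start > comp.start ? comm.start : comp.start;
    double both_end = comm.end < comp.end ? comm.end : comp.end;
    double first = comm.start < comp.start ? comm.start : comp.start;
    double last = comm.end > comp.end ? comm.end : comp.end;

    double both = both_end > both_start ? both_end - both_start : 0.0;
    double total = last - first;

    return total > 0.0 ? 100.0 * both / total : 0.0;
}
```

For example, communication running over [0, 10] seconds and computation over [2, 8] seconds overlap for 6 of 10 seconds, which gives 60 percent.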
The allreduce implementation is divided into two different types of processes: clients and daemons. Clients are responsible for allocating vectors filled with data and initiating allreduce operations by sending a request with a vector to their daemon. Daemons are responsible for gathering vectors from all connected clients and daemons, applying a chosen operator on all received buffers, and then scattering the reduced result vector back to the clients.
[Diagrams: offloaded mode; non-offloaded mode]
DOCA's allreduce implementation uses Unified Communication X (UCX) to support data exchange between endpoints. It utilizes UCX's sockaddr-based connection establishment and the UCX Active Messages (AM) API for communications.
[Diagrams: offloaded mode; non-offloaded mode]
- Connections between processes are established by UCX using IP addresses and ports of peers.
- Allreduce vectors are sent from clients to daemons in offloaded mode, or from clients to clients in non-offloaded mode.
- Reduce operations on vectors are done using received vectors from other daemons in offloaded mode, or other clients in non-offloaded mode.
- Vectors with allreduce results are received by clients from daemons in offloaded mode, or are already stored in clients after completing all exchanges in non-offloaded mode.
- After completing all allreduce operations, connections between clients are destroyed.
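The offloaded flow described above is a two-level reduction: each daemon reduces the vectors of its own clients, daemons exchange their local results, and the global result goes back to the clients. The sketch below simulates this in memory for the sum operator. The function name and the fixed sizes are hypothetical, and the exchange between daemons is collapsed into a single loop rather than real message passing.

```c
#include <stddef.h>

/* Illustrative sizes only; the real application takes these from its flags. */
#define NUM_DAEMONS 2
#define CLIENTS_PER_DAEMON 2
#define VEC_LEN 3

/* Hypothetical simulation of offloaded allreduce with the "sum" operator. */
void offloaded_allreduce_sum(float clients[NUM_DAEMONS][CLIENTS_PER_DAEMON][VEC_LEN])
{
    float partial[NUM_DAEMONS][VEC_LEN] = {{0}};
    float global[VEC_LEN] = {0};

    /* Step 1: each daemon reduces the vectors submitted by its own clients. */
    for (size_t d = 0; d < NUM_DAEMONS; d++)
        for (size_t c = 0; c < CLIENTS_PER_DAEMON; c++)
            for (size_t i = 0; i < VEC_LEN; i++)
                partial[d][i] += clients[d][c][i];

    /* Step 2: daemons exchange local results and reduce them again.
     * In a real deployment every daemon would do this independently. */
    for (size_t d = 0; d < NUM_DAEMONS; d++)
        for (size_t i = 0; i < VEC_LEN; i++)
            global[i] += partial[d][i];

    /* Step 3: each daemon returns the global result to its clients. */
    for (size_t d = 0; d < NUM_DAEMONS; d++)
        for (size_t c = 0; c < CLIENTS_PER_DAEMON; c++)
            for (size_t i = 0; i < VEC_LEN; i++)
                clients[d][c][i] = global[i];
}
```

In non-offloaded mode the first and third steps disappear, since the clients exchange and reduce vectors among themselves.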
- Parse application arguments.
doca_argp_init();
- Initialize arg parser resources and register DOCA general parameters.
register_allreduce_params();
- Register UCX allreduce application parameters.
doca_argp_start();
- Parse all registered parameters.
- UCX initialization.
allreduce_ucx_init();
- Initialize hash table of connections.
ucp_init();
- Create UCP context.
ucp_worker_create();
- Create UCP worker.
ucp_worker_set_am_recv_handler();
- Set AM handler for receiving connection check packets.
- Initialization of the allreduce connectivity.
communication_init();
- Initialize hash table of allreduce super requests.
- Set "receive callback" for handshake messages.
- If daemon or non-offloaded client:
allreduce_ucx_am_set_recv_handler();
- Set AM handler for receiving allreduce requests from clients.
allreduce_ucx_listen();
- Initialize UCX listening function. This creates a UCP listener.
connections_init();
- Initialize all connections.
- Go over all destination addresses.
- Connect to each peer. Repeat until a successful send occurs (to check connectivity).
ucp_am_send_nbx(); allreduce_ucx_request_wait();
- Insert the connection to the hash table of connections.
allreduce_outgoing_handshake();
- Scatter handshake message to peers/daemon to make sure they all have the same -s, -i, -b, and -d flags.
- Daemon: Start UCX progress.
daemon_run(); allreduce_ucx_am_set_recv_handler();
- Set AM handler to receive allreduce requests from clients.
while (running) allreduce_ucx_progress();
- Perform UCP worker progress.
- Invoke callbacks for incoming messages by calling allreduce_ucx_progress.
- Client.
client_run(); allreduce_vectors_init();
- Allocate buffers to store allreduce initial data and results.
allreduce_ucx_am_set_recv_handler();
- Set an AM handler for receiving allreduce results.
allreduce_barrier();
- Perform allreduce barrier. Check that all daemons and clients are active.
- Submit a batch of allreduce operations with 0 bytes.
- Wait for completions.
allreduce_metrics_init();
- Reset metrics and vectors.
- Submit some batches and calculate estimated network time.
- Allocate matrices to multiply.
- Estimate how many matrix multiplications could have been performed instead of networking (same time windows).
- Calculate the actual computation time of these matrix multiplications.
Do num-batches (flag) times: allreduce_vectors_reset(); allreduce_batch_submit(); cpu_exploit(); allreduce_batch_wait(); allreduce_metrics_calculate();
- Reset vectors.
- Submit a batch of allreduce operations to daemon/peer (depends on mode).
- Perform matrix multiplications during a time period which is approximately equal to doing a single batch of allreduce operations and calculate the actual time cost.
- Wait for the allreduce operation to complete and calculate time cost.
- Update metrics.
allreduce_metrics_print();
- Print summary of allreduce benchmarking.
- Arg parser destroy.
doca_argp_destroy();
- Communication destroy.
- Clean up connections.
allreduce_ucx_disconnect();
- Remove the connection from the hash table of the connections.
ucp_ep_close_nbx();
- Close inner UCP endpoint.
- Wait for the completion of the UCP endpoint closure.
- Destroy connection.
- Free connections array.
- Destroy the hash table of the allreduce super requests.
- Destroy UCX context.
g_hash_table_destroy();
- Destroy the hash table of the connections.
ucp_listener_destroy();
- If the UCP listener was created, destroy it.
ucp_worker_destroy();
- Destroy UCP worker.
ucp_cleanup();
- Destroy UCP context.
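The handshake step in the flow above verifies that all participants were started with the same -s, -i, -b, and -d values. A minimal sketch of such a check follows. The struct handshake_params layout and the handshake_matches function are hypothetical and do not reflect the application's actual message format.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical handshake payload; field names are illustrative only. */
struct handshake_params {
    size_t vector_size;  /* -s */
    size_t num_batches;  /* -i */
    size_t batch_size;   /* -b */
    char datatype[8];    /* -d: "byte", "int", "float", or "double" */
};

/* Return true when the remote parameters agree with the local ones. */
bool handshake_matches(const struct handshake_params *local,
                       const struct handshake_params *remote)
{
    return local->vector_size == remote->vector_size &&
           local->num_batches == remote->num_batches &&
           local->batch_size == remote->batch_size &&
           strncmp(local->datatype, remote->datatype,
                   sizeof(local->datatype)) == 0;
}
```

A receiver could run a check like this in its handshake callback and reject or report a peer whose parameters differ, since mismatched vector sizes or datatypes would make the reduction meaningless.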
- Refer to the following documents:
- NVIDIA DOCA Installation Guide for details on how to install BlueField-related software.
- NVIDIA DOCA Troubleshooting Guide for any issue you may encounter with the installation, compilation, or execution of DOCA applications.
- The allreduce binary is located under /opt/mellanox/doca/applications/allreduce/bin/doca_allreduce. To build all the applications together, run:
cd /opt/mellanox/doca/applications/
meson build
ninja -C build
- To build the allreduce sample only:
- Edit the following flags in /opt/mellanox/doca/applications/meson_options.txt:
- Set enable_all_applications to false
- Set enable_allreduce to true
- Run the build commands from the previous step (meson build, then ninja -C build).
Note:
The doca_allreduce application is created under ./build/allreduce/src/.
Application usage:
Usage: doca_allreduce [DOCA Flags] [Program Flags]

DOCA Flags:
  -h, --help                        Print a help synopsis
  -v, --version                     Print program version information
  -l, --log-level                   Set the log level for the program <CRITICAL=0, DEBUG=4>

Program Flags:
  -r, --role                        Run DOCA UCX allreduce process as: "client" or "daemon"
  -m, --mode <allreduce_mode>       Set allreduce mode: "offloaded", "non-offloaded" (valid for client only)
  -p, --port <port>                 Set default destination port of daemons/clients, used for IPs without a port (see '-a' flag)
  -c, --num-clients <num_clients>   Set the number of clients which participate in allreduce operations (valid for daemon only)
  -s, --size <size>                 Set size of vector to do allreduce for
  -d, --datatype <datatype>         Set datatype ("byte", "int", "float", "double") of vector elements to do allreduce for
  -o, --operation <operation>       Set operation ("sum", "prod") to do between allreduce vectors
  -b, --batch-size <batch_size>     Set the number of allreduce operations submitted simultaneously (used for handshakes by daemons)
  -i, --num-batches <num_batches>   Set the number of batches of allreduce operations (used for handshakes by daemons)
  -t, --listen-port <listen_port>   Set listening port of daemon or client
  -a, --address <ip address>        Set comma-separated list of destination IPv4/IPv6 addresses and ports optionally (<ip_addr>:[<port>]) of daemons or clients
- Running the application on BlueField:
- All daemons should be deployed before clients. Only after connecting to their peers are they able to handle clients.
- Pre-run setup:
UCX probes the system for any available net/IB devices and, by default, tries to create a multi-device connection. This means that if some network devices are available but provide an unreachable path from the daemon to the peer/client, UCX may still use that path. A common case is that a daemon tries to connect to a different BlueField using tmfifo_net0, which is connected to the host only. To fix this issue, follow these steps:
- Use the UCX env variable UCX_NET_DEVICES to set usable devices. For example:
export UCX_NET_DEVICES=enp3s0f0s0,enp3s0f1s0
/opt/mellanox/doca/applications/allreduce/bin/doca_allreduce -r daemon -t 34001 -c 1 -s 100 -o sum -d float
Or:
env UCX_NET_DEVICES=enp3s0f0s0,enp3s0f1s0 /opt/mellanox/doca/applications/allreduce/bin/doca_allreduce -r daemon -t 34001 -c 1 -s 100 -o sum -d float
- Get the mlx device name and port of an SF to limit the UCX network interfaces and allow IB. For example:
BlueField> show_gids
DEV     PORT  INDEX  GID                                      IPv4          VER  DEV
---     ----  -----  ---                                      ------------  ---  ---
mlx5_2  1     0      fe80:0000:0000:0000:0052:72ff:fe63:1651                v2   enp3s0f0s0
mlx5_3  1     0      fe80:0000:0000:0000:0032:6bff:fe13:f13a                v2   enp3s0f1s0

BlueField> UCX_NET_DEVICES=enp3s0f0s0,enp3s0f1s0,mlx5_2:1,mlx5_3:1 /opt/mellanox/doca/applications/allreduce/bin/doca_allreduce -r daemon -t 34001 -c 1 -s 100 -o sum -d float
- CLI example for running a client:
/opt/mellanox/doca/applications/allreduce/bin/doca_allreduce -r client -m offloaded -t 34001 -a 10.21.211.3:35001 -s 65535 -i 16 -b 128 -o sum -d float
Note:
The flags -s, -i, -b, and -d must be the same for all clients and daemons participating in the allreduce operation.
- CLI example for running a daemon:
/opt/mellanox/doca/applications/allreduce/bin/doca_allreduce -r daemon -t 34001 -c 2 -a 10.21.211.3:35001,10.21.211.4:36001 -s 65535 -o sum -d float
Note:
The flag -a is necessary for communicating with other daemons. In case of an offloaded client, the address must be that of the daemon which performs the allreduce operations for them. In case of a daemon or non-offloaded clients, the flag could be a single or multiple addresses of other daemons/non-offloaded clients which exchange their local allreduce results.
Note:
The flag -c must be specified for daemon processes only. It indicates how many clients submit their allreduce operations to the daemon.
Note:
The daemon listens to incoming connection requests on all available IPs, but the actual communication after the initial "UCX handshake" does not necessarily use the same device used for the connection establishment.
- Running the application on the host, CLI example:
/opt/mellanox/doca/applications/allreduce/bin/doca_allreduce -r client -m non-offloaded -t 34001 -a 10.21.211.3:35001,10.21.211.4:36001 -s 65535 -i 16 -b 128 -o sum -d float
/opt/mellanox/doca/applications/allreduce/bin/doca_allreduce -r client -m offloaded -p 34001 -a 192.168.100.2 -s 65535 -i 16 -b 128 -o sum -d float
Note:
Refer to section "Running DOCA Application on Host" in NVIDIA DOCA Virtual Functions User Guide.
Refer to NVIDIA DOCA Arg Parser User Guide for more information.
| Flag Type | Short Flag | Long Flag/JSON Key | Description |
|---|---|---|---|
| General Flags | l | log-level | Set the log level for the application: CRITICAL=0, DEBUG=4 |
| | v | version | Print program version information |
| | h | help | Print a help synopsis |
| Program Flags | r | role | Run DOCA UCX allreduce process as either client or daemon |
| | m | mode | Set allreduce mode. Available options: offloaded, non-offloaded. Valid for client only. |
| | p | port | Set default destination port of daemons/clients. Used for IPs without a port (see -a flag). |
| | c | num-clients | Set the number of clients which participate in allreduce operations. Note: Valid for daemon only. |
| | s | size | Set size of vector to perform allreduce for |
| | d | datatype | Set datatype of vector elements to do allreduce for: byte, int, float, double |
| | o | operation | Set operation to perform between allreduce vectors: sum, prod |
| | b | batch-size | Set the number of allreduce operations submitted simultaneously. Used for handshakes by daemons. |
| | i | num-batches | Set the number of batches of allreduce operations. Used for handshakes by daemons. |
| | t | listen-port | Set listening port of daemon or client |
| | a | address | Set comma-separated list of destination IPv4/IPv6 addresses and, optionally, ports of daemons or clients. Format: <ip_addr>:[<port>]. |
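To illustrate how the -a and -p flags interact, the sketch below splits a comma-separated address list and applies a default port to entries that omit one. It is not the application's parser: struct peer_addr and parse_peers are hypothetical names, and the sketch assumes IPv4 addresses only, because IPv6 addresses contain colons and would need additional handling.

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical parsed peer entry. */
struct peer_addr {
    char ip[64];
    unsigned port;
};

/*
 * Parse "<ip>[:<port>],<ip>[:<port>],..." into out. Entries without a port
 * get default_port. Returns the number of peers, or -1 on malformed input
 * or when more than max_out entries are present. IPv4 only (assumption).
 */
int parse_peers(const char *list, unsigned default_port,
                struct peer_addr *out, int max_out)
{
    int count = 0;
    const char *p = list;

    while (*p != '\0') {
        size_t tok_len = strcspn(p, ",");
        const char *colon = memchr(p, ':', tok_len);
        size_t ip_len = colon ? (size_t)(colon - p) : tok_len;

        if (count >= max_out || ip_len == 0 || ip_len >= sizeof(out[count].ip))
            return -1;
        memcpy(out[count].ip, p, ip_len);
        out[count].ip[ip_len] = '\0';
        out[count].port = default_port;

        /* A port is present only if something follows the colon. */
        if (colon != NULL && ip_len + 1 < tok_len) {
            char *end;
            unsigned long port = strtoul(colon + 1, &end, 10);

            if (end != p + tok_len || port == 0 || port > 65535)
                return -1;
            out[count].port = (unsigned)port;
        }

        count++;
        p += tok_len;
        if (*p == ',')
            p++;
    }
    return count;
}
```

Parsing "10.21.211.3:35001,192.168.100.2" with a default port of 34001 yields two peers, the second of which uses port 34001.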
- /opt/mellanox/doca/applications/allreduce/src/allreduce.c
- /opt/mellanox/doca/applications/allreduce/src/allreduce_client.c
- /opt/mellanox/doca/applications/allreduce/src/allreduce_client.h
- /opt/mellanox/doca/applications/allreduce/src/allreduce_core.c
- /opt/mellanox/doca/applications/allreduce/src/allreduce_core.h
- /opt/mellanox/doca/applications/allreduce/src/allreduce_daemon.c
- /opt/mellanox/doca/applications/allreduce/src/allreduce_daemon.h
- /opt/mellanox/doca/applications/allreduce/src/allreduce_ucx.c
- /opt/mellanox/doca/applications/allreduce/src/allreduce_ucx.h
- /opt/mellanox/doca/applications/allreduce/bin/allreduce_client_params.json
- /opt/mellanox/doca/applications/allreduce/bin/allreduce_daemon_params.json
