Examples
Source code for the examples described in this section is available in the examples folder of the NVSHMEM package.
Attribute-Based Initialization Example
The following code shows an MPI version of the simple shift program that was explained in The NVSHMEM Programming Model. It shows the use of the NVSHMEM attribute-based initialization API where the MPI communicator can be used to set up NVSHMEM.
#include <stdio.h>
#include "mpi.h"
#include "nvshmem.h"
#include "nvshmemx.h"
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] CUDA failed with %s \n", \
__FILE__, __LINE__, cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
__global__ void simple_shift(int *destination) {
int mype = nvshmem_my_pe();
int npes = nvshmem_n_pes();
int peer = (mype + 1) % npes;
nvshmem_int_p(destination, mype, peer);
}
int main (int argc, char *argv[]) {
int mype_node, msg;
cudaStream_t stream;
int rank, nranks;
MPI_Comm mpi_comm = MPI_COMM_WORLD;
nvshmemx_init_attr_t attr = NVSHMEMX_INIT_ATTR_INITIALIZER;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
attr.mpi_comm = &mpi_comm;
nvshmemx_init_attr(NVSHMEMX_INIT_WITH_MPI_COMM, &attr);
mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
CUDA_CHECK(cudaSetDevice(mype_node));
CUDA_CHECK(cudaStreamCreate(&stream));
int *destination = (int *) nvshmem_malloc (sizeof(int));
simple_shift<<<1, 1, 0, stream>>>(destination);
nvshmemx_barrier_all_on_stream(stream);
CUDA_CHECK(cudaMemcpyAsync(&msg, destination, sizeof(int),
cudaMemcpyDeviceToHost, stream));
CUDA_CHECK(cudaStreamSynchronize(stream));
printf("%d: received message %d\n", nvshmem_my_pe(), msg);
nvshmem_free(destination);
nvshmem_finalize();
MPI_Finalize();
return 0;
}
The following code shows a Unique ID version of the simple shift program that was explained in The NVSHMEM Programming Model. It shows the use of the NVSHMEM attribute-based initialization API where the Unique ID arguments can be used to set up NVSHMEM.
#include <stdio.h>
#include "mpi.h"
#include "nvshmem.h"
#include "nvshmemx.h"
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] CUDA failed with %s \n", \
__FILE__, __LINE__, cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
__global__ void simple_shift(int *destination) {
int mype = nvshmem_my_pe();
int npes = nvshmem_n_pes();
int peer = (mype + 1) % npes;
nvshmem_int_p(destination, mype, peer);
}
int main (int argc, char *argv[]) {
int mype_node, msg;
cudaStream_t stream;
int rank, nranks;
nvshmemx_init_attr_t attr = NVSHMEMX_INIT_ATTR_INITIALIZER;
nvshmemx_uniqueid_t id = NVSHMEMX_UNIQUEID_INITIALIZER;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
// PE 0 queries the unique ID
if (rank == 0) {
nvshmemx_get_uniqueid(&id);
}
// PE 0 broadcasts the unique ID to all peers
MPI_Bcast(&id, sizeof(nvshmemx_uniqueid_t), MPI_UINT8_T, 0, MPI_COMM_WORLD);
nvshmemx_set_attr_uniqueid_args(rank, nranks, &id, &attr);
nvshmemx_init_attr(NVSHMEMX_INIT_WITH_UNIQUEID, &attr);
mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
CUDA_CHECK(cudaSetDevice(mype_node));
CUDA_CHECK(cudaStreamCreate(&stream));
int *destination = (int *) nvshmem_malloc (sizeof(int));
simple_shift<<<1, 1, 0, stream>>>(destination);
nvshmemx_barrier_all_on_stream(stream);
CUDA_CHECK(cudaMemcpyAsync(&msg, destination, sizeof(int),
cudaMemcpyDeviceToHost, stream));
CUDA_CHECK(cudaStreamSynchronize(stream));
printf("%d: received message %d\n", nvshmem_my_pe(), msg);
nvshmem_free(destination);
nvshmem_finalize();
MPI_Finalize();
return 0;
}
Collective Launch Example
The following code shows an example implementation of a ring-based reduction in which multiple iterations of the code, including computation, communication, and synchronization, are expressed as a single kernel.
This example also demonstrates the use of NVSHMEM collective launch, required when the NVSHMEM synchronization API is used from inside the CUDA kernel.
There is no MPI dependency for the example. NVSHMEM can be used to port existing MPI applications and develop new applications.
#include <stdio.h>
#include "nvshmem.h"
#include "nvshmemx.h"
#ifdef NVSHMEM_MPI_SUPPORT
#include "mpi.h"
#endif
#undef CUDA_CHECK
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] cuda failed with %s \n", __FILE__, __LINE__, \
cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
#define NVSHMEM_CHECK(stmt) \
do { \
int result = (stmt); \
if (NVSHMEMX_SUCCESS != result) { \
fprintf(stderr, "[%s:%d] nvshmem failed with error %d \n", __FILE__, __LINE__, \
result); \
exit(-1); \
} \
} while (0)
__global__ void reduce_ring(int *target, int mype, int npes) {
int peer = (mype + 1) % npes;
int lvalue = mype;
for (int i = 1; i < npes; i++) {
nvshmem_int_p(target, lvalue, peer);
nvshmem_barrier_all();
lvalue = *target + mype;
nvshmem_barrier_all();
}
}
int main(int c, char *v[]) {
int mype, npes, mype_node;
#ifdef NVSHMEM_MPI_SUPPORT
bool use_mpi = false;
char *value = getenv("NVSHMEMTEST_USE_MPI_LAUNCHER");
if (value) use_mpi = atoi(value);
#endif
#ifdef NVSHMEM_MPI_SUPPORT
if (use_mpi) {
MPI_Init(&c, &v);
int rank, nranks;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
MPI_Comm mpi_comm = MPI_COMM_WORLD;
nvshmemx_init_attr_t attr;
attr.mpi_comm = &mpi_comm;
nvshmemx_init_attr(NVSHMEMX_INIT_WITH_MPI_COMM, &attr);
} else
nvshmem_init();
#else
nvshmem_init();
#endif
mype = nvshmem_my_pe();
npes = nvshmem_n_pes();
mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
// application picks the device each PE will use
CUDA_CHECK(cudaSetDevice(mype_node));
int *u = (int *)nvshmem_calloc(1, sizeof(int));
int *h = (int *)calloc(1, sizeof(int));
void *args[] = {&u, &mype, &npes};
dim3 dimBlock(1);
dim3 dimGrid(1);
NVSHMEM_CHECK(
nvshmemx_collective_launch((const void *)reduce_ring, dimGrid, dimBlock, args, 0, 0));
CUDA_CHECK(cudaDeviceSynchronize());
cudaMemcpy(h, u, sizeof(int), cudaMemcpyDeviceToHost);
printf("results on device [%d] is %d \n",mype, h[0]);
nvshmem_free(u);
free(h);
nvshmem_finalize();
#ifdef NVSHMEM_MPI_SUPPORT
if (use_mpi) MPI_Finalize();
#endif
return 0;
}
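Before calling nvshmemx_collective_launch, an application can also query the largest grid the device can run cooperatively and check that the requested grid fits. The sketch below shows how the launch in the example above could be augmented; it assumes the nvshmemx_collective_launch_query_gridsize query is available in the installed NVSHMEM version and reuses the kernel, arguments, and check macros from the example.

/* Hedged sketch: verify the requested grid fits within the cooperative-launch limit. */
int max_blocks = 0;
NVSHMEM_CHECK(nvshmemx_collective_launch_query_gridsize(
    (const void *)reduce_ring, dimBlock, args, 0, &max_blocks));
if ((int)dimGrid.x > max_blocks) {
    fprintf(stderr, "requested %u blocks but only %d can be launched cooperatively\n",
            dimGrid.x, max_blocks);
    exit(-1);
}
NVSHMEM_CHECK(
    nvshmemx_collective_launch((const void *)reduce_ring, dimGrid, dimBlock, args, 0, 0));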
On-Stream Example
The following example shows how nvshmemx_*_on_stream functions can be used to enqueue a SHMEM operation onto a CUDA stream for execution in stream order.
Specifically, the example shows the following:
- How a collective SHMEM reduction operation can be made to wait on a preceding kernel in the stream.
- How a kernel can be made to wait for a communication result from a previous collective SHMEM reduction operation.
The example shows one use case for relieving CPU control over GPU compute and communication.
#include <stdio.h>
#include "nvshmem.h"
#include "nvshmemx.h"
#ifdef NVSHMEM_MPI_SUPPORT
#include "mpi.h"
#endif
#define THRESHOLD 42
#define CORRECTION 7
#undef CUDA_CHECK
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] cuda failed with %s \n", __FILE__, __LINE__, \
cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
__global__ void accumulate(int *input, int *partial_sum) {
int index = threadIdx.x;
if (0 == index) *partial_sum = 0;
__syncthreads();
atomicAdd(partial_sum, input[index]);
}
__global__ void correct_accumulate(int *input, int *partial_sum, int *full_sum) {
int index = threadIdx.x;
if (*full_sum > THRESHOLD) {
input[index] = input[index] - CORRECTION;
}
if (0 == index) *partial_sum = 0;
__syncthreads();
atomicAdd(partial_sum, input[index]);
}
int main(int c, char *v[]) {
int mype, npes, mype_node;
int *input;
int *partial_sum;
int *full_sum;
int input_nelems = 512;
int to_all_nelems = 1;
cudaStream_t stream;
#ifdef NVSHMEM_MPI_SUPPORT
bool use_mpi = false;
char *value = getenv("NVSHMEMTEST_USE_MPI_LAUNCHER");
if (value) use_mpi = atoi(value);
#endif
#ifdef NVSHMEM_MPI_SUPPORT
if (use_mpi) {
MPI_Init(&c, &v);
int rank, nranks;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
MPI_Comm mpi_comm = MPI_COMM_WORLD;
nvshmemx_init_attr_t attr;
attr.mpi_comm = &mpi_comm;
nvshmemx_init_attr(NVSHMEMX_INIT_WITH_MPI_COMM, &attr);
} else
nvshmem_init();
#else
nvshmem_init();
#endif
mype = nvshmem_my_pe();
npes = nvshmem_n_pes();
mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
CUDA_CHECK(cudaSetDevice(mype_node));
CUDA_CHECK(cudaStreamCreate(&stream));
input = (int *)nvshmem_malloc(sizeof(int) * input_nelems);
partial_sum = (int *)nvshmem_malloc(sizeof(int));
full_sum = (int *)nvshmem_malloc(sizeof(int));
accumulate<<<1, input_nelems, 0, stream>>>(input, partial_sum);
nvshmemx_int_sum_reduce_on_stream(NVSHMEM_TEAM_WORLD, full_sum, partial_sum, to_all_nelems, stream);
correct_accumulate<<<1, input_nelems, 0, stream>>>(input, partial_sum, full_sum);
CUDA_CHECK(cudaStreamSynchronize(stream));
printf("[%d of %d] run complete \n", mype, npes);
CUDA_CHECK(cudaStreamDestroy(stream));
nvshmem_free(input);
nvshmem_free(partial_sum);
nvshmem_free(full_sum);
nvshmem_finalize();
#ifdef NVSHMEM_MPI_SUPPORT
if (use_mpi) MPI_Finalize();
#endif
return 0;
}
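For contrast, the same step could use the blocking host-side reduction instead of the on-stream variant. The sketch below is an assumed alternative, not part of the packaged example: the host must synchronize the stream before calling nvshmem_int_sum_reduce, which puts the CPU back on the critical path between the two kernels.

/* Hedged alternative sketch: blocking host-side reduction in place of
 * nvshmemx_int_sum_reduce_on_stream. */
accumulate<<<1, input_nelems, 0, stream>>>(input, partial_sum);
CUDA_CHECK(cudaStreamSynchronize(stream));      /* partial_sum must be ready on the GPU */
nvshmem_int_sum_reduce(NVSHMEM_TEAM_WORLD, full_sum, partial_sum, to_all_nelems);
correct_accumulate<<<1, input_nelems, 0, stream>>>(input, partial_sum, full_sum);
CUDA_CHECK(cudaStreamSynchronize(stream));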
Threadgroup Example
The example in this section shows how nvshmemx_int_fcollect_block can be used to leverage all the threads in a block to accelerate a SHMEM collect operation when the threads depend on the result of a preceding communication operation. In this instance, partial vector sums are computed on different PEs, and a SHMEM fcollect operation gathers the complete result across PEs.
#include <stdio.h>
#include "nvshmem.h"
#include "nvshmemx.h"
#ifdef NVSHMEM_MPI_SUPPORT
#include "mpi.h"
#endif
#define NTHREADS 512
#undef CUDA_CHECK
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] cuda failed with %s \n", __FILE__, __LINE__, \
cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
__global__ void distributed_vector_sum(int *x, int *y, int *partial_sum, int *sum,
int use_threadgroup, int mype, int npes) {
int index = threadIdx.x;
int nelems = blockDim.x;
partial_sum[index] = x[index] + y[index];
if (use_threadgroup) {
/* all threads realize the entire fcollect operation */
nvshmemx_int_fcollect_block(NVSHMEM_TEAM_WORLD, sum, partial_sum, nelems);
} else {
/* thread 0 realizes the entire fcollect operation */
if (0 == index) {
nvshmem_int_fcollect(NVSHMEM_TEAM_WORLD, sum, partial_sum, nelems);
}
}
}
int main(int c, char *v[]) {
int mype, npes, mype_node;
int *x;
int *y;
int *partial_sum;
int *sum;
int use_threadgroup = 1;
int nthreads = NTHREADS;
#ifdef NVSHMEM_MPI_SUPPORT
bool use_mpi = false;
char *value = getenv("NVSHMEMTEST_USE_MPI_LAUNCHER");
if (value) use_mpi = atoi(value);
#endif
#ifdef NVSHMEM_MPI_SUPPORT
if (use_mpi) {
MPI_Init(&c, &v);
int rank, nranks;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
MPI_Comm mpi_comm = MPI_COMM_WORLD;
nvshmemx_init_attr_t attr;
attr.mpi_comm = &mpi_comm;
nvshmemx_init_attr(NVSHMEMX_INIT_WITH_MPI_COMM, &attr);
} else
nvshmem_init();
#else
nvshmem_init();
#endif
npes = nvshmem_n_pes();
mype = nvshmem_my_pe();
mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
CUDA_CHECK(cudaSetDevice(mype_node));
x = (int *)nvshmem_malloc(sizeof(int) * nthreads);
y = (int *)nvshmem_malloc(sizeof(int) * nthreads);
partial_sum = (int *)nvshmem_malloc(sizeof(int) * nthreads);
sum = (int *)nvshmem_malloc(sizeof(int) * nthreads * npes);
void *args[] = {&x, &y, &partial_sum, &sum, &use_threadgroup, &mype, &npes};
dim3 dimBlock(nthreads);
dim3 dimGrid(1);
nvshmemx_collective_launch((const void *)distributed_vector_sum, dimGrid, dimBlock, args, 0, 0);
CUDA_CHECK(cudaDeviceSynchronize());
printf("[%d of %d] run complete \n", mype, npes);
nvshmem_free(x);
nvshmem_free(y);
nvshmem_free(partial_sum);
nvshmem_free(sum);
nvshmem_finalize();
#ifdef NVSHMEM_MPI_SUPPORT
if (use_mpi) MPI_Finalize();
#endif
return 0;
}
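The block-scoped collective used above also has a warp-scoped counterpart. The kernel sketch below is a hypothetical variant, not part of the packaged example, in which only the first warp of the block performs the fcollect through nvshmemx_int_fcollect_warp; all threads of the calling warp must pass identical arguments.

__global__ void distributed_vector_sum_warp(int *x, int *y, int *partial_sum, int *sum) {
    int index = threadIdx.x;
    int nelems = blockDim.x;
    partial_sum[index] = x[index] + y[index];
    /* make every thread's contribution to partial_sum visible to the first warp */
    __syncthreads();
    if (index < warpSize) {
        /* all threads of this warp call the collective with the same arguments */
        nvshmemx_int_fcollect_warp(NVSHMEM_TEAM_WORLD, sum, partial_sum, nelems);
    }
}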
Put on Block Example
In the example below, every thread in each block calls nvshmemx_float_put_block. Alternatively, every thread could call nvshmem_float_p, but nvshmem_float_p has the disadvantage that when the destination GPU is connected via InfiniBand, one RMA message is issued for every single element, which can be detrimental to performance.
The disadvantage of using nvshmem_float_put in this case is that when the destination GPU is P2P-connected, a single thread copies the entire data to the destination GPU, whereas nvshmemx_float_put_block can leverage all the threads in the block to copy the data in parallel.
#include <stdio.h>
#include <assert.h>
#include "nvshmem.h"
#include "nvshmemx.h"
#undef CUDA_CHECK
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] cuda failed with %s \n", __FILE__, __LINE__, \
cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
#define THREADS_PER_BLOCK 1024
__global__ void set_and_shift_kernel(float *send_data, float *recv_data, int num_elems, int mype,
int npes) {
int thread_idx = blockIdx.x * blockDim.x + threadIdx.x;
/* set the corresponding element of send_data */
if (thread_idx < num_elems) send_data[thread_idx] = mype;
int peer = (mype + 1) % npes;
/* Every thread in the block calls nvshmemx_float_put_block. Alternatively,
every thread can call nvshmem_float_p, but nvshmem_float_p has the disadvantage
that when the destination GPU is connected via IB, there will be one RMA
message for every single element, which can be detrimental to performance.
The disadvantage with nvshmem_float_put is that when the destination GPU is P2P
connected, it cannot leverage multiple threads to copy the data to the destination
GPU. */
int block_offset = blockIdx.x * blockDim.x;
nvshmemx_float_put_block(recv_data + block_offset, send_data + block_offset,
min(blockDim.x, num_elems - block_offset),
peer); /* All threads in a block call the API
with the same arguments */
}
int main(int c, char *v[]) {
int mype, npes, mype_node;
float *send_data, *recv_data;
int num_elems = 8192;
int num_blocks;
nvshmem_init();
mype = nvshmem_my_pe();
npes = nvshmem_n_pes();
mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
// application picks the device each PE will use
CUDA_CHECK(cudaSetDevice(mype_node));
send_data = (float *)nvshmem_malloc(sizeof(float) * num_elems);
recv_data = (float *)nvshmem_malloc(sizeof(float) * num_elems);
assert(send_data != NULL && recv_data != NULL);
assert(num_elems % THREADS_PER_BLOCK == 0); /* for simplicity */
num_blocks = num_elems / THREADS_PER_BLOCK;
set_and_shift_kernel<<<num_blocks, THREADS_PER_BLOCK>>>(send_data, recv_data, num_elems, mype,
npes);
CUDA_CHECK(cudaGetLastError());
CUDA_CHECK(cudaDeviceSynchronize());
/* Do data validation */
float *host = new float[num_elems];
CUDA_CHECK(cudaMemcpy(host, recv_data, num_elems * sizeof(float), cudaMemcpyDefault));
int ref = (mype - 1 + npes) % npes;
bool success = true;
for (int i = 0; i < num_elems; ++i) {
if (host[i] != ref) {
printf("Error at %d of rank %d: %f\n", i, mype, host[i]);
success = false;
break;
}
}
if (success) {
printf("[%d of %d] run complete \n", mype, npes);
} else {
printf("[%d of %d] run failure \n", mype, npes);
}
nvshmem_free(send_data);
nvshmem_free(recv_data);
nvshmem_finalize();
return 0;
}
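For comparison, the fine-grained alternative discussed above replaces the block-scoped put with one nvshmem_float_p call per thread. The kernel below is a sketch of that variant (not part of the packaged example); it produces the same result but, when the peer is reached over InfiniBand, issues one RMA message per element.

__global__ void set_and_shift_p_kernel(float *send_data, float *recv_data, int num_elems,
                                       int mype, int npes) {
    int thread_idx = blockIdx.x * blockDim.x + threadIdx.x;
    int peer = (mype + 1) % npes;
    if (thread_idx < num_elems) {
        send_data[thread_idx] = mype;
        /* one single-element put per thread */
        nvshmem_float_p(recv_data + thread_idx, send_data[thread_idx], peer);
    }
}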
Ring Broadcast Example
In the example below, PE 0 broadcasts a message by sending it to PE 1, which sends the message to PE 2, and so on. This example demonstrates several NVSHMEM APIs, including the use of nvshmem_fence to order communication, and nvshmem_signal_wait_until and nvshmemx_signal_op for point-to-point synchronization.
#include <stdio.h>
#include <stdint.h>
#include <cuda.h>
#include <nvshmem.h>
#include <nvshmemx.h>
__global__ void ring_bcast(int *data, size_t nelem, int root, uint64_t *psync) {
int mype = nvshmem_my_pe();
int npes = nvshmem_n_pes();
int peer = (mype + 1) % npes;
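/* the root starts the chain; setting its own flag lets its wait below return immediately */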
if (mype == root)
*psync = 1;
nvshmem_signal_wait_until(psync, NVSHMEM_CMP_NE, 0);
if (mype == npes-1) return;
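/* forward the data to the next PE; nvshmem_fence orders the put before the signal update that releases the peer */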
nvshmem_int_put(data, data, nelem, peer);
nvshmem_fence();
nvshmemx_signal_op(psync, 1, NVSHMEM_SIGNAL_SET, peer);
*psync = 0;
}
int main(void) {
size_t data_len = 32;
cudaStream_t stream;
nvshmem_init();
int mype = nvshmem_my_pe();
int mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
cudaSetDevice(mype_node);
cudaStreamCreate(&stream);
int *data = (int *) nvshmem_malloc(sizeof(int) * data_len);
int *data_h = (int *) malloc(sizeof(int) * data_len);
uint64_t *psync = (uint64_t *) nvshmem_calloc(1, sizeof(uint64_t));
for (size_t i = 0; i < data_len; i++)
data_h[i] = mype + i;
cudaMemcpyAsync(data, data_h, sizeof(int) * data_len, cudaMemcpyHostToDevice, stream);
int root = 0;
dim3 gridDim(1), blockDim(1);
void *args[] = { &data, &data_len, &root, &psync };
nvshmemx_barrier_all_on_stream(stream);
nvshmemx_collective_launch((const void *)ring_bcast, gridDim, blockDim, args, 0, stream);
nvshmemx_barrier_all_on_stream(stream);
cudaMemcpyAsync(data_h, data, sizeof(int) * data_len, cudaMemcpyDeviceToHost, stream);
cudaStreamSynchronize(stream);
for (size_t i = 0; i < data_len; i++) {
if (data_h[i] != i)
printf("PE %d error, data[%zu] = %d expected data[%zu] = %d\n",
mype, i, data_h[i], i, (int) i);
}
nvshmem_free(data);
nvshmem_free(psync);
free(data_h);
nvshmem_finalize();
return 0;
}
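The put, fence, and signal update inside ring_bcast can also be fused with the put-with-signal API. The fragment below is a hedged sketch of the forwarding step using nvshmem_int_put_signal; because the signal is applied at the peer only after the data has been delivered, the separate nvshmem_fence is not needed for that ordering.

/* Hedged alternative to the put + fence + signal_op sequence in ring_bcast: */
if (mype != npes - 1) {
    /* deliver the data, then set the peer's psync flag to release its wait */
    nvshmem_int_put_signal(data, data, nelem, psync, 1, NVSHMEM_SIGNAL_SET, peer);
}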
Ring Allreduce Example
In the example below, each PE receives a message chunk from its left neighbor, performs a local reduction, and sends the resulting chunk to its right neighbor, and so on. Once the reduction completes, the reduced chunks are broadcast around the ring so that every PE holds the final result. This example demonstrates several NVSHMEM APIs, including the use of nvshmem_int_put_signal_nbi and nvshmem_signal_wait_until for point-to-point communication and synchronization.
/*
* Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
*
* NVIDIA CORPORATION and its licensors retain all intellectual property
* and proprietary rights in and to this software, related documentation
* and any modifications thereto. Any use, reproduction, disclosure or
* distribution of this software and related documentation without an express
* license agreement from NVIDIA CORPORATION is strictly prohibited.
*
* See COPYRIGHT.txt for license information
*/
/* This example performs an allreduce operation using a ring algorithm when
GPUs are connected via a remote interconnect like IB/RoCE/EFA, etc.
It does a ring reduce followed by a ring broadcast. We use the single-threaded
put_signal API, as a single thread is sufficient for remote transfers. The example
is expected to be performant only when GPUs are connected via a remote interconnect. */
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <ctype.h>
#include <unistd.h>
#include <math.h>
#include <assert.h>
#include <cuda.h>
#include <nvshmem.h>
#include <nvshmemx.h>
#undef CUDA_CHECK
#define CUDA_CHECK(stmt) \
do { \
cudaError_t result = (stmt); \
if (cudaSuccess != result) { \
fprintf(stderr, "[%s:%d] cuda failed with %s \n", __FILE__, __LINE__, \
cudaGetErrorString(result)); \
exit(-1); \
} \
} while (0)
/* atol() + optional scaled suffix recognition: 1K, 2M, 3G, 1T */
static inline int atol_scaled(const char *str, size_t *out) {
int scale, n;
double p = -1.0;
char f;
n = sscanf(str, "%lf%c", &p, &f);
if (n == 2) {
switch (f) {
case 'k':
case 'K':
scale = 10;
break;
case 'm':
case 'M':
scale = 20;
break;
case 'g':
case 'G':
scale = 30;
break;
case 't':
case 'T':
scale = 40;
break;
default:
return 1;
}
} else if (p < 0) {
return 1;
} else
scale = 0;
*out = (size_t)ceil(p * (1lu << scale));
return 0;
}
size_t min_size = 1024 * 1024 * 32;
size_t max_size = min_size * 16;
size_t num_blocks = 32;
size_t threads_per_block = 512;
size_t iters = 4;
size_t warmup_iters = 1;
size_t step_factor = 2;
size_t chunk_size = 262144;
// perform Allreduce using ring
__global__ void ring_reduce(int *dst, const int *src, size_t nreduce, uint64_t *signal,
size_t chunk_size) {
int mype = nvshmem_my_pe();
int npes = nvshmem_n_pes();
int peer = (mype + 1) % npes;
int thread_id = threadIdx.x;
int num_threads = blockDim.x;
int num_blocks = gridDim.x;
int block_idx = blockIdx.x;
size_t elems_per_block = nreduce / num_blocks;
// Change src, dst, nreduce, signal to what this block is going to process
// Each CTA will work independently
if (elems_per_block * (blockIdx.x + 1) > nreduce) return;
src = src + block_idx * elems_per_block;
dst = dst + block_idx * elems_per_block;
nreduce = elems_per_block;
signal = signal + block_idx;
size_t chunk_elems = chunk_size / sizeof(int);
size_t num_chunks = nreduce / chunk_elems;
// reduce phase
for (size_t chunk = 0; chunk < num_chunks; chunk++) {
if (mype != 0) {
if (thread_id == 0) nvshmem_signal_wait_until(signal, NVSHMEM_CMP_GE, chunk + 1);
__syncthreads();
for (size_t i = thread_id; i < chunk_elems; i += num_threads) {
dst[i] = dst[i] + src[i];
}
__syncthreads();
}
if (thread_id == 0)
nvshmem_int_put_signal_nbi(dst, (mype == 0) ? src : dst, chunk_elems, signal, 1,
NVSHMEM_SIGNAL_ADD, peer);
src = src + chunk_elems;
dst = dst + chunk_elems;
}
// Broadcast phase
dst = dst - num_chunks * chunk_elems;
if (thread_id == 0) {
for (size_t chunk = 0; chunk < num_chunks; chunk++) {
if (mype < npes - 1) { // Last pe already has the final result
nvshmem_signal_wait_until(signal, NVSHMEM_CMP_GE,
(mype == 0) ? chunk + 1 : num_chunks + chunk + 1);
}
if (mype < npes - 2)
nvshmem_int_put_signal_nbi(dst, dst, chunk_elems, signal, 1, NVSHMEM_SIGNAL_ADD,
peer);
dst = dst + chunk_elems;
}
*signal = 0; // reset for next iteration
}
}
int main(int argc, char **argv) {
int c;
while ((c = getopt(argc, argv, "b:e:f:n:w:c:t:m:")) != -1) {
switch (c) {
case 'b':
atol_scaled(optarg, &min_size);
break;
case 'e':
atol_scaled(optarg, &max_size);
break;
case 'f':
atol_scaled(optarg, &step_factor);
break;
case 'n':
atol_scaled(optarg, &iters);
break;
case 'w':
atol_scaled(optarg, &warmup_iters);
break;
case 'c':
atol_scaled(optarg, &num_blocks);
break;
case 't':
atol_scaled(optarg, &threads_per_block);
break;
case 'm':
atol_scaled(optarg, &chunk_size);
break;
case '?':
if (optopt == 'c')
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
else if (isprint(optopt))
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
else
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
return 1;
default:
abort();
}
}
size_t min_ints = min_size / sizeof(int);
assert(min_ints % num_blocks == 0);
nvshmem_init();
int mype = nvshmem_my_pe();
int npes = nvshmem_n_pes();
int mype_node = nvshmem_team_my_pe(NVSHMEMX_TEAM_NODE);
cudaStream_t stream;
cudaEvent_t start, stop;
CUDA_CHECK(cudaEventCreate(&start));
CUDA_CHECK(cudaEventCreate(&stop));
CUDA_CHECK(cudaSetDevice(mype_node));
CUDA_CHECK(cudaStreamCreate(&stream));
size_t max_ints = max_size / sizeof(int);
int *dst = (int *)nvshmem_malloc(max_size);
int *src = (int *)nvshmem_malloc(max_size);
int *data_h = (int *)malloc(max_size);
uint64_t *signal = (uint64_t *)nvshmem_calloc(num_blocks, sizeof(uint64_t));
dim3 gridDim(num_blocks), blockDim(threads_per_block);
for (size_t i = 0; i < max_ints; i++) data_h[i] = i;
CUDA_CHECK(cudaMemcpyAsync(src, data_h, max_size, cudaMemcpyHostToDevice, stream));
nvshmemx_barrier_all_on_stream(stream);
for (size_t size = min_size; size <= max_size; size *= step_factor) {
size_t num_ints = size / sizeof(int);
void *args[] = {&dst, &src, &num_ints, &signal, &chunk_size};
// do warmup
for (size_t i = 0; i < warmup_iters; i++) {
nvshmemx_collective_launch((const void *)ring_reduce, gridDim, blockDim, args, 0,
stream);
nvshmemx_barrier_all_on_stream(stream);
}
CUDA_CHECK(cudaStreamSynchronize(stream));
// main loop
CUDA_CHECK(cudaEventRecord(start, stream));
for (size_t i = 0; i < iters; i++) {
nvshmemx_collective_launch((const void *)ring_reduce, gridDim, blockDim, args, 0,
stream);
nvshmemx_barrier_all_on_stream(stream);
}
CUDA_CHECK(cudaEventRecord(stop, stream));
CUDA_CHECK(cudaStreamSynchronize(stream));
if (!mype) {
float ms;
CUDA_CHECK(cudaEventElapsedTime(&ms, start, stop));
printf("%zuB \t %fms\n", size, ms / iters);
}
// validate output
CUDA_CHECK(cudaMemcpy(data_h, dst, size, cudaMemcpyDeviceToHost));
for (size_t i = 0; i < num_ints; i++) {
if (data_h[i] != (int)i * npes)
printf("PE %d error, data[%zu] = %d expected data[%zu] = %d\n", mype, i, data_h[i],
i, (int)i * npes);
}
}
CUDA_CHECK(cudaEventDestroy(start));
CUDA_CHECK(cudaEventDestroy(stop));
nvshmem_free(dst);
nvshmem_free(src);
nvshmem_free(signal);
free(data_h);
nvshmem_finalize();
return 0;
}