CUDA Stream Handling in Holoscan Applications
CUDA provides the concept of streams to allow for asynchronous concurrent execution on the GPU. Each stream is a sequence of commands that execute in order, but work launched on separate streams can potentially operate concurrently. Examples are running multiple kernels in separate streams or overlapping data transfers and kernel execution. See the Asynchronous Concurrent Execution section of the CUDA programming guide.
Holoscan provides automatic CUDA stream management to enable fully asynchronous GPU pipelines without explicit synchronization. This section covers the recommended usage patterns and provides a motivating example before diving into advanced configuration options.
For most operators that perform GPU work, the recommended pattern is to use receive_cuda_stream (C++/Python):
void MyOperator::compute(InputContext& op_input, OutputContext& op_output, ExecutionContext& context) {
  // 1. Receive input data (records stream IDs found on upstream port "input")
  auto in_tensor = op_input.receive<Tensor>("input").value();
  // 2. Get the operator's internal stream (synchronizes upstream work)
  cudaStream_t op_stream = op_input.receive_cuda_stream("input");
  // allocation of out_tensor omitted here for simplicity
  // 3. Use the stream for GPU work
  float* in_data = static_cast<float*>(in_tensor->data());
  my_kernel<<<grid, block, 0, op_stream>>>(in_data, out_data, ...);
  // 4. Emit output (stream ID automatically attached)
  op_output.emit(out_tensor, "output");
}
def compute(self, op_input, op_output, context):
    # 1. Receive input data (records stream IDs from upstream)
    in_tensor = op_input.receive("input")
    # 2. Get the operator's internal stream (synchronizes upstream work)
    stream_ptr = op_input.receive_cuda_stream("input")
    # 3. Use the stream for GPU work (e.g., with CuPy)
    with cp.cuda.ExternalStream(stream_ptr):
        # GPU operations here
        out_tensor = 4 * in_tensor
    # 4. Emit output (stream ID automatically attached)
    op_output.emit(out_tensor, "output")
What receive_cuda_stream Does
Allocates an internal stream: Once per operator, reused across all compute calls
Synchronizes upstream streams: Uses CUDA events to ensure upstream GPU work completes before this operator's work begins
Sets the CUDA device: Ensures the correct GPU is active for subsequent CUDA calls
Configures output ports: Automatically attaches the internal stream ID to all emitted messages
Returns the internal cudaStream_t: Use this for all GPU kernels and async memory operations
Non-Blocking Synchronization
The synchronization between upstream and internal streams is fully asynchronous; no CPU blocking occurs:
CPU: [cudaEventRecord] [cudaStreamWaitEvent] [queues kernels] [returns from compute]
| | |
V V V
GPU upstream: ===[work]=======[event fires]
|
GPU internal: ====================X====[waits]====[runs kernels]====>
Holoscan uses cudaEventRecord() and cudaStreamWaitEvent() which both return immediately. The CPU queues all work and returns; the GPU scheduler handles the actual ordering. This avoids blocking calls like cudaStreamSynchronize(), cudaEventSynchronize(), or cudaDeviceSynchronize().
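The ordering guarantee can be sketched with a CPU-side analogy (pure Python threading, not CUDA): a threading.Event stands in for the CUDA event, and each thread stands in for a stream's work queue.

```python
import threading

log = []
upstream_done = threading.Event()  # stands in for the CUDA event on the upstream stream

def upstream_work():
    log.append("upstream kernel")
    upstream_done.set()            # like cudaEventRecord: fires when upstream work completes

def internal_work():
    upstream_done.wait()           # like cudaStreamWaitEvent: the dependency waits, not the launcher
    log.append("downstream kernel")

# the "CPU" just launches both workers and is free to continue;
# ordering is enforced entirely by the event dependency
workers = [threading.Thread(target=internal_work), threading.Thread(target=upstream_work)]
for w in workers:
    w.start()
for w in workers:
    w.join()
# log is now ["upstream kernel", "downstream kernel"], regardless of launch order
```

As in the real CUDA case, the launching side never blocks on the work itself; only the dependent worker waits, and only on the event.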
Why Stream Handling Matters
Because CUDA kernels are launched asynchronously, an operator’s compute() method can return before the GPU work actually completes. When data is passed to a downstream operator, that downstream operator needs to know which stream the data was produced on so it can ensure the upstream work is complete before accessing the data. This is why stream IDs are attached to messages and why receive_cuda_stream performs synchronization.
Benchmarking Caveat
Since compute() can return while GPU work is still in progress, timing tools like Data Flow Tracking or GXF JobStatistics may report misleadingly short durations for operators that launch async GPU work. The actual kernel execution time may be attributed to a downstream operator that triggers synchronization (e.g., for a device-to-host copy). For accurate GPU timing, use Nsight Systems. See Common Pitfalls at the end of this page for more details.
This example illustrates the intended stream API usage patterns with advice on how to maximize parallelism while avoiding race conditions. It also explains why receive_cuda_stream returns the operator’s own internal stream (rather than the upstream stream).
Consider a pipeline with parallel branches that converge:
Operator A (root): has one output port “out” connected to both B and D
Operator B (branch 1): has input port “in” and output port “out”
Operator D (branch 2): has input port “in” and output port “out”
Operator C (leaf): has two input ports “in_b” (from B) and “in_d” (from D)
+------------+
+---->| Operator B |----+
| +------------+ |
+------------+ | | +------------+
| Operator A |-->+ +--->| Operator C |
| (root) | | | | (leaf) |
+------------+ | +------------+ | +------------+
+---->| Operator D |----+
+------------+
Operator A (Root)
Root operators that generate data (rather than receiving it from upstream) use allocate_cuda_stream and set_cuda_stream:
void OperatorA::compute(InputContext& op_input, OutputContext& op_output, ExecutionContext& context) {
  // Allocate a stream for this operator's GPU work
  cudaStream_t stream_A = context.allocate_cuda_stream("stream_A").value();
  // Perform GPU work on stream_A
  my_kernel<<<grid, block, 0, stream_A>>>(data);
  // Call set_cuda_stream to attach stream_A's ID to outgoing messages
  op_output.set_cuda_stream(stream_A, "out");
  // Emit output - both B and D will receive this message with stream_A's ID
  op_output.emit(tensor, "out");
}
def compute(self, op_input, op_output, context):
    # Allocate a stream for this operator's GPU work
    stream_A = context.allocate_cuda_stream("stream_A")
    # Perform GPU work on stream_A (e.g., with CuPy)
    with cp.cuda.ExternalStream(stream_A):
        # GPU operations here
        pass
    # Call set_cuda_stream to attach stream_A's ID to outgoing messages
    op_output.set_cuda_stream(stream_A, "out")
    # Emit output - both B and D will receive this message with stream_A's ID
    op_output.emit(tensor, "out")
What happens:
allocate_cuda_stream("stream_A") allocates a dedicated stream for operator A
The GPU kernel is launched asynchronously on stream_A; control returns to the CPU immediately
set_cuda_stream() configures the output port to include stream_A's ID as a component in the message
emit() sends the tensor along with the stream ID to downstream operators B and D
The kernel may still be running on the GPU when compute() returns
Zero-Copy and Race Conditions
When a Tensor is emitted, it is transmitted as a shared_ptr<Tensor>; this is a zero-copy operation. Both operators B and D receive pointers to the same underlying memory. If either B or D modifies the input tensor in-place, it will create a race condition with the other operator. To avoid this:
Treat input tensors as read-only when the same data is sent to multiple downstream operators
If modification is needed, allocate new output tensors rather than modifying inputs in-place
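A plain-Python illustration of the hazard, with lists standing in for GPU tensors (no Holoscan APIs involved):

```python
# a list stands in for the shared tensor payload; emit() copies the pointer, not the data
tensor = [0.0, 0.0, 0.0, 0.0]
seen_by_b = tensor   # operator B's "received" tensor
seen_by_d = tensor   # operator D's "received" tensor: the same underlying buffer

# if B modifies its input in-place...
for i in range(len(seen_by_b)):
    seen_by_b[i] += 1.0

# ...D observes the mutation: a race condition if B and D run concurrently
assert seen_by_d == [1.0, 1.0, 1.0, 1.0]

# the safe pattern: allocate a NEW output instead of mutating the input
safe_output = [2.0 * x for x in seen_by_d]
assert tensor == [1.0, 1.0, 1.0, 1.0]  # input left untouched by the safe pattern
```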
Operator B (Branch 1)
void OperatorB::compute(InputContext& op_input, OutputContext& op_output, ExecutionContext& context) {
  // Receive input data - this captures stream_A's ID from the message
  auto tensor = op_input.receive<TensorMap>("in").value();
  // Get operator B's INTERNAL stream with GPU-side synchronization to stream_A
  // (This does NOT block the CPU - it uses cudaEventRecord/cudaStreamWaitEvent)
  cudaStream_t stream_B = op_input.receive_cuda_stream("in");
  // Perform GPU work on stream_B (guaranteed to run after stream_A's work completes)
  transform_B<<<grid, block, 0, stream_B>>>(tensor);
  // emit() automatically attaches stream_B's ID (configured by receive_cuda_stream)
  op_output.emit(result_b, "out");
}
def compute(self, op_input, op_output, context):
    # Receive input data - this captures stream_A's ID from the message on "in"
    tensor = op_input.receive("in")
    # Get operator B's INTERNAL stream with GPU-side synchronization to stream_A
    # (This does NOT block the CPU - it uses cudaEventRecord/cudaStreamWaitEvent)
    stream_B = op_input.receive_cuda_stream("in")
    # Perform GPU work on stream_B
    with cp.cuda.ExternalStream(stream_B):
        # GPU operations here - guaranteed to run after stream_A's work completes
        pass
    # emit() automatically attaches stream_B's ID (configured by receive_cuda_stream)
    op_output.emit(result_b, "out")
What happens:
receive() receives the tensor and internally notes that stream_A's ID was attached
receive_cuda_stream("in") performs several operations:
Calls cudaEventRecord(event, stream_A): schedules an event to fire when stream_A's work completes
Calls cudaStreamWaitEvent(stream_B, event): tells stream_B to wait for that event
Both calls return immediately: the CPU is not blocked; the dependency is enforced on the GPU side
Returns stream_B for use by this operator
Configures the "out" port to automatically emit stream_B's ID
The kernel is launched on stream_B: the GPU scheduler ensures it only executes after stream_A's work completes
emit() sends the result with stream_B's ID attached
Operator D (Branch 2)
Operator D follows the same pattern as Operator B. Both receive the same stream_A ID from operator A, but each allocates its own internal stream. This is critical for enabling GPU parallelism:
If B and D both reused stream_A:
GPU stream_A: ===[A's kernel]===[B's kernel]===[D's kernel]===> (sequential!)
With separate internal streams:
GPU stream_A: ===[A's kernel]================================>
|
+--event--> stream_B: [B's kernel]====> } Can run
| } in parallel!
+--event--> stream_D: [D's kernel]====> }
By using independent streams, the GPU scheduler can potentially execute B’s and D’s kernels concurrently (if resources allow), maximizing GPU utilization.
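The fan-out can be sketched with a CPU-side analogy (Python threads, not CUDA): both branches wait on the same event, so neither starts before A's work completes, but nothing serializes them against each other.

```python
import threading

a_done = threading.Event()   # stands in for the event recorded on stream_A
finished = []
lock = threading.Lock()

def branch(name):
    a_done.wait()              # both branches depend on A's event...
    with lock:
        finished.append(name)  # ...but are otherwise free to run concurrently

threads = [threading.Thread(target=branch, args=(name,)) for name in ("B", "D")]
for t in threads:
    t.start()
nothing_ran_early = (finished == [])  # neither branch runs before A's event fires
a_done.set()
for t in threads:
    t.join()
# finished now contains both "B" and "D", in whichever order they completed
```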
Parallel Execution is Not Guaranteed
Using separate CUDA streams enables parallel execution but does not guarantee it. If one kernel fully occupies the GPU’s resources (e.g., uses all available SMs or saturates memory bandwidth), the other kernel will be delayed until resources become available. The actual degree of parallelism depends on:
Kernel resource requirements (registers, shared memory, thread blocks)
GPU hardware capabilities (number of SMs, memory bandwidth)
Current GPU utilization from other work
For scenarios requiring guaranteed SM partitioning between operators, Holoscan provides the CudaGreenContext APIs which wrap CUDA’s Green Contexts feature. See Configuring a CUDA Green Context for an operator for details.
Operator C (Leaf with Multiple Inputs)
When an operator receives from multiple input ports, call receive_cuda_stream for each port:
void OperatorC::compute(InputContext& op_input, OutputContext& op_output, ExecutionContext& context) {
  // Receive data from BOTH upstream operators
  auto tensor_b = op_input.receive<TensorMap>("in_b").value();  // captures stream_B's ID
  auto tensor_d = op_input.receive<TensorMap>("in_d").value();  // captures stream_D's ID
  // IMPORTANT: Call receive_cuda_stream for BOTH input ports!
  // First call allocates stream_C and synchronizes stream_B to it
  cudaStream_t stream_C = op_input.receive_cuda_stream("in_b");
  // Second call synchronizes stream_D to the SAME stream_C
  // (returns the same stream_C that was already allocated)
  cudaStream_t same_stream = op_input.receive_cuda_stream("in_d");
  assert(stream_C == same_stream);  // same internal stream
  // Perform GPU work on stream_C - guaranteed to run after BOTH B and D complete
  combine_and_process<<<grid, block, 0, stream_C>>>(tensor_b, tensor_d);
}
def compute(self, op_input, op_output, context):
    # Receive data from BOTH upstream operators
    tensor_b = op_input.receive("in_b")  # captures stream_B's ID
    tensor_d = op_input.receive("in_d")  # captures stream_D's ID
    # IMPORTANT: Call receive_cuda_stream for BOTH input ports!
    # First call allocates stream_C and synchronizes stream_B to it
    stream_C = op_input.receive_cuda_stream("in_b")
    # Second call synchronizes stream_D to the SAME stream_C
    stream_C_again = op_input.receive_cuda_stream("in_d")
    assert stream_C == stream_C_again  # same internal stream
    # Perform GPU work on stream_C - guaranteed to run after BOTH B and D complete
    with cp.cuda.ExternalStream(stream_C):
        # combine_and_process operations here
        pass
What happens:
Both receive() calls capture their respective upstream stream IDs (stream_B and stream_D)
The first receive_cuda_stream("in_b") call allocates stream_C and synchronizes stream_B to it
The second receive_cuda_stream("in_d") call reuses stream_C and synchronizes stream_D to it
After both calls, stream_C will wait for both upstream streams before executing any work
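The per-operator caching behavior can be modeled with a small toy class (ToyInputContext is a hypothetical name for illustration, not the real Holoscan API):

```python
class ToyInputContext:
    """Toy model of receive_cuda_stream: one internal stream per operator,
    with every upstream stream synchronized to it."""

    def __init__(self):
        self._internal_stream = None
        self.synchronized = []  # upstream streams the internal stream now waits on

    def receive_cuda_stream(self, port, upstream_stream):
        if self._internal_stream is None:
            self._internal_stream = "stream_C"     # allocated once, on the first call
        self.synchronized.append(upstream_stream)  # event-based GPU-side dependency
        return self._internal_stream

ctx = ToyInputContext()
first = ctx.receive_cuda_stream("in_b", "stream_B")
second = ctx.receive_cuda_stream("in_d", "stream_D")
assert first == second == "stream_C"                 # same internal stream both times
assert ctx.synchronized == ["stream_B", "stream_D"]  # both upstreams synchronized to it
```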
End-to-End Timeline
CPU: [A: emit]--[B: receive, emit]--[D: receive, emit]--[C: receive both]--[return]
| | | |
V V V V
GPU: ===[stream_A: kernel]========================================>
|
+--event-->[stream_B: waits]===[kernel]=========>
| |
| +--event--+
| V
+--event-->[stream_D: waits]===[kernel]==========X==>
| |
+--event-->[stream_C: waits for both]===[kernel]===>
Key observations:
All CPU-side compute() calls return quickly without waiting for GPU work
GPU work is properly ordered via event-based dependencies (no CPU blocking)
B and D can run in parallel on the GPU since they use independent streams
C’s stream waits for both upstream streams before executing
Stream IDs propagate through the pipeline via message components
Always call receive() before receive_cuda_stream(): receiving is what captures stream IDs from incoming messages
Use the returned stream for all GPU work: this ensures proper ordering with upstream operators
Don't explicitly synchronize: let the framework handle synchronization via events
One stream per operator: simplifies reasoning about dependencies
Call receive_cuda_stream() for each input port: this ensures all upstream work is synchronized
The CudaStreamPool class (C++/Python) provides a mechanism for allocating CUDA streams from a pool whose lifetime is managed by Holoscan.
For advanced use cases requiring guaranteed GPU resource isolation, the CudaGreenContextPool (C++/Python) and CudaGreenContext (C++/Python) classes wrap CUDA’s Green Contexts feature, allowing partitioning of GPU SMs into isolated contexts for different operators.
There is a legacy CudaStreamHandler utility class (provided via #include "holoscan/utils/cuda_stream_handler.hpp") that made it possible to write a C++ operator that could make use of a CudaStreamPool. This class had some limitations:
It required receiving messages as type holoscan::gxf::Entity.
It required using nvidia::gxf::Entity and nvidia::gxf::Handle methods from the underlying GXF library.
It was not available for native Python operators.
This existing utility is still provided for backwards compatibility and operators using it can continue to interoperate with those using the new APIs. However, we encourage operator authors to migrate to using the new APIs going forward.
A default CudaStreamPool is added to all operators if the user did not otherwise provide one. This means that in most cases, it will not be necessary for the user to explicitly add a stream pool. The default stream pool has unbounded size, no flags set and a priority value of 0. In cases when the user wants to allocate streams with different flags or priority, the section below can be followed to add a customized stream pool to the operator.
The only case in which a default stream pool is not added is when the application (fragment) is running on a node without any CUDA-capable devices; since use of CUDA is not possible there, no default stream pool is created.
To enable an operator to allocate a CUDA stream, the user can pass a CudaStreamPool as in the following examples. The general pattern for stream handling in Holoscan SDK is to assign a CudaStreamPool to each operator that wants to use a non-default stream. The operator then reserves a dedicated stream from the pool for use by any kernels it launches. Multiple operators may share the same stream pool, with the shared pool's "max_size" set to at least the number of operators sharing it.
Note that the CudaStreamPool will manage the lifetimes of any CUDA streams used by the SDK. The user does not typically need to explicitly call any CUDA APIs to create or destroy streams. Note that all streams from a single CudaStreamPool are on a single device (with CUDA id as passed to the “dev_id” argument). If the workflow involves operators that run on separate CUDA devices, those operators must use separate stream pools configured for the corresponding device.
// The code below would appear within `Application::compose` (or `Fragment::compose`)
// Create a stream pool with a capacity of 5 streams (5 operators could share the same pool)
const auto cuda_stream_pool = make_resource<CudaStreamPool>("stream_pool",
Arg("dev_id", 0),
Arg("stream_flags", 0u),
Arg("stream_priority", 0),
Arg("reserved_size", 1u),
Arg("max_size", 5u));
auto my_op = make_operator<MyOperator>("my_op", cuda_stream_pool, arg_list);
// Alternatively, the argument can be added via `add_arg` after operator construction
// auto my_op = make_operator<MyOperator>("my_op", arg_list);
// my_op->add_arg(cuda_stream_pool);
Note that the legacy CudaStreamHandler utility did not support passing the stream pool in this way; instead, it required the user to explicitly add a parameter to the operator's private data members.
private:
// The legacy CudaStreamHandler required a "cuda_stream_pool" parameter.
// The spec.param call in the Operator's `setup` method would use the name "cuda_stream_pool"
// for it
Parameter<std::shared_ptr<CudaStreamPool>> cuda_stream_pool_{};
For backwards compatibility with prior releases, the built-in operators that were previously using the CudaStreamHandler utility class still offer this explicitly defined "cuda_stream_pool" parameter. It is not necessary for users to add it to their own operators unless they prefer to explicitly pass an Arg named "cuda_stream_pool" when initializing the operator.
auto visualizer = make_operator<HolovizOp>(
"visualizer",
from_config("holoviz"),
Arg("cuda_stream_pool", make_resource<CudaStreamPool>(0, 0, 0, 1, 5)));
# The code below would appear within `Application.compose` (or `Fragment.compose`)
# Create a stream pool with a capacity of 5 streams (5 operators could share the same pool)
cuda_stream_pool = CudaStreamPool(
self,
name="stream_pool",
dev_id=0,
stream_flags=0,
stream_priority=0,
reserved_size=1,
max_size=5,
)
my_op = MyOperator(self, cuda_stream_pool, name="my_op", **my_kwargs)
# Alternatively, the argument can be added via `add_arg` after operator construction
# my_op = MyOperator(self, name="my_op", **my_kwargs)
# my_op.add_arg(cuda_stream_pool)
The above is the recommended way for user-defined operators to add a CudaStreamPool. For purposes of backwards compatibility, the built-in operators of the SDK that already had a keyword-based cuda_stream_pool parameter continue to also allow passing the stream pool as in the following example:
visualizer = HolovizOp(
self,
name="holoviz",
cuda_stream_pool=CudaStreamPool(self, 0, 0, 0, 1, 5),
**self.kwargs("holoviz"))
A Green Context is a lightweight CUDA context associated with a specific set of GPU resources. When creating a green context, users can partition GPU resources, currently streaming multiprocessors (SMs) and work queues, so that GPU work targeting a green context can only use its provisioned resources. This can be beneficial for:
Reducing interference: Ensuring latency-sensitive operators always have SMs available to start executing immediately
Resource isolation: Preventing one operator’s heavy GPU workload from starving another operator
Controlled concurrency: Provisioning work queues to avoid false dependencies between independent streams
Using green contexts does not require any GPU kernel code changes; just small host-side changes to create the green context and associate streams with it. See the CUDA Programming Guide section on Green Contexts for more details on the underlying CUDA feature.
Even when different SM resources and work queues are provisioned per green context, concurrent execution of independent GPU work is not guaranteed. Green contexts help reduce interference but do not eliminate all factors that can affect scheduling. Think of green contexts as a best-effort mechanism for resource isolation.
To enable an operator to use a dedicated CUDA Green Context in Holoscan, create a CudaGreenContextPool resource with an SM partition table and assign a CudaGreenContext from the pool to the operator. A CudaStreamPool can then be created using this CudaGreenContext.
Below is an example of how to configure a CudaGreenContextPool and assign a CudaGreenContext to an operator in C++ and Python.
// The code below would appear within `Application::compose` (or `Fragment::compose`)
// Create a green context pool with 2 partitions of 8 SMs each. The default green context is the
// last partition in the pool; if the device has more than 16 SMs, an additional green context is
// created from the remaining SMs and used as the default green context partition.
std::vector<uint32_t> partitions = std::vector<uint32_t>{8, 8};
const auto cuda_green_context_pool =
make_resource<CudaGreenContextPool>("cuda_green_context_pool",
Arg("dev_id", 0),
Arg("num_partitions", (uint32_t)partitions.size()),
Arg("sms_per_partition", partitions));
// Create a green context
const auto cuda_green_context = make_resource<CudaGreenContext>("cuda_green_context",
cuda_green_context_pool, 1);
// Create CudaStreamPool with green context
const auto cuda_stream_pool =
make_resource<CudaStreamPool>("cuda_stream_pool", 0, 0, 0, 1, 5, cuda_green_context);
auto tx = make_operator<ops::PingTxOp>("tx",
make_condition<CountCondition>(10),
cuda_stream_pool);
# The code below would appear within `Application.compose` (or `Fragment.compose`)
partitions = [8, 8]
cuda_green_context_pool = CudaGreenContextPool(
self,
dev_id=0,
flags=0,
num_partitions=2,
sms_per_partition=partitions,
name="cuda_green_context_pool",
)
cuda_green_context = CudaGreenContext(
self,
cuda_green_context_pool=cuda_green_context_pool,
index=1,
name="cuda_green_context",
)
stream_pool = CudaStreamPool(
self,
name="stream_pool",
dev_id=0,
stream_flags=0,
stream_priority=0,
reserved_size=1,
max_size=5,
cuda_green_context=cuda_green_context,
)
Note that for CudaGreenContextPool, the default_context_index argument can be used to select one of the partitions as the default green context partition.
For CudaGreenContext, the index argument is optional; if it is not specified, the pool's default green context is used.
The following code demonstrates an operator that only needs to use the default green context.
auto operator_tx = make_operator<ops::PingTxOp>("tx", cuda_green_context_pool);
A default CudaGreenContextPool can also be added to a fragment or an application. In this case, operators that do not need a dedicated CUDA context partition can use the fragment-level default CudaGreenContextPool as follows.
// Create a green context pool which will be used as the default green context pool for the current fragment
const auto cuda_green_context_pool = add_default_green_context_pool(0, partitions);
auto operator_tx = make_operator<ops::PingTxOp>("tx", cuda_green_context_pool);
Behind the scenes, CudaStreamPool (C++/Python) allocates nvidia::gxf::CudaStream objects (an RAII wrapper around a cudaStream_t). These exist as components in the underlying GXF entity-component system. When a message is emitted, a nvidia::gxf::CudaStreamId struct (containing the stream’s “component ID”) is attached to the message. This is how downstream operators know which stream was used by upstream operators.
One Stream Per Message
Each emitted message (Entity) is associated with at most one CUDA stream. If no stream handling APIs are used by the operator, the message will have no stream attached (equivalent to the default stream). When an operator uses receive_cuda_stream and then emits data, its internal stream ID is automatically attached to the outgoing message. If an operator receives a message and forwards the same Entity to the output (rather than creating a new one), the stream ID in that Entity is updated to reflect the operator’s internal stream.
Application authors do not need to interact with CudaStream or CudaStreamId directly. Instead, use the standard CUDA Runtime API cudaStream_t type returned by Holoscan’s stream handling methods. The SDK provides several methods for working with streams from an operator’s compute method, described in detail below.
As covered in the Quick Start section, receive_cuda_stream (C++/Python) is the recommended method for stream handling. This section provides additional details on its parameters and edge cases.
For a given input port, receive must always be called before receive_cuda_stream. The receive call captures stream IDs from incoming messages; the subsequent receive_cuda_stream call performs synchronization and returns the operator’s internal stream.
Stream Extraction from Messages
When receive is called, Holoscan extracts the first CudaStreamId component found in each received Entity. For standard input ports, this is one stream per message. For multi-receiver ports (IOSpec::kAnySize), each connected upstream operator sends a separate Entity, so streams from all upstream operators are captured — one stream per Entity (message), not multiple streams within a single Entity.
The receive_cuda_stream call then synchronizes all captured streams to the operator’s internal stream.
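The extraction rule can be modeled with a short toy function (extract_stream_ids and the "stream:" prefix are hypothetical illustrations, not Holoscan APIs):

```python
def extract_stream_ids(messages):
    """Toy model: record the FIRST stream ID component found in each received
    message (one per Entity, never several from within a single Entity)."""
    captured = []
    for components in messages:
        stream_ids = [c for c in components if c.startswith("stream:")]
        if stream_ids:
            captured.append(stream_ids[0])  # only the first component counts
    return captured

# three messages on a multi-receiver port; the second upstream sent no stream ID
messages = [["tensor", "stream:A"], ["tensor"], ["tensor", "stream:B", "stream:extra"]]
assert extract_stream_ids(messages) == ["stream:A", "stream:B"]
```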
Here is an example of the typical usage of this method from the built-in BayerDemosaicOp
// The code below would appear within `Operator::compute`
// Process input message
auto maybe_message = op_input.receive<gxf::Entity>("receiver");
if (!maybe_message || maybe_message.value().is_null()) {
  throw std::runtime_error("No message available");
}
auto in_message = maybe_message.value();
// Get the CUDA stream from the input message if present, otherwise generate one.
// This stream will also be transmitted on the "tensor" output port.
cudaStream_t cuda_stream = op_input.receive_cuda_stream("receiver",  // input port name
                                                        true,        // allocate
                                                        false);      // sync_to_default
// assign the CUDA stream to the NPP stream context
npp_stream_ctx_.hStream = cuda_stream;
Note that BayerDemosaicOp is implemented in C++ using code shown in the C++ tab, but this shows how the equivalent code would look in the Python API.
# The code below would appear within `Operator.compute`
# Process input message
in_message = op_input.receive("receiver")
if in_message is None:
    raise RuntimeError("No message available")
# Get the CUDA stream from the input message if present, otherwise generate one.
# This stream will also be transmitted on the "tensor" output port.
cuda_stream_ptr = op_input.receive_cuda_stream("receiver", allocate=True, sync_to_default=False)
# can then use cuda_stream_ptr to create a `cupy.cuda.ExternalStream` context, for example
It can be seen that the call to receive occurs prior to the call to receive_cuda_stream for the "receiver" input port, as required. Also note that, unlike with the legacy CudaStreamHandler utility class, it is not required to use gxf::Entity in the receive call. That type is used by some built-in operators like BayerDemosaicOp as a way to support both the nvidia::gxf::VideoBuffer type and the usual Tensor type as inputs. If only Tensor were supported, we could have used receive<std::shared_ptr<Tensor>> or receive<TensorMap> instead.
The second argument to receive_cuda_stream (allocate) is a boolean that defaults to true and indicates that the operator should allocate its own internal stream from the stream pool. Setting it to false prevents the operator from allocating an internal stream; see the note below on how receive_cuda_stream behaves in that case.
There is also an optional third argument to receive_cuda_stream which is a boolean specifying whether synchronization of the input streams (and internal stream) to CUDA’s default stream should also be performed. This option is false by default.
The above description of receive_cuda_stream is accurate when a CudaStreamPool has been passed to the operator in one of the ways described above. See the note below for additional detail on how this method operates if the operator is unable to allocate an internal stream because a CudaStreamPool was unavailable.
Avoiding additional synchronization from Python’s CUDA Array Interface
Python applications converting between Holoscan’s Tensor and 3rd party tensor objects often use the CUDA Array Interface. This interface by default performs its own explicit synchronization (described here). This may be unnecessary when using receive_cuda_stream which already synchronizes streams found on the input with the operator’s internal stream. The environment variable CUPY_CUDA_ARRAY_INTERFACE_SYNC can be set to 0 to disable an additional synchronization by CuPy when creating a CUDA array from a holoscan Tensor via the array interface. Similarly, HOLOSCAN_CUDA_ARRAY_INTERFACE_SYNC can be set to 0 to disable synchronization by the array interface on the Holoscan side when creating a Holoscan tensor from a 3rd party tensor.
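For example, these variables could be set at the top of the application's entry script, before CuPy is imported or any tensors are converted:

```python
import os

# Skip the CUDA Array Interface's own synchronization; safe here only because
# receive_cuda_stream already orders upstream work onto the operator's stream.
os.environ["CUPY_CUDA_ARRAY_INTERFACE_SYNC"] = "0"      # CuPy side
os.environ["HOLOSCAN_CUDA_ARRAY_INTERFACE_SYNC"] = "0"  # Holoscan side
```

Both variables must be set before the corresponding library first reads them, so placing them at the top of the script (or in the launch environment) is the safest choice.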
Using receive_cuda_stream without a stream pool available
This section describes the behavior of receive_cuda_stream in the case where no streams are available in the operator’s CudaStreamPool (or the allocate argument of receive_cuda_stream was set to false). In this case, receive_cuda_stream will not be able to allocate a dedicated internal stream for the operator’s own use. Instead, the cudaStream_t corresponding to the first stream found on the named input port will be returned and any additional streams on that input port would be synchronized to it. If a subsequent receive_cuda_stream call was made for another input port, any streams found on that second port are synchronized to the cudaStream_t that was returned by the first receive_cuda_stream call and the stream returned is that same cudaStream_t. In other words, the first stream found on the initial call to receive_cuda_stream will be repurposed as the operator’s internal stream to which any other input streams are synchronized. This same stream will also be the one automatically emitted on the output ports.
In the case that there is no CudaStreamPool and there is no stream found for the input port (or by any prior receive_cuda_stream calls for another port), then receive_cuda_stream will return the default stream (cudaStreamDefault). No stream would be emitted on the output ports in this case.
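The fallback logic described above can be summarized in a toy model (NoPoolInputContext is a hypothetical illustration, not the real API):

```python
DEFAULT_STREAM = 0  # stands in for cudaStreamDefault

class NoPoolInputContext:
    """Toy model of receive_cuda_stream with no stream pool available:
    the first upstream stream found is repurposed as the internal stream."""

    def __init__(self):
        self._internal_stream = None

    def receive_cuda_stream(self, streams_on_port):
        if self._internal_stream is None and streams_on_port:
            self._internal_stream = streams_on_port[0]  # repurpose the first stream found
        if self._internal_stream is None:
            return DEFAULT_STREAM       # no pool and no upstream stream: default stream
        return self._internal_stream    # later ports' streams are synced to this one

ctx = NoPoolInputContext()
assert ctx.receive_cuda_stream(["stream_A"]) == "stream_A"  # repurposed as internal stream
assert ctx.receive_cuda_stream(["stream_X"]) == "stream_A"  # second port returns the same stream
assert NoPoolInputContext().receive_cuda_stream([]) == DEFAULT_STREAM
```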
The following methods are provided for advanced use cases requiring manual stream management.
receive_cuda_streams (InputContext)
The receive_cuda_streams (C++/Python) method is for cases where manual stream management is needed. Unlike receive_cuda_stream, this method does not perform synchronization, allocate an internal stream, update the CUDA device, or configure output ports. It simply returns a std::vector<std::optional<cudaStream_t>> containing the stream from each message on the input port (or std::nullopt if no stream ID was found).
Note that as for receive_cuda_stream, it is important that any receive_cuda_streams call for a port is after the corresponding receive call for that same port. An example is given below
// The code below would appear within `Operator::compute`
// Process a "receivers" port (e.g. one having IOSpec::kAnySize) that may
// have an arbitrary number of connections, each of which may have sent a
// TensorMap. Here we will assume there is just one tensor per connection
// and receive as std::vector<Tensor> for simplicity.
auto maybe_tensors = op_input.receive<std::vector<Tensor>>("receivers");
if (!maybe_tensors) { throw std::runtime_error("No message available"); }
auto tensormaps = maybe_tensors.value();
// Get a vector of std::optional<cudaStream_t> containing any streams
// found by the above receive call.
auto cuda_streams = op_input.receive_cuda_streams("receivers");
# The code below would appear within `Operator.compute`
tensors = op_input.receive("receivers")
if tensors is None:
raise RuntimeError("No message available on 'receivers' input")
cuda_stream_ptrs = op_input.receive_cuda_streams("receivers")
allocate_cuda_stream
(ExecutionContext)
The allocate_cuda_stream (C++/Python) method allocates additional CUDA streams from the operator’s CudaStreamPool. It returns an unexpected value (or None in Python) if no stream pool is available or all streams are in use. Streams are cached by name: the same name returns the same stream on subsequent calls. Streams allocated this way are not automatically emitted; call set_cuda_stream before emit if needed.
// The code below would appear within `Operator::compute`
// allocate a new CUDA stream
auto maybe_stream = context.allocate_cuda_stream("my_stream");
// raise an error if no stream was allocated
if (!maybe_stream) {
const auto& error = maybe_stream.error();
throw std::runtime_error(
fmt::format("Failed to allocate cuda stream with error: {}", error.what()));
}
// retrieve the stream from the `expected<cudaStream_t, RuntimeError>`
cudaStream_t my_stream = maybe_stream.value();
// some custom code using the CUDA stream here
// emit the allocated stream on the "out" port
op_output.set_cuda_stream(my_stream, "out");
# The code below would appear within `Operator.compute`
# allocate a new CUDA stream
my_stream = context.allocate_cuda_stream("my_stream")
# raise an error if no stream was allocated
if my_stream is None:
    raise RuntimeError("Failed to allocate CUDA stream")
# some custom code using the CUDA stream here
# emit the allocated stream on the "out" port
op_output.set_cuda_stream(my_stream, "out")
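The name-based caching behavior can be pictured with a toy model (hypothetical class, not the SDK implementation):

```python
class StreamPoolModel:
    """Toy model of per-name stream caching in allocate_cuda_stream."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = {}  # name -> stream handle

    def allocate_cuda_stream(self, name):
        if name in self.cache:
            # The same name returns the same cached stream on subsequent calls.
            return self.cache[name]
        if len(self.cache) >= self.capacity:
            # Pool exhausted: the real API returns unexpected (C++) / None (Python).
            return None
        handle = f"stream-{len(self.cache)}"  # stand-in for a cudaStream_t
        self.cache[name] = handle
        return handle
```

Caching by name means compute can call allocate_cuda_stream on every invocation: the allocation cost is paid once, and later calls cheaply retrieve the same stream.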
synchronize_streams
(ExecutionContext)
The synchronize_streams (C++/Python) method synchronizes a vector of streams to a target stream using the same non-blocking event-based mechanism as receive_cuda_stream (cudaEventRecord / cudaStreamWaitEvent). When using receive_cuda_stream, synchronization is handled automatically; this method is for manual stream handling use cases.
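The sequence of CUDA calls this mechanism issues can be sketched with a small model (illustrative only; real code would invoke cudaEventRecord and cudaStreamWaitEvent through the CUDA runtime):

```python
def synchronize_streams_model(source_streams, target_stream):
    """Record the CUDA calls the event-based mechanism would issue.

    For each source stream an event is recorded, and the target stream is
    told to wait on that event; the host thread itself never blocks.
    """
    calls = []
    for src in source_streams:
        calls.append(("cudaEventRecord", src))
        calls.append(("cudaStreamWaitEvent", target_stream, src))
    return calls
```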
device_from_stream
(ExecutionContext)
The device_from_stream (C++/Python) method returns the CUDA device ID for a given stream. This method only works with streams managed by Holoscan SDK (those returned by receive_cuda_stream, receive_cuda_streams, or allocate_cuda_stream).
set_cuda_stream
(OutputContext)
The set_cuda_stream (C++/Python) method configures a specific stream to be emitted on an output port. Not needed when using receive_cuda_stream (which auto-configures output ports), but required when using allocate_cuda_stream or manual stream handling. See the allocate_cuda_stream example above.
Single Stream Per Output Port
Only one stream can be configured per output port. If set_cuda_stream is called multiple times for the same output port within a single compute call, the last call takes effect (replacing any previously configured stream for that port). This is consistent with the design that each emitted message carries at most one CUDA stream ID.
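A minimal model of this rule (hypothetical class, not SDK code):

```python
class OutputPortStreams:
    """Toy model: each output port carries at most one CUDA stream ID."""

    def __init__(self):
        self.port_streams = {}  # port name -> stream handle

    def set_cuda_stream(self, stream, port):
        # A later call for the same port replaces any earlier stream.
        self.port_streams[port] = stream
```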
By default, receive_cuda_stream synchronizes streams inside compute(). The operator is scheduled immediately when a message arrives, even if upstream GPU work is still in progress. For operators that need data to be immediately available (e.g., device-to-host copies, CPU processing), it may be beneficial to delay scheduling until upstream GPU work completes.
CudaStreamCondition (C++/Python) provides this capability. When a message arrives, it registers host callbacks (via cudaLaunchHostFunc) on the input streams. The operator is only marked READY after all callbacks fire, indicating GPU work is complete.
Features:
- Multiple input ports: monitor multiple ports simultaneously, including multi-receiver ports (IOSpec::kAnySize)
- All messages in queue: checks streams on all messages, not just the first
CudaStreamCondition internally uses findAll to discover all CudaStreamId components in each Entity, which allows it to handle edge cases where multiple stream IDs might exist. However, the standard Holoscan stream APIs (set_cuda_stream, receive_cuda_stream, receive_cuda_streams) are designed around a single stream per Entity model. There is currently no public API to intentionally emit or receive multiple streams within the same Entity.
If no stream is found in an input message, execution is allowed.
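The readiness rule can be sketched as a small Python model (illustrative only; the real condition registers host callbacks via cudaLaunchHostFunc rather than tracking sets):

```python
def is_ready(message_streams, completed_streams):
    """Toy model of CudaStreamCondition's readiness check.

    message_streams: stream IDs found on queued input messages
    (None for a message that carried no stream ID).
    completed_streams: streams whose host callbacks have already fired.
    """
    for stream in message_streams:
        if stream is None:
            continue  # no stream on the message: it does not block execution
        if stream not in completed_streams:
            return False  # GPU work on this stream has not completed yet
    return True
```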
Example usage is as follows:
// The code below would appear within `Application::compose` (or `Fragment::compose`)
// Monitor a single input port named "in"
auto stream_cond = make_condition<CudaStreamCondition>("stream_sync",
    Arg("receivers", std::string("in")));

// Alternatively, monitor multiple input ports
// auto stream_cond = make_condition<CudaStreamCondition>("stream_sync",
//     Arg("receivers", std::vector<std::string>{"in1", "in2"}));

// Alternatively, monitor a multi-receiver port (kAnySize) - discovers
// receivers:0, receivers:1, etc.
// auto stream_cond = make_condition<CudaStreamCondition>("stream_sync",
//     Arg("receivers", std::string("receivers")));

// Pass the condition as an argument to `make_operator`
auto my_op = make_operator<ops::MyOperator>("my_op",
    stream_cond,
    from_config("my_operator"));
# The code below would appear within `Application.compose` (or `Fragment.compose`)
# Monitor a single input port named "in"
stream_cond = CudaStreamCondition(self, receivers="in", name="stream_sync")

# Alternatively, monitor multiple input ports
# stream_cond = CudaStreamCondition(self, receivers=["in1", "in2"], name="stream_sync")

# Alternatively, monitor a multi-receiver port (kAnySize) - discovers
# receivers:0, receivers:1, etc.
# stream_cond = CudaStreamCondition(self, receivers="receivers", name="stream_sync")
# Pass the condition as a positional argument to an Operator's constructor
my_op = MyOperator(
self,
stream_cond,
**my_kwargs,
name="my_op",
)
Since CUDA kernels launch asynchronously, compute() may exit before GPU work completes. This is desirable for performance, but has some implications:
Tools like the built-in Data Flow Tracking or GXF JobStatistics measure the time spent in an operator’s compute method. This can be misleadingly short when the actual GPU kernels complete some time after the compute call has ended. A concrete example: an upstream operator launches a CUDA kernel asynchronously, and a downstream operator then needs to do a device-to-host transfer (which requires synchronization). In that scenario, the downstream operator must wait for the upstream kernel to complete, so the upstream kernel’s duration is reflected in the downstream operator’s compute time (assuming no CudaStreamCondition was used to force the upstream kernel to complete before the downstream compute method was called).
In such scenarios it is recommended to perform profiling with Nsight Systems to get a more detailed view of the application timing. The Nsight Systems UI will have per-stream traces of CUDA calls as well as separate traces for any scheduler worker threads that show the durations of Operator compute calls.
When an operator uses an Allocator (e.g. UnboundedAllocator, BlockMemoryPool, RMMAllocator, or StreamOrderedAllocator) to dynamically allocate memory on each compute call, more memory may be required than initially estimated. For example, if a kernel is launched but compute returns while computation on a tensor is still in progress, an upstream operator is free to be scheduled again. If that upstream operator was using an Allocator, the memory from its prior compute call would still be in use, so the allocator needs room for a second tensor alongside the original one. This means the author has to budget a larger number of required bytes (or blocks) than they would otherwise estimate (e.g., 2x as many).
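As a rough sizing sketch (the function name, in-flight factor, and the 1080p example are illustrative, not SDK API), the extra in-flight allocation can be accounted for with a simple multiplier:

```python
def required_pool_bytes(tensor_shape, dtype_size, tensors_per_compute,
                        in_flight_factor=2):
    """Estimate the bytes a pool-based allocator should reserve.

    in_flight_factor=2 budgets for one allocation still in use by
    asynchronous GPU work while the next compute call allocates again.
    """
    elems = 1
    for dim in tensor_shape:
        elems *= dim
    per_compute = elems * dtype_size * tensors_per_compute
    return per_compute * in_flight_factor

# e.g. one 1080p RGBA float32 tensor allocated per compute call
nbytes = required_pool_bytes((1080, 1920, 4), 4, 1)
```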
Sink operators (operators that consume data but don’t emit it) require special handling when using pool-based allocators (BlockMemoryPool, StreamOrderedAllocator, RMMAllocator) and asynchronous GPU work. This section explains why and provides guidance for operator authors.
The Problem
For operators that emit data, the deallocation stream is automatically set on output tensors during emit(). However, sink operators don’t call emit(), which means the input tensors retain whatever stream was set by the upstream operator. This can cause a race condition:
1. The upstream operator emits a tensor on stream A.
2. The sink operator receives the tensor and launches GPU work on stream B (its internal stream).
3. The sink operator’s compute() returns (GPU work is still running on stream B).
4. The input tensor’s reference count drops to zero, triggering deallocation.
5. The allocator sees stream A on the tensor and may reuse the memory before stream B completes.
6. Race condition: new data overwrites the memory while the GPU is still reading it.
When This Matters
This issue only affects sink operators that meet all of these criteria:
- Use a pool-based allocator: BlockMemoryPool, StreamOrderedAllocator, or RMMAllocator. The UnboundedAllocator does not pool memory, so it is unaffected.
- Launch async GPU work: CPU-only operators don’t have this issue.
- Return before GPU work completes: if the operator synchronizes (e.g., cudaStreamSynchronize()) before returning, the memory is safe to reuse.
Solution: Set the Deallocation Stream
Sink operators should inform the allocator which stream last accessed the tensor’s memory. There are two approaches depending on how you receive data:
For C++ Operators Using GXF Entities
If your operator receives data as holoscan::gxf::Entity and accesses nvidia::gxf::Tensor or nvidia::gxf::VideoBuffer components directly, call setStream() on the memory buffer:
void MySinkOp::compute(InputContext& op_input, OutputContext& op_output, ExecutionContext& context) {
auto in_message = op_input.receive<holoscan::gxf::Entity>("input").value();
cudaStream_t cuda_stream = op_input.receive_cuda_stream("input");
// Set deallocation stream on all tensors
void* stream_ptr = static_cast<void*>(cuda_stream);
auto tensors = in_message.findAll<nvidia::gxf::Tensor>();
if (tensors) {
for (auto&& tensor : tensors.value()) {
tensor.value()->memory_buffer().setStream(stream_ptr);
}
}
// Set deallocation stream on all video buffers
auto video_buffers = in_message.findAllHeap<nvidia::gxf::VideoBuffer>();
if (video_buffers) {
for (auto&& video_buffer : video_buffers.value()) {
video_buffer.value()->memory_buffer().setStream(stream_ptr);
}
}
// Launch async GPU work on cuda_stream
my_kernel<<<grid, block, 0, cuda_stream>>>(tensor_data, ...);
// compute() returns while GPU work is still running - memory is safe
}
For C++ Operators Using holoscan::Tensor
If your operator receives data as std::shared_ptr<holoscan::Tensor> or holoscan::TensorMap, use the set_deallocation_stream() method:
void MySinkOp::compute(InputContext& op_input, OutputContext& op_output, ExecutionContext& context) {
auto tensor = op_input.receive<std::shared_ptr<holoscan::Tensor>>("input").value();
cudaStream_t cuda_stream = op_input.receive_cuda_stream("input");
// Set deallocation stream on the tensor
tensor->set_deallocation_stream(cuda_stream);
// Launch async GPU work
my_kernel<<<grid, block, 0, cuda_stream>>>(tensor->data(), ...);
}
For TensorMap:
void MySinkOp::compute(InputContext& op_input, OutputContext& op_output, ExecutionContext& context) {
auto tensor_map = op_input.receive<holoscan::TensorMap>("input").value();
cudaStream_t cuda_stream = op_input.receive_cuda_stream("input");
// Set deallocation stream on all tensors in the map
for (auto& [name, tensor] : tensor_map) {
tensor->set_deallocation_stream(cuda_stream);
}
// Launch async GPU work
// ...
}
set_deallocation_stream() only works for tensors whose memory is managed by a Holoscan/GXF allocator. For tensors created from external sources (e.g., CuPy or PyTorch arrays via the DLPack interface), the method returns false and has no effect since memory lifetime is managed externally.
For Python Operators
Python operators can use the set_deallocation_stream() method on holoscan.Tensor:
def compute(self, op_input, op_output, context):
tensor = op_input.receive("input")
cuda_stream = op_input.receive_cuda_stream("input")
# Set deallocation stream
if hasattr(tensor, 'set_deallocation_stream'):
tensor.set_deallocation_stream(cuda_stream)
# Use tensor with CuPy on the stream
with cp.cuda.ExternalStream(cuda_stream):
# GPU operations here
pass
Built-in Operators
The built-in HolovizOp already handles this correctly by setting the deallocation stream on all input tensors and video buffers before launching GPU rendering work.
The stream handling model is designed primarily for single-GPU pipelines. However, operators on separate GPUs can interoperate:
- Each operator can have its own CudaStreamPool configured for a different GPU (dev_id parameter).
- CUDA’s cudaStreamWaitEvent() supports cross-device synchronization.
- receive_cuda_stream() sets the active CUDA device to match the stream’s device.
- CUDA’s “current device” is thread-local state, so parallel operators on different scheduler threads can safely target different GPUs.
Data Locality Limitation
Tensors emitted via shared_ptr<Tensor> are zero-copy, referencing device memory on a specific GPU. Device memory on GPU0 is not directly accessible from GPU1 (unless using CUDA P2P, Unified Memory, or NVLink). Operators on different GPUs must explicitly handle data transfer.