holoscan::CudaStreamCondition

Beta

Native condition class for CUDA stream synchronization with multi-message support.

This condition supports:

  • Multiple messages in the input queue (queue_size > 1)
  • Multiple CudaStreamId components per message
  • Multiple receiver ports (both regular and IOSpec::kAnySize multi-receiver inputs)

By default, this condition examines ALL messages in all receiver queues and waits for GPU work on ALL associated CUDA streams to complete before allowing the operator to execute. This behavior can be changed by setting check_all_messages to false, which makes it check only the first message per receiver.
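As a rough illustration of what check_all_messages changes, the sketch below models each receiver queue as a std::deque of messages, each carrying a list of stream IDs. Message, Queue, StreamId and collect_streams are hypothetical stand-ins, not Holoscan types. The queues are taken by const reference, which mirrors the peek-only behavior. This page does not say whether the real condition deduplicates streams that appear in several messages, so the sketch does not deduplicate them.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical stand-ins: a message carries zero or more stream IDs,
// modeling the CudaStreamId components attached to a message.
using StreamId = std::int64_t;
struct Message {
  std::vector<StreamId> stream_ids;
};
using Queue = std::deque<Message>;

// Collect the streams the condition would wait on across all receivers.
// Queues are only inspected (const reference), never popped.
std::vector<StreamId> collect_streams(const std::vector<Queue>& queues,
                                      bool check_all_messages) {
  std::vector<StreamId> streams;
  for (const Queue& queue : queues) {
    for (const Message& msg : queue) {
      streams.insert(streams.end(), msg.stream_ids.begin(), msg.stream_ids.end());
      if (!check_all_messages) {
        break;  // only the first message of this receiver's queue
      }
    }
  }
  return streams;
}
```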

The condition uses cudaLaunchHostFunc() to register callbacks that fire when GPU work on each stream completes. It returns kWaitEvent status while waiting for callbacks, then transitions to kReady when all callbacks have fired.
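The callback bookkeeping can be sketched in plain C++, assuming one std::thread per stream in place of the host functions that cudaLaunchHostFunc() would enqueue, and a std::condition_variable in place of notify_scheduler(). ConditionModel, host_callback and wait_for_streams are hypothetical names; only the State values come from this page. The central idea is an atomic countdown: fetch_sub() returns the previous value, so exactly one callback (the last one) observes 1 and flips the state.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

enum class State { UNSET, CALLBACKS_REGISTERED, DATA_AVAILABLE };

// Hypothetical model of the condition's shared state.
struct ConditionModel {
  std::atomic<State> state{State::UNSET};
  std::atomic<std::size_t> pending_callbacks{0};
  std::mutex mutex;
  std::condition_variable ready_cv;  // stands in for notify_scheduler()
};

// Same shape as a CUDA host function: void(void*), with user_data pointing at the model.
void host_callback(void* user_data) {
  auto* cond = static_cast<ConditionModel*>(user_data);
  // Previous value 1 means this was the last outstanding callback.
  if (cond->pending_callbacks.fetch_sub(1) == 1) {
    {
      // Store under the lock so the waiter cannot miss the wake-up.
      std::lock_guard<std::mutex> lock(cond->mutex);
      cond->state.store(State::DATA_AVAILABLE);
    }
    cond->ready_cv.notify_all();
  }
}

// Register one callback per stream, then block until all of them have fired.
void wait_for_streams(ConditionModel& cond, std::size_t num_streams) {
  if (num_streams == 0) {
    return;  // nothing to wait for in this model
  }
  // Set counter and state before any callback can run.
  cond.pending_callbacks.store(num_streams);
  cond.state.store(State::CALLBACKS_REGISTERED);
  std::vector<std::thread> streams;
  for (std::size_t i = 0; i < num_streams; ++i) {
    streams.emplace_back(host_callback, &cond);
  }
  {
    std::unique_lock<std::mutex> lock(cond.mutex);
    cond.ready_cv.wait(lock, [&cond] { return cond.state.load() == State::DATA_AVAILABLE; });
  }
  for (std::thread& t : streams) {
    t.join();
  }
}
```

With zero streams the model returns immediately and leaves the state at UNSET; how the real condition treats messages without any CudaStreamId is not covered here.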

Note: This condition does NOT consume messages - it only peeks at them. The operator’s compute() method is responsible for actually receiving the messages.

==Parameters==

  • receiver (std::string): DEPRECATED - Use receivers instead. Legacy API for a single input port to monitor. Cannot be used together with receivers. Using this parameter will log a deprecation warning.
  • receivers (std::string or std::vector<std::string>): Name(s) of input port(s) to monitor for CUDA streams. Can be a single string such as “input” or a vector such as {“input1”, “input2”}. Cannot be used together with receiver. Works with both:
    • Regular ports: specified by exact name (e.g., “input”)
    • Multi-receiver ports (IOSpec::kAnySize): specified by base name (e.g., “receivers”), which automatically expands to all ports matching the pattern “receivers:0”, “receivers:1”, etc.
  • check_all_messages (bool, default=true): If true, checks ALL messages in the queue(s) for CudaStreamId components and waits for all associated streams. If false, only checks the first message in each queue.
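The base-name expansion for IOSpec::kAnySize ports can be approximated by the hypothetical helper below. It assumes that an exact name match denotes a regular port and that expanded ports are named "<base>:<index>" with a decimal index; the precise matching rule used by CudaStreamCondition is not specified beyond the receivers:0, receivers:1 pattern.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical helper: resolve a requested name against an operator's input
// port names. An exact match is treated as a regular port; otherwise every
// "<requested>:<digits>" port is treated as part of a multi-receiver port.
std::vector<std::string> resolve_ports(const std::vector<std::string>& port_names,
                                       const std::string& requested) {
  std::vector<std::string> matches;
  const std::string prefix = requested + ":";
  for (const std::string& name : port_names) {
    if (name == requested) {
      return {name};  // regular port: exact match takes precedence
    }
    const bool has_prefix =
        name.size() > prefix.size() && name.compare(0, prefix.size(), prefix) == 0;
    if (has_prefix &&
        name.find_first_not_of("0123456789", prefix.size()) == std::string::npos) {
      matches.push_back(name);  // e.g. "receivers:0", "receivers:1"
    }
  }
  return matches;
}
```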

==Usage==

#include <holoscan/cuda_stream.hpp>

Example

// Inside Fragment::compose() (or Application::compose()).
// MyOperator is a user-defined operator that declares the named input ports.

// Single regular input port (legacy API; logs a deprecation warning)
auto stream_cond_legacy = make_condition<CudaStreamCondition>(
    "cuda_sync_legacy", Arg("receiver", "input"));
// Single regular input port (new API)
auto stream_cond = make_condition<CudaStreamCondition>(
    "cuda_sync", Arg("receivers", "input"));
// Multiple regular input ports
auto stream_cond_multi = make_condition<CudaStreamCondition>(
    "cuda_sync_multi", Arg("receivers", std::vector<std::string>{"input1", "input2"}));
// Multi-receiver port (e.g., HolovizOp's "receivers")
auto stream_cond_any = make_condition<CudaStreamCondition>(
    "cuda_sync_any", Arg("receivers", "receivers"));  // finds receivers:0, receivers:1, etc.
// Check only the first message per receiver
auto stream_cond_first = make_condition<CudaStreamCondition>(
    "cuda_sync_first", Arg("receivers", "input"), Arg("check_all_messages", false));
auto my_op = make_operator<MyOperator>("my_op", stream_cond);

Inherits from: holoscan::Condition (public)


Constructors

CudaStreamCondition

inline explicit
template <typename ArgT,
typename... ArgsT,
typename = std::enable_if_t<!std::is_base_of_v<::holoscan::Condition, std::decay_t<ArgT>> && (std::is_same_v<::holoscan::Arg, std::decay_t<ArgT>> || std::is_same_v<::holoscan::ArgList, std::decay_t<ArgT>>)>>
holoscan::CudaStreamCondition::CudaStreamCondition(
ArgT &&arg,
ArgsT &&... args
)

Methods

setup

void holoscan::CudaStreamCondition::setup(
ComponentSpec &spec
) override

Define the condition specification.

Parameters

spec
ComponentSpec &

The reference to the component specification.

initialize

void holoscan::CudaStreamCondition::initialize() override

Initialize the component.

This method is called only once, when the component is created for the first time, and should be used for lightweight initialization.

update_state

void holoscan::CudaStreamCondition::update_state(
int64_t timestamp
) override

Checks if the state of the condition can be updated and updates it.

Parameters

timestamp
int64_t

The current timestamp

check

void holoscan::CudaStreamCondition::check(
int64_t timestamp,
SchedulingStatusType *type,
int64_t *target_timestamp
) const override

Check the condition status before allowing execution.

If the condition is waiting for a time event, ‘target_timestamp’ will contain the target timestamp.

Parameters

timestamp
int64_t

The current timestamp

type
SchedulingStatusType *

The status of the condition

target_timestamp
int64_t *

The target timestamp (used if the term is waiting for a time event).
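A minimal sketch of how check() might translate the internal State into a scheduling status, using local mirror enums instead of the real SchedulingStatusType (only a subset of its values is modeled). The CALLBACKS_REGISTERED and DATA_AVAILABLE mappings follow the class description; mapping UNSET to kWait is an assumption. Because this condition waits on events rather than time, the sketch leaves target_timestamp untouched.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical mirrors of the documented enums (subset only).
enum class State { UNSET, CALLBACKS_REGISTERED, DATA_AVAILABLE };
enum class Status { kWait, kWaitEvent, kReady };

// Report the scheduling status for the current state.
void check(State state, Status* type, std::int64_t* target_timestamp) {
  switch (state) {
    case State::DATA_AVAILABLE:
      *type = Status::kReady;  // all host callbacks have fired
      break;
    case State::CALLBACKS_REGISTERED:
      *type = Status::kWaitEvent;  // waiting for GPU work to complete
      break;
    case State::UNSET:
      *type = Status::kWait;  // assumption: nothing registered yet
      break;
  }
  (void)target_timestamp;  // not a time-based wait in this model
}
```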

on_execute

void holoscan::CudaStreamCondition::on_execute(
int64_t timestamp
) override

Called each time after the entity of this term has been executed.

Parameters

timestamp
int64_t

The current timestamp

receiver

inline
void holoscan::CudaStreamCondition::receiver(
std::shared_ptr<Receiver> receiver
)

Set receiver for this condition (legacy single-port API).

receivers

void holoscan::CudaStreamCondition::receivers(
std::vector<std::shared_ptr<Receiver>> receivers
)

Set receivers for this condition (for all input ports to monitor).

check_all_messages

void holoscan::CudaStreamCondition::check_all_messages(
bool value
)

Set whether to check all messages in the queue.

condition_type

ConditionComponentType holoscan::CudaStreamCondition::condition_type() const

Get the condition type.

Returns: The condition type.

name

Condition & holoscan::CudaStreamCondition::name(
const std::string &name
) &

Set the name of the condition.

Returns: The reference to the condition.

Parameters

name
const std::string &

The name of the condition.

fragment

Condition & holoscan::CudaStreamCondition::fragment(
Fragment *fragment
)

Set the fragment of the condition.

Returns: The reference to the condition.

Parameters

fragment
Fragment *

The pointer to the fragment of the condition.

spec

Condition & holoscan::CudaStreamCondition::spec(
const std::shared_ptr<ComponentSpec> &spec
)

Set the component specification to the condition.

Returns: The reference to the condition.

Parameters

spec
const std::shared_ptr<ComponentSpec> &

The component specification.

spec_shared

std::shared_ptr<ComponentSpec> holoscan::CudaStreamCondition::spec_shared()

Get the shared pointer to the component spec.

Returns: The shared pointer to the component spec.

add_arg

void holoscan::CudaStreamCondition::add_arg(
const std::shared_ptr<Resource> &arg
)

Add a resource to the condition.

Parameters

arg
const std::shared_ptr<Resource> &

The resource to add.

resources

std::unordered_map<std::string, std::shared_ptr<Resource>> & holoscan::CudaStreamCondition::resources()

Get the resources of the condition.

Returns: The resources of the condition.

to_yaml_node

YAML::Node holoscan::CudaStreamCondition::to_yaml_node() const override

Get a YAML representation of the condition.

Returns: YAML node including spec of the condition in addition to the base component properties.

transmitter

std::optional<std::shared_ptr<Transmitter>> holoscan::CudaStreamCondition::transmitter(
const std::string &port_name
)

Return the Transmitter corresponding to a specific output port of the Operator associated with this condition.

Returns: The Transmitter corresponding to the output port if it exists; otherwise, std::nullopt.

Parameters

port_name
const std::string &

The name of the output port.
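Because transmitter() returns a std::optional, callers should test for a value before dereferencing it. The sketch below reproduces that contract over a plain std::map; Transmitter, OutputPorts and find_transmitter are hypothetical stand-ins, not the Holoscan types.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <optional>
#include <string>

// Hypothetical stand-ins for an operator's output ports.
struct Transmitter {
  std::string name;
};
using OutputPorts = std::map<std::string, std::shared_ptr<Transmitter>>;

// Same contract as transmitter(): a value if the port exists, std::nullopt otherwise.
std::optional<std::shared_ptr<Transmitter>> find_transmitter(const OutputPorts& ports,
                                                             const std::string& port_name) {
  auto it = ports.find(port_name);
  if (it == ports.end()) {
    return std::nullopt;
  }
  return it->second;
}
```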

wrapper_cid

void holoscan::CudaStreamCondition::wrapper_cid(
int64_t cid
)

Store the component ID for this condition in the underlying backend implementation.

This method may not be needed for all backends.

Parameters

cid
int64_t

Component id corresponding to the underlying framework

notify_scheduler

bool holoscan::CudaStreamCondition::notify_scheduler()

Notify the scheduler that an asynchronous event has completed.

This method is used by event-based conditions (those returning kWaitEvent from check()) to signal to the scheduler that the condition is now ready to be re-evaluated.

This method can be called from any thread (e.g., a CUDA host callback or a worker thread). It is thread-safe.

Returns: true if the notification was successful, false otherwise.

Example

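// In a CUDA host callback or worker thread.
// MyCondition and State belong to a user-defined event-based condition.
// Note: a CUDA host function must not make CUDA API calls itself.
void CUDART_CB my_callback(void* user_data) {
  auto* condition = static_cast<MyCondition*>(user_data);
  condition->state_.store(State::EVENT_COMPLETE);  // record the event first
  condition->notify_scheduler();                   // then ask the scheduler to re-evaluate check()
}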

See also: SchedulingStatusType::kWaitEvent

set_parameters

void holoscan::CudaStreamCondition::set_parameters() override

Set the parameters based on defaults (sets GXF parameters for GXF components).

id

int64_t holoscan::ComponentBase::id() const

Get the identifier of the component.

By default, the identifier is set to -1. It is set to a valid value when the component is initialized.

With the default executor (GXFExecutor), the identifier is set to the GXF component ID.

Returns: The identifier of the component.

args

std::vector<Arg> & holoscan::ComponentBase::args()

Get the list of arguments.

Returns: The vector of arguments.

description

std::string holoscan::ComponentBase::description() const

Get a description of the component.

Returns: YAML string.

See also: to_yaml_node()

service

template <typename ServiceT = DefaultFragmentService>
std::shared_ptr<ServiceT> holoscan::ComponentBase::service(
std::string_view id = ""
) const

Retrieve a registered fragment service or resource.

Retrieves a previously registered fragment service or resource by its type and optional identifier. Returns nullptr if no service/resource is found with the specified type and identifier.

Note that any changes to the service retrieval logic in this method should be synchronized with the implementation in Fragment::service() method to maintain consistency.

Returns: The shared pointer to the service/resource, or nullptr if not found or if type casting fails.

Template parameters

ServiceT
typename

The type of the service/resource to retrieve. Must inherit from either Resource or FragmentService. Defaults to DefaultFragmentService if not specified.

Parameters

id
std::string_view (defaults to "")

The identifier of the service/resource. If empty, retrieves by type only.
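The lookup contract (by type and optional identifier, nullptr on a miss or a failed cast) can be sketched with a small registry keyed by id plus std::dynamic_pointer_cast for the type check. Registry, ServiceBase, Clock and Logger are hypothetical; the real method also distinguishes Resource from FragmentService, which the sketch ignores, and which entry an empty id returns when several match is an assumption (here, the first in id order).

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical polymorphic base so that dynamic_pointer_cast can check types.
struct ServiceBase {
  virtual ~ServiceBase() = default;
};
struct Clock : ServiceBase {};   // example service types
struct Logger : ServiceBase {};

class Registry {
 public:
  void add(std::string id, std::shared_ptr<ServiceBase> service) {
    entries_[std::move(id)] = std::move(service);
  }

  // Non-empty id: exact id lookup, then a checked cast (nullptr on type mismatch).
  // Empty id: first entry whose dynamic type is ServiceT (type-only lookup).
  template <typename ServiceT>
  std::shared_ptr<ServiceT> get(std::string_view id = "") const {
    if (id.empty()) {
      for (const auto& entry : entries_) {
        if (auto typed = std::dynamic_pointer_cast<ServiceT>(entry.second)) {
          return typed;
        }
      }
      return nullptr;
    }
    auto it = entries_.find(std::string(id));
    if (it == entries_.end()) {
      return nullptr;  // not found
    }
    return std::dynamic_pointer_cast<ServiceT>(it->second);  // nullptr if the cast fails
  }

 private:
  std::map<std::string, std::shared_ptr<ServiceBase>> entries_;
};
```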

get_service_by_type_info

std::shared_ptr<FragmentService> holoscan::ComponentBase::get_service_by_type_info(
const std::type_info &service_type,
std::string_view id = ""
) const

Retrieve a registered fragment service or resource for Python bindings.

This is a helper method for Python bindings to retrieve a service by its C++ type info.

Returns: The shared pointer to the base service, or nullptr if not found.

Parameters

service_type
const std::type_info &

The type info of the service/resource to retrieve.

id
std::string_view (defaults to "")

The identifier of the service/resource. If empty, retrieves by type only.

reset_backend_objects

virtual void holoscan::ComponentBase::reset_backend_objects()

Reset any backend-specific objects (e.g. GXF GraphEntity).

cuda_host_callback

static void CUDART_CB holoscan::CudaStreamCondition::cuda_host_callback(
void *user_data
)

CUDA host callback function - called when GPU work on a stream completes.

register_callbacks_for_receivers

void holoscan::CudaStreamCondition::register_callbacks_for_receivers()

Register host callbacks for all streams found in messages across all receiver queues.

register_callbacks_for_single_receiver

void holoscan::CudaStreamCondition::register_callbacks_for_single_receiver(
nvidia::gxf::Receiver *gxf_recv,
gxf_context_t gxf_context,
size_t &streams_found
)

Register host callbacks for streams in a single receiver’s queue.

get_gxf_context

void * holoscan::CudaStreamCondition::get_gxf_context() const

Get the GXF context from the fragment.

get_all_receivers

std::vector<std::shared_ptr<Receiver>> holoscan::CudaStreamCondition::get_all_receivers() const

Collect all receivers from the receivers_ parameter.

set_operator

void holoscan::CudaStreamCondition::set_operator(
Operator *op
)

Set the Operator this condition is associated with.

Parameters

op
Operator *

The pointer to the Operator object.

update_params_from_args

void holoscan::CudaStreamCondition::update_params_from_args()

Update parameters based on the specified arguments.

service_provider

void holoscan::ComponentBase::service_provider(
FragmentServiceProvider *provider
)

Set the service provider that owns this component.


Static methods

register_converter

template <typename typeT>
static void holoscan::ComponentBase::register_converter()

Register the argument setter for the given type.

If an operator or resource has an argument with a custom type, the argument setter must be registered using this method.

The argument setter is used to set the value of the argument from the YAML configuration.

This method can be called in the initialization phase of the operator/resource (e.g., initialize()). The example below shows how to register the argument setter for the custom type (Vec3):

Example

void MyOp::initialize() {
  // Register the converter for the custom type, then run the base-class initialization.
  register_converter<Vec3>();
  Operator::initialize();
}

It is assumed that YAML::convert<T>::encode and YAML::convert<T>::decode are implemented for the given type, so you need to specialize the YAML::convert<> template class.

For example, suppose that you had a Vec3 class with the following members:

Example

struct Vec3 {
  double x, y, z;
  // operator==() must be overloaded for the comparison
  bool operator==(const Vec3& other) const {
    return x == other.x && y == other.y && z == other.z;
  }
};

You can define YAML::convert<Vec3> as follows in a ‘.cpp’ file:

Example

#include <yaml-cpp/yaml.h>

namespace YAML {
template <>
struct convert<Vec3> {
  // Vec3 -> YAML sequence [x, y, z]
  static Node encode(const Vec3& rhs) {
    Node node;
    node.push_back(rhs.x);
    node.push_back(rhs.y);
    node.push_back(rhs.z);
    return node;
  }
  // YAML sequence [x, y, z] -> Vec3; reject anything else
  static bool decode(const Node& node, Vec3& rhs) {
    if (!node.IsSequence() || node.size() != 3) {
      return false;
    }
    rhs.x = node[0].as<double>();
    rhs.y = node[1].as<double>();
    rhs.z = node[2].as<double>();
    return true;
  }
};
}  // namespace YAML

Please refer to the yaml-cpp documentation for more details.

Template parameters

typeT
typename

The type of the argument to register.

register_argument_setter

template <typename typeT>
void holoscan::ComponentBase::register_argument_setter()

Register the argument setter for the given type.

Please refer to the documentation of register_converter() for more details.

Template parameters

typeT
typename

The type of the argument to register.


Types

State

State machine states for the condition.

Name | Description
UNSET | No messages to process or condition was reset.
CALLBACKS_REGISTERED | Host callbacks registered, waiting for completion.
DATA_AVAILABLE | All callbacks fired, operator ready to execute.

ConditionComponentType

Component type of the condition (native or GXF).

Name | Description
kNative | Native condition.
kGXF | GXF condition (scheduling term).

Member variables

Name | Type | Description
receiver_ | Parameter< std::shared_ptr< Receiver > > | Legacy single-receiver parameter.
receivers_ | Parameter< std::vector< std::shared_ptr< Receiver > > > | Receivers (input ports) to monitor for CUDA streams.
check_all_messages_ | Parameter< bool > | Whether to check all messages in each queue or only the first.
state_ | std::atomic< State > | Current state of the condition's state machine.
pending_callbacks_ | std::atomic< size_t > |
last_state_change_ | int64_t |
is_initialized_ | bool | Whether the condition is initialized.
resources_ | std::unordered_map< std::string, std::shared_ptr< Resource > > | The resources used by the condition.
condition_type_ | ConditionComponentType | The type of the component.
op_ | Operator * | The operator this condition is associated with.
wrapper_cid_ | int64_t | Component ID of underlying GXFSchedulingTermWrapper.
spec_ | std::shared_ptr< ComponentSpec > | The component specification.
id_ | int64_t | The ID of the component.
name_ | std::string | Name of the component.
fragment_ | Fragment * | Pointer to the fragment that owns this component.
args_ | std::vector< Arg > | List of arguments.
service_provider_ | FragmentServiceProvider * | Pointer to the service provider.

Inner classes

CallbackData

struct holoscan::CudaStreamCondition::CallbackData

Data passed to CUDA host callbacks.

Name | Type | Description
condition | CudaStreamCondition * |