Create a Condition


In most cases, applications will be built using one of several provided conditions documented in the condition components section of this user guide. This page illustrates the advanced use case of adding a user-defined condition to control when an operator can execute.

C++ Conditions

When assembling a C++ application, two types of conditions can be used:

  1. Native C++ conditions: custom conditions defined in C++ without using the GXF API, by creating a subclass of holoscan::Condition.
  2. GXF Conditions: conditions defined in the underlying C++ library by inheriting from the holoscan::gxf::GXFCondition class. These conditions wrap GXF scheduling term components from GXF extensions. Examples are holoscan::CountCondition, which limits operator execution to a specified count, and holoscan::PeriodicCondition, which restricts the rate of execution of an operator to a specified period. Several additional built-in conditions are documented in the condition components section.

It is possible to assign a mixture of GXF conditions and native conditions to an operator.

Native Conditions

Understanding operator scheduling

The holoscan::SchedulingStatusType enum defines the current status of the condition.

| Condition Scheduling Status | Description |
|---|---|
| kNever | Operator will never execute again |
| kReady | Operator is ready for execution |
| kWait | Operator may execute in the future |
| kWaitTime | Operator will be ready for execution after specified duration |
| kWaitEvent | Operator is waiting on an asynchronous event with unknown time interval |

The overall readiness of an operator to execute is determined by the AND combination of the statuses of all of the individual conditions present on that operator. In other words, the operator can only execute once all of its conditions are in the kReady state. If any condition is in the kNever state, the operator will never execute again.
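The AND combination described above can be sketched with a small stand-alone model. This is only an illustration: the Status and OperatorState enums and the combine function are hypothetical names local to this sketch, not part of the Holoscan or GXF API, and the real scheduler's bookkeeping is more involved.

```cpp
#include <cassert>
#include <vector>

// Local copy of the status values from the table above (not the SDK enum).
enum class Status { kNever, kReady, kWait, kWaitTime, kWaitEvent };

// Hypothetical summary of what the combined status means for one operator.
enum class OperatorState { kWillNeverRun, kCanRun, kMustWait };

// Combine the statuses of all conditions attached to a single operator.
OperatorState combine(const std::vector<Status>& statuses) {
  bool all_ready = true;
  for (Status s : statuses) {
    // A single kNever permanently disables the operator.
    if (s == Status::kNever) { return OperatorState::kWillNeverRun; }
    // Any status other than kReady blocks execution for now.
    if (s != Status::kReady) { all_ready = false; }
  }
  return all_ready ? OperatorState::kCanRun : OperatorState::kMustWait;
}

// Usage: the cases described in the paragraph above.
void combine_examples() {
  assert(combine({Status::kReady, Status::kReady}) == OperatorState::kCanRun);
  assert(combine({Status::kReady, Status::kWaitTime}) == OperatorState::kMustWait);
  assert(combine({Status::kReady, Status::kNever}) == OperatorState::kWillNeverRun);
}
```

In this model, kNever takes precedence over every other status, which matches the statement that one condition in the kNever state prevents any further execution.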

When multiple operators are ready to execute at the same time, the order in which they execute will depend on the specific holoscan::Scheduler being used by the application. For example, the holoscan::GreedyScheduler executes one operator at a time in a fixed, deterministic order while the holoscan::EventBasedScheduler and holoscan::MultiThreadScheduler can have multiple worker threads that allow operators to execute in parallel.

When using kWaitEvent / WAIT_EVENT, your condition must call notify_scheduler() (C++ (holoscan::Condition::notify_scheduler)/Python (holoscan.core.Condition.notify_scheduler)) when the asynchronous event completes to wake the scheduler. This is the mechanism used by CudaStreamCondition and AsynchronousCondition. See the Event Based Conditions section below for details on implementing event-based conditions.

Creating a custom condition (C++)

When creating a native Condition (C++ (holoscan::Condition)/Python (holoscan.core.Condition)), one will typically need to override the following base component class methods:

  • initialize (C++ (holoscan::Condition::initialize)/Python (holoscan.core.Condition.initialize)) is called once during initialization, after the application's run (C++ (holoscan::Application::run)/Python (holoscan.core.Application.run)) method is called. This can be used to set up the initial state of any member variables defined for the condition. It is important that this method call the base initialize (C++ (holoscan::Condition::initialize)/Python (holoscan.core.Condition.initialize)) method prior to using any parameters defined by setup (C++ (holoscan::Condition::setup)/Python (holoscan.core.Condition.setup)).
  • setup (C++ (holoscan::Condition::setup)/Python (holoscan.core.Condition.setup)) is used to configure any parameters defined for the condition. This method is called automatically by the Application (C++ (holoscan::Application)/Python (holoscan.core.Application)) class when its run (C++ (holoscan::Application::run)/Python (holoscan.core.Application.run)) method is called.

It is also required to override the following three methods that will be used by the underlying GXF scheduler. Of these, the check method is the only one that is always required to have a non-empty implementation.

  • check (C++ (holoscan::Condition::check)/Python (holoscan.core.Condition.check)) is called by the underlying GXF scheduler in order to check whether the operator to which this condition is assigned is ready to execute. The operator will only execute when this check sets the type output argument to holoscan::SchedulingStatusType::kReady (C++) / holoscan.core.SchedulingStatusType.READY (Python).
  • on_execute (C++ (holoscan::Condition::on_execute)/Python (holoscan.core.Condition.on_execute)) is called immediately after an operator’s compute method (C++ (holoscan::Operator::compute)/Python (holoscan.core.Operator.compute)), just before any emitted messages are actually distributed to downstream receivers.
  • update_state (C++ (holoscan::Condition::update_state)/Python (holoscan.core.Condition.update_state)) is always called immediately before check (C++ (holoscan::Condition::check)/Python (holoscan.core.Condition.check)) and is always passed the current timestamp as an input argument. It is used by conditions whose status depends on the current timestamp.

To create a custom condition in C++, it is necessary to create a subclass of Condition (C++ (holoscan::Condition)/Python (holoscan.core.Condition)). The following example demonstrates how to use native conditions (conditions that do not have an underlying, pre-compiled GXF SchedulingTerm).

Code Snippet: examples/conditions/native/cpp/ping_periodic_native.cpp

#include <optional>

#include <holoscan/holoscan.hpp>
#include <holoscan/operators/ping_rx/ping_rx.hpp>
#include <holoscan/operators/ping_tx/ping_tx.hpp>

namespace holoscan::conditions {

class NativePeriodicCondition : public Condition {
 public:
  HOLOSCAN_CONDITION_FORWARD_ARGS(NativePeriodicCondition)

  NativePeriodicCondition() = default;

  void initialize() override {
    // call parent initialize or parameters will not be registered
    Condition::initialize();
    recess_period_ns_ = recess_period_.get();
  };

  void setup(ComponentSpec& spec) override {
    spec.param(recess_period_,
               "recess_period",
               "Recess Period",
               "Recession period in nanoseconds",
               static_cast<int64_t>(0));
  }

  void check(int64_t timestamp, SchedulingStatusType* status_type,
             int64_t* target_timestamp) const override {
    if (status_type == nullptr) {
      throw std::runtime_error(
          fmt::format("Condition '{}' received nullptr for status_type", name()));
    }
    if (target_timestamp == nullptr) {
      throw std::runtime_error(
          fmt::format("Condition '{}' received nullptr for target_timestamp", name()));
    }
    if (!next_target_.has_value()) {
      *status_type = SchedulingStatusType::kReady;
      *target_timestamp = timestamp;
      return;
    }
    *target_timestamp = next_target_.value();
    *status_type = timestamp > *target_timestamp ? SchedulingStatusType::kReady
                                                 : SchedulingStatusType::kWaitTime;
  };

  void on_execute(int64_t timestamp) override {
    if (next_target_.has_value()) {
      next_target_ = next_target_.value() + recess_period_ns_;
    } else {
      next_target_ = timestamp + recess_period_ns_;
    }
  };

  void update_state([[maybe_unused]] int64_t timestamp) override {
    // no-op for this condition
  };

 private:
  Parameter<int64_t> recess_period_;

  int64_t recess_period_ns_ = 0;
  std::optional<int64_t> next_target_ = std::nullopt;
};

}  // namespace holoscan::conditions

class App : public holoscan::Application {
 public:
  void compose() override {
    using namespace holoscan;

    auto tx = make_operator<ops::PingTxOp>(
        "tx",
        make_condition<CountCondition>("count-condition", 5),
        make_condition<conditions::NativePeriodicCondition>(
            "dummy-condition", Arg("recess_period", static_cast<int64_t>(200'000'000))));

    auto rx = make_operator<ops::PingRxOp>("rx");

    add_flow(tx, rx);
  }
};

int main([[maybe_unused]] int argc, char** argv) {
  auto app = holoscan::make_application<App>();

  app->run();

  return 0;
}

In this application, two operators are created: PingTxOp (C++ (holoscan::ops::PingTxOp)/Python (holoscan.operators.PingTxOp)) and PingRxOp (C++ (holoscan::ops::PingRxOp)/Python (holoscan.operators.PingRxOp)).

  1. The tx operator is a source operator that emits an integer value each time it is invoked.
  2. The rx operator is a sink operator that receives one value from the tx operator.

One custom condition, NativePeriodicCondition, is created by inheriting from the Condition (C++ (holoscan::Condition)/Python (holoscan.core.Condition)) class. This condition is a simplified version of the provided PeriodicCondition (C++ (holoscan::PeriodicCondition)/Python (holoscan.conditions.PeriodicCondition)): it does not implement the policy argument and only accepts an integer-valued recess_period. It exists only as a simple case to illustrate how a custom condition can be created.

The setup method of NativePeriodicCondition defines a single parameter named “recess_period”, which represents the amount of time in nanoseconds that an operator will have to wait after executing before it can execute again.

In defining the initialize method, note that we start by calling the base initialize (C++ (holoscan::Condition::initialize)/Python (holoscan.core.Condition.initialize)) so that we can get the value of the “recess_period” parameter defined in setup. This initialize method then sets the initial state of the private member variables for this condition.

The check method sets the status type output to SchedulingStatusType::kReady (C++) / SchedulingStatusType.READY (Python) if the specified period has elapsed. Otherwise, it sets the target_timestamp and sets the status type to SchedulingStatusType::kWaitTime (C++) / SchedulingStatusType.WAIT_TIME (Python). In this case kWaitTime is used because we know specifically what the target timestamp is. For other types of conditions, we may not know the specific time at which the condition will be satisfied. When a target timestamp isn’t known, one should instead set the status to kWait (C++) / WAIT (Python), and target_timestamp does not need to be set. There is also a kWaitEvent (C++) / WAIT_EVENT (Python) state, which can be used for event-based conditions where the target timestamp isn’t known in advance. Built-in conditions such as AsynchronousCondition and CudaStreamCondition use this status type. Finally, to indicate that an operator will never execute again, we would return kNever (C++) / NEVER (Python). A concrete example that uses kNever is the CountCondition (C++ (holoscan::CountCondition)/Python (holoscan.conditions.CountCondition)), which sets that state once the specified count has been reached.

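The on_execute method advances the internal next_target_ timestamp. On the first execution, next_target_ is set to the timestamp passed in plus recess_period; on later executions, it is advanced by recess_period from the previous target. Because the underlying GXF framework calls this method immediately after Operator::compute, the first waiting period is measured from the time when the first call to compute completed, and subsequent targets are spaced exactly recess_period apart.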
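To make the timestamp arithmetic of check and on_execute concrete, here is a minimal stand-alone model of the same logic. PeriodicModel and its members are hypothetical names for this sketch only; it does not derive from holoscan::Condition and leaves out the status and target_timestamp output arguments.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

// Stand-alone model of the timing logic in NativePeriodicCondition.
struct PeriodicModel {
  int64_t recess_period_ns = 0;
  std::optional<int64_t> next_target;

  // Mirrors check(): ready before the first execution, and afterwards only
  // once the timestamp is strictly greater than the stored target.
  bool is_ready(int64_t timestamp) const {
    return !next_target.has_value() || timestamp > next_target.value();
  }

  // Mirrors on_execute(): the first target is relative to the first execution,
  // later targets advance by exactly one period from the previous target.
  void on_execute(int64_t timestamp) {
    assert(recess_period_ns >= 0);
    next_target = next_target.value_or(timestamp) + recess_period_ns;
  }
};
```

With recess_period_ns set to 200'000'000 and a first on_execute call at timestamp 50'000'000, the model stores a target of 250'000'000. A second on_execute call at 265'000'000 moves the target to 450'000'000 rather than 465'000'000, because the second update is relative to the previous target and not to the time of the call.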

The update_state method was not needed for this condition. This method is always called immediately prior to check, and some conditions also choose to call it from their on_execute method. It is intended to update the internal state of the condition based on the timestamp at the time it is called.

For the C++ API, we can construct a shared pointer to an instance of the condition using the make_condition method. That condition can then be passed to the make_operator method for the operator the condition will apply to (in this case PingTxOp). For the Python API, we instead directly pass the constructed NativePeriodicCondition as a positional argument to the tx operator.

Condition Evaluation Timing Diagram

To better understand when a condition’s check, update_state and on_execute methods would be called by the underlying GXF entity executor, please see the following diagram.

Fragment graph with a cycle and an implicit root operator

It can be seen that when checking if an operator is ready to execute the Condition::update_state method will be called immediately before Condition::check. If the check was successful (across the combination of all conditions on the operator), then the compute method would be called for that operator. The Condition::on_execute method is only called once compute completes.
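The call order described above can be illustrated with a toy model that records which hooks run. RecordingCondition and try_execute are hypothetical names for this sketch, not SDK types. As simplifications, the sketch evaluates every condition even after one reports not ready, and it passes the same timestamp to on_execute that it passed to check; the real entity executor may differ on both points.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Toy condition that logs the hooks called on it.
struct RecordingCondition {
  std::vector<std::string> calls;
  bool ready = true;

  void update_state(int64_t /*timestamp*/) { calls.push_back("update_state"); }
  bool check(int64_t /*timestamp*/) {
    calls.push_back("check");
    return ready;
  }
  void on_execute(int64_t /*timestamp*/) { calls.push_back("on_execute"); }
};

// One scheduling attempt for a single operator. Returns true if compute ran.
bool try_execute(std::vector<RecordingCondition>& conditions, int64_t timestamp,
                 const std::function<void()>& compute) {
  assert(compute && "compute callback must be set");
  bool all_ready = true;
  for (auto& condition : conditions) {
    condition.update_state(timestamp);  // always immediately before check
    all_ready = condition.check(timestamp) && all_ready;
  }
  if (!all_ready) { return false; }  // compute and on_execute are skipped

  compute();
  for (auto& condition : conditions) { condition.on_execute(timestamp); }
  return true;
}
```

When every condition is ready, each one logs update_state, check, on_execute in that order. When any condition is not ready, the log for each condition stops after check.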

Creating a custom condition involving transmitter or receiver queues

Some condition types depend on the state of the receiver or transmitter queues corresponding to an input or output port of an operator. This type of condition can also be created as a native condition. An illustration of this for an equivalent of MessageAvailableCondition is given in a second example. See C++: examples/conditions/native/cpp/message_available_native.cpp, Python: examples/conditions/native/python/message_available_native.py.

The primary additional consideration in designing such a message-based condition is how to retrieve the Receiver (C++ (holoscan::Receiver)/Python (holoscan.resources.Receiver)) or Transmitter (C++ (holoscan::Transmitter)/Python (holoscan.resources.Transmitter)) object for use in the native Condition’s methods.

In the case of a native C++ condition, a parameter of type Parameter<std::shared_ptr<holoscan::Receiver>> receiver_ should be declared as a private member and registered in the setup method, as shown in the code snippet below. Methods to query the queue size can then be used, as demonstrated in the check_min_size method.

Alternatively, you can use the receiver() method inherited from Condition to retrieve a Receiver by port name in initialize(), similar to the Python example below.

Code Snippet: examples/conditions/native/cpp/message_available_native.cpp

class NativeMessageAvailableCondition : public Condition {
 public:
  HOLOSCAN_CONDITION_FORWARD_ARGS(NativeMessageAvailableCondition)

  NativeMessageAvailableCondition() = default;

  void initialize() override {
    // call parent initialize or parameters will not be registered
    Condition::initialize();
  };

  void setup(ComponentSpec& spec) override {
    spec.param(receiver_,
               "receiver",
               "Receiver",
               "The scheduling term permits execution if this channel has at least a given "
               "number of messages available.");
    spec.param(min_size_,
               "min_size",
               "Minimum size",
               "The condition permits execution if the given receiver has at least the given "
               "number of messages available",
               static_cast<uint64_t>(1));
  }

  void check(int64_t timestamp, SchedulingStatusType* type,
             int64_t* target_timestamp) const override {
    if (type == nullptr) {
      throw std::runtime_error(fmt::format("Condition '{}' received nullptr for type", name()));
    }
    if (target_timestamp == nullptr) {
      throw std::runtime_error(
          fmt::format("Condition '{}' received nullptr for target_timestamp", name()));
    }
    *type = current_state_;
    *target_timestamp = last_state_change_;
  };

  void on_execute(int64_t timestamp) override { update_state(timestamp); };

  void update_state(int64_t timestamp) override {
    const bool is_ready = check_min_size();
    if (is_ready && current_state_ != SchedulingStatusType::kReady) {
      current_state_ = SchedulingStatusType::kReady;
      last_state_change_ = timestamp;
    }

    if (!is_ready && current_state_ != SchedulingStatusType::kWait) {
      current_state_ = SchedulingStatusType::kWait;
      last_state_change_ = timestamp;
    }
  };

 private:
  bool check_min_size() {
    auto recv = receiver_.get();
    return recv->back_size() + recv->size() >= min_size_.get();
  }

  Parameter<std::shared_ptr<holoscan::Receiver>> receiver_;
  Parameter<uint64_t> min_size_;

  SchedulingStatusType current_state_ =
      SchedulingStatusType::kWait;  // The current state of the scheduling term
  int64_t last_state_change_ = 0;   // timestamp when the state changed the last time
};
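The state handling in update_state above only records a new timestamp when the state actually changes. The following stand-alone model shows that behavior in isolation. ToyReceiver and MessageAvailableModel are hypothetical stand-ins for this sketch; ToyReceiver only mimics the size() and back_size() queries and is not holoscan::Receiver.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

enum class Status { kReady, kWait };

// Toy stand-in exposing the two queue-size queries used by check_min_size.
struct ToyReceiver {
  std::size_t main_count = 0;
  std::size_t back_count = 0;
  std::size_t size() const { return main_count; }
  std::size_t back_size() const { return back_count; }
};

struct MessageAvailableModel {
  const ToyReceiver* receiver = nullptr;
  std::size_t min_size = 1;
  Status state = Status::kWait;
  int64_t last_state_change = 0;

  void update_state(int64_t timestamp) {
    assert(receiver != nullptr);
    const bool is_ready = receiver->back_size() + receiver->size() >= min_size;
    const Status next = is_ready ? Status::kReady : Status::kWait;
    if (next != state) {  // record the timestamp only on an actual transition
      state = next;
      last_state_change = timestamp;
    }
  }
};
```

Calling update_state repeatedly while the queue contents are unchanged leaves last_state_change untouched, so the value reported through target_timestamp in the real condition reflects the most recent transition rather than the most recent check.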

Creating event-based conditions (kWaitEvent)

For conditions that wait on asynchronous events (such as CUDA stream completion or external callbacks), use the kWaitEvent / WAIT_EVENT status. This tells the scheduler that the condition will be satisfied at some unknown future time when an external event occurs.

Key requirements for event-based conditions:

  1. Return kWaitEvent from check(): When waiting for an async event to complete
  2. Call notify_scheduler(): When the event completes, to wake the scheduler
  3. Use atomic state variables: For thread-safe updates from async callbacks
  4. Reset state in on_execute(): Prepare for the next scheduling cycle

The typical state machine pattern is:

IDLE
  --[update_state: work available; register async callback]--> WAITING
  --[callback fires; notify_scheduler()]--> READY
  --[on_execute; operator runs]--> IDLE
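The state machine can be sketched without any SDK or CUDA dependency as follows. EventConditionModel is a hypothetical class for illustration: the injected notify callable stands in for notify_scheduler(), and on_event_complete stands in for whatever asynchronous callback the real condition registers. In real use, on_event_complete would typically run on a different thread than check, which is why the state is held in a std::atomic.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <utility>

enum class Status { kReady, kWait, kWaitEvent };

class EventConditionModel {
 public:
  enum class State { kIdle, kWaiting, kReady };

  // `notify` is a placeholder for Condition::notify_scheduler().
  explicit EventConditionModel(std::function<void()> notify) : notify_(std::move(notify)) {
    assert(notify_ && "a notify callable is required");
  }

  // IDLE -> WAITING when work is available. A real condition would register
  // its asynchronous callback at this point.
  void update_state(bool work_available) {
    State expected = State::kIdle;
    if (work_available) { state_.compare_exchange_strong(expected, State::kWaiting); }
  }

  Status check() const {
    switch (state_.load()) {
      case State::kReady: return Status::kReady;
      case State::kWaiting: return Status::kWaitEvent;
      case State::kIdle: break;
    }
    return Status::kWait;
  }

  // WAITING -> READY. Called when the asynchronous event completes.
  void on_event_complete() {
    state_.store(State::kReady);
    notify_();  // wake the scheduler so that check() is evaluated again
  }

  // READY -> IDLE after the operator has run.
  void on_execute() { state_.store(State::kIdle); }

 private:
  std::atomic<State> state_{State::kIdle};
  std::function<void()> notify_;
};
```

The state is stored before notify_ is called so that a scheduler woken by the notification observes the READY state on its next check.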

For a complete, production-quality example of an event-based condition, see the CudaStreamCondition implementation in the SDK source code:

  • Header: include/holoscan/core/conditions/gxf/cuda_stream.hpp
  • Implementation: src/core/conditions/gxf/cuda_stream.cpp

This condition demonstrates:

  • Using cudaLaunchHostFunc() to register callbacks on CUDA streams
  • Atomic state management with std::atomic<State>
  • Proper use of notify_scheduler() from the callback
  • Handling multiple streams and messages

// Key pattern from CudaStreamCondition:

// 1. check() returns status based on current state
void check(int64_t timestamp, SchedulingStatusType* type, int64_t* target_timestamp) const override {
  switch (state_.load()) {
    case State::DATA_AVAILABLE:
      *type = SchedulingStatusType::kReady;
      break;
    case State::CALLBACKS_REGISTERED:
      *type = SchedulingStatusType::kWaitEvent;  // Waiting for GPU work
      break;
    default:
      *type = SchedulingStatusType::kWait;
      break;
  }
  *target_timestamp = last_state_change_;
}

// 2. CUDA host callback fires when GPU work completes
static void CUDART_CB cuda_host_callback(void* user_data) {
  auto* self = reinterpret_cast<CallbackData*>(user_data)->condition;
  if (self->pending_callbacks_.fetch_sub(1) - 1 == 0) {
    self->state_.store(State::DATA_AVAILABLE);
    self->notify_scheduler();  // Wake the scheduler
  }
}

// 3. on_execute() resets state after operator runs
void on_execute(int64_t timestamp) override {
  state_.store(State::UNSET);
  pending_callbacks_.store(0);
}

For Python, the same pattern applies using threading.Lock for thread safety instead of atomics. See the C++ implementation as a reference for the overall structure.

The built-in holoscan::CudaStreamCondition uses this pattern with cudaLaunchHostFunc() to register a CUDA host callback that fires when GPU work on a stream completes. See the CUDA stream handling documentation for more details.

Override behavior for default Operator port conditions

The section on customizing input and output ports explains that when a user adds a port without specifying any condition in Operator::setup, a default one is added. This default is a MessageAvailableCondition for input ports or a DownstreamMessageAffordableCondition for output ports.

If the user has supplied their own Condition to make_operator (C++) or as a positional argument to the operator constructor (Python), and that condition has a “receiver” or “transmitter” argument corresponding to an Operator port name, then no default condition is added to that port. This avoids having multiple, potentially conflicting conditions on the same port. However, if the Operator::setup method explicitly specifies a condition via a call to IOSpec::condition, then that explicit condition is still added to the port in addition to any user-supplied one.

Creating Python bindings for a custom C++ condition

To expose a custom C++ condition to Python, it can be wrapped using pybind11 just like any other Condition class. For several examples, see the bindings for built-in conditions. The only difference for binding a custom condition vs. the examples in that folder is that custom conditions should use holoscan::Condition instead of holoscan::gxf::GXFCondition in the list of classes passed to the py::class_ call.