Dynamic Flow Control
Dynamic Flow Control is a feature introduced in Holoscan SDK v3.0 that allows operators to modify their connections with other operators at runtime. This enables creating complex workflows with conditional branching, loops, and dynamic routing patterns.
Traditional static workflows in Holoscan define fixed connections between operators at application composition time. Dynamic Flow Control extends this by allowing operators to:
Modify their connections during execution
Route data conditionally to different operators
Create loops and iterative patterns
Implement complex branching logic
Common use cases include:
Conditional processing pipelines
Adaptive workflow routing
Iterative processing with dynamic termination
Error handling and recovery flows
Here’s a simple example to get started with Dynamic Flow Control in Holoscan:
1. Basic Sequential Flow
The simplest use of dynamic flow control is creating a sequential chain of operators that doesn’t have any input or output ports:
#include <holoscan/holoscan.hpp>
// Define a simple operator
class SimpleOp : public holoscan::Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(SimpleOp)
SimpleOp() = default;
void compute(holoscan::InputContext&, holoscan::OutputContext&,
holoscan::ExecutionContext&) override {
// Simple computation
HOLOSCAN_LOG_INFO("Executing {}", name());
}
};
// Define the application
class SimpleSequentialApp : public holoscan::Application {
public:
void compose() override {
// Create operators
auto op1 = make_operator<SimpleOp>("op1");
auto op2 = make_operator<SimpleOp>("op2");
// Connect operators sequentially
add_flow(start_op(), op1); // Start with op1
add_flow(op1, op2); // Then op2
}
};
int main(int argc, char** argv) {
auto app = holoscan::make_application<SimpleSequentialApp>();
app->run();
return 0;
}
from holoscan.core import Application, Operator
class SimpleOp(Operator):
def compute(self, op_input, op_output, context):
# Simple computation
print(f"Executing{self.name}")
class SimpleSequentialApp(Application):
def compose(self):
# Create operators
op1 = SimpleOp(self, name="op1")
op2 = SimpleOp(self, name="op2")
# Connect operators sequentially
self.add_flow(self.start_op(), op1) # Start with op1
self.add_flow(op1, op2) # Then op2
def main():
app = SimpleSequentialApp()
app.run()
if __name__ == "__main__":
main()
2. Basic Conditional Flow
Here’s a simple example of conditional routing between operators:
#include <holoscan/holoscan.hpp>
// Define a simple operator with a value
class SimpleOp : public holoscan::Operator {
public:
HOLOSCAN_OPERATOR_FORWARD_ARGS(SimpleOp)
SimpleOp() = default;
void setup(holoscan::OperatorSpec& spec) override {}
void compute(holoscan::InputContext&, holoscan::OutputContext&,
holoscan::ExecutionContext&) override {
value_++; // Increment value each time
HOLOSCAN_LOG_INFO("Executing {} with value {}", name(), value_);
}
int get_value() const { return value_; }
private:
int value_ = 0;
};
// Define the application
class SimpleConditionalApp : public holoscan::Application {
public:
void compose() override {
// Create operators
auto op1 = make_operator<SimpleOp>("op1", make_condition<holoscan::CountCondition>(3));
auto path_a = make_operator<SimpleOp>("path_a");
auto path_b = make_operator<SimpleOp>("path_b");
// Define possible flows
add_flow(op1, path_a);
add_flow(op1, path_b);
// Set dynamic flow based on condition
set_dynamic_flows(op1, [path_a, path_b](const std::shared_ptr<holoscan::Operator>& op) {
auto simple_op = std::static_pointer_cast<SimpleOp>(op);
if (simple_op->get_value() % 2 == 0) {
simple_op->add_dynamic_flow(path_a); // Even values go to path_a
} else {
simple_op->add_dynamic_flow(path_b); // Odd values go to path_b
}
});
}
};
int main(int argc, char** argv) {
auto app = holoscan::make_application<SimpleConditionalApp>();
app->run();
return 0;
}
from holoscan.conditions import CountCondition
from holoscan.core import Application, Operator
class SimpleOp(Operator):
def __init__(self, *args, **kwargs):
self.value = 0
super().__init__(*args, **kwargs)
def compute(self, op_input, op_output, context):
self.value += 1 # Increment value each time
print(f"Executing{self.name}with value{self.value}")
class SimpleConditionalApp(Application):
def compose(self):
# Create operators
op1 = SimpleOp(self, CountCondition(self, count=3), name="op1")
path_a = SimpleOp(self, name="path_a")
path_b = SimpleOp(self, name="path_b")
# Define possible flows
self.add_flow(op1, path_a)
self.add_flow(op1, path_b)
# Set dynamic flow based on condition
def route_flow(op):
if op.value % 2 == 0:
op.add_dynamic_flow(path_a) # Even values go to path_a
else:
op.add_dynamic_flow(path_b) # Odd values go to path_b
self.set_dynamic_flows(op1, route_flow)
def main():
app = SimpleConditionalApp()
app.run()
if __name__ == "__main__":
main()
Key Points to Remember:
Optionally use
start_op()(
C++/
Python) to get the initial operator in your flow
Connect operators with
add_flow()
Use
set_dynamic_flows()(
C++/
Python) to define runtime routing logic. An identically named method is also available on
Subgraph(
C++/
Python) for applications making use of subgraphs containing dynamic flows.
Implement flow control logic in the callback function passed to
set_dynamic_flows()(
C++/
Python)
The callback function takes an operator as input and returns void
The callback function can add dynamic flows using the operator’s
add_dynamic_flow()(
C++/
Python) methods
For more complex patterns and detailed explanations, see the sections below.
Input and Output Execution Ports
Before Holoscan SDK v3.0, operators needed input and output ports to be connected via
add_flow() and there is no way to specify the execution dependency if the operator does not have any input or output ports.
However, in some cases, the requirements were different:
An ‘execution order dependency’ was needed instead of a ‘data flow dependency’.
Execution control was required rather than keeping a node running continuously.
The pipeline should run only once unless explicitly specified to loop.
To address these needs, Holoscan SDK v3.0 introduced implicit input/output ‘execution ports’ (
__input_exec__ /
__output_exec__), inspired by Unreal Engine’s Blueprints (particularly execution pins).
The output execution port (
__output_exec__.
holoscan::Operator::kOutputExecPortName in C++ and
holoscan.core.Operator.OUTPUT_EXEC_PORT_NAME in Python) of a source operator and the input execution port (
__input_exec__,
holoscan::Operator::kInputExecPortName in C++ and
holoscan.core.Operator.INPUT_EXEC_PORT_NAME in Python) of a target operator are implicitly added when both of the following are true:
Two operators are connected using
add_flow()without specifying a port map.
The target operator does not have an explicit input port.
Both the source and target operators must be native Holoscan operators. Attempting to connect a GXF Operator to a native Holoscan operator, or vice versa, with an empty port map will result in an error.
During execution, after the source operator’s
compute() method is called, the Holoscan executor emits an empty message (
Entity) to the implicit output execution port as a signal. This can then trigger the target operator’s execution, as the Holoscan executor attaches a
MessageAvailableCondition to the target operator’s implicit input execution port. Before the target operator’s
compute() method runs, the executor collects (pops) all messages from the implicit input execution port, enabling execution dependencies without requiring explicit input and output execution ports.
Starting with Holoscan SDK v3.0, operators can be connected via
add_flow() without the need for explicit input and output execution ports, allowing for more flexible and dynamic operator connections.
These execution port concepts also apply to Subgraph interface ports. Subgraphs can expose execution interface ports using
add_input_exec_interface_port() and
add_output_exec_interface_port(), allowing Subgraphs to participate in dynamic flow control just like operators. See the Subgraph with Dynamic Flow Control section below for details.
Start Operator
In Holoscan, when the workflow graph is executed, root operators who do not have any input ports are first executed, and unless any condition is specified to the root operator (such as
CountCondition or
PeriodicCondition), it will execute continuously.
Inspired by LangGraph’s start node (langgraph.graph.START), Holoscan SDK v3.0 introduces a new concept of the start operator.
The start operator is the first operator in an application fragment, serving as the entry point to the workflow. It is simply the first operator added to the fragment.
This operator is named
<|start|> and has a condition of
CountCondition(1), ensuring it executes only once. Other entry operators that initiate fragment execution should connect to this operator.
In Holoscan, you can retrieve the start operator by calling
start_op() (
C++/
Python) within the
compose() method. If this method is called multiple times, it will return the same start operator
This API is available in both C++ and Python (see flow_control/sequential for an example):
class SequentialExecutionApp : public holoscan::Application {
public:
void compose() override {
using namespace holoscan;
// Define the operators
auto node1 = make_operator<SimpleOp>("node1");
auto node2 = make_operator<SimpleOp>("node2");
auto node3 = make_operator<SimpleOp>("node3");
// Define the three-operator workflow
add_flow(start_op(), node1);
add_flow(node1, node2);
add_flow(node2, node3);
}
};
class SequentialExecutionApp(Application):
def compose(self):
# Define the operators
node1 = SimpleOp(self, name="node1")
node2 = SimpleOp(self, name="node2")
node3 = SimpleOp(self, name="node3")
# Define the three-operator workflow
self.add_flow(self.start_op(), node1)
self.add_flow(node1, node2)
self.add_flow(node2, node3)
In this example, the start operator is connected to
node1, making
node1 the first operator to execute. Since the start operator has a condition of
CountCondition(1), it will only trigger once, ensuring
node1 runs a single time.
In this example, each node (operator) is executed sequentially and executed only once. The
start_op() method retrieves the start operator, which is connected to
node1. This makes
node1 the first operator to execute. After
node1 completes its execution,
node2 is triggered, followed by
node3. The
CountCondition(1) in the
start_op() method ensures that each operator in the sequence runs a single time, maintaining a clear and predictable flow of execution.
Setting Dynamic Flows
The
set_dynamic_flows() (
C++/
Python) method allows for dynamic flow control in a Holoscan application. This method sets a callback function that determines the flow of execution based on the state of the operator at runtime.
In the example from flow_control/conditional, the
set_dynamic_flows() method is used to dynamically control the flow between
node1,
node2, and
node4 based on the value of
node1. The callback function checks the value of
node1 and adds a dynamic flow to either
node2 or
node4:
Node Graph:
node1 (launch twice)
/ \
node2 node4
| |
node3 node5
class ConditionalExecutionApp : public holoscan::Application {
public:
void compose() override {
using namespace holoscan;
// Define the operators
auto node1 = make_operator<SimpleOp>("node1", make_condition<CountCondition>(2));
auto node2 = make_operator<SimpleOp>("node2");
auto node3 = make_operator<SimpleOp>("node3");
auto node4 = make_operator<SimpleOp>("node4");
auto node5 = make_operator<SimpleOp>("node5");
add_flow(node1, node2);
add_flow(node2, node3);
add_flow(node1, node4);
add_flow(node4, node5);
set_dynamic_flows(node1, [node2, node4](const std::shared_ptr<Operator>& op) {
auto simple_op = std::static_pointer_cast<SimpleOp>(op);
if (simple_op->get_value() % 2 == 1) {
simple_op->add_dynamic_flow(node2);
} else {
simple_op->add_dynamic_flow(node4);
}
});
}
};
class ConditionalExecutionApp(Application):
def compose(self):
# Define the operators
node1 = SimpleOp(self, CountCondition(self, count=2), name="node1")
node2 = SimpleOp(self, name="node2")
node3 = SimpleOp(self, name="node3")
node4 = SimpleOp(self, name="node4")
node5 = SimpleOp(self, name="node5")
self.add_flow(node1, node2)
self.add_flow(node2, node3)
self.add_flow(node1, node4)
self.add_flow(node4, node5)
def dynamic_flow_callback(op):
if op.value % 2 == 1:
op.add_dynamic_flow(node2)
else:
op.add_dynamic_flow(node4)
self.set_dynamic_flows(node1, dynamic_flow_callback)
In the above example, the
set_dynamic_flows() (
C++/
Python) methods are used to define and manage dynamic workflows in the application.
The
set_dynamic_flows() (
C++/
Python) method takes an operator and a callback function as arguments. The callback function is called with the operator as an argument.
Inside the callback function, the
add_dynamic_flow() (
C++/
Python) method is used to add a dynamic flow to the operator.
In the example, the callback function checks the value of
node1 and adds a dynamic flow to either
node2 or
node4.
The
add_dynamic_flow() (
C++/
Python) method has several overloads to support different ways of adding dynamic flows:
/// Basic connection using default output port. This is the simplest form for connecting
/// two operators when you only need to specify the destination.
void add_dynamic_flow(const std::shared_ptr<Operator>& next_op,
const std::string& next_input_port = "");
/// Connection with explicit output port specification. Use this when the source operator
/// has multiple output ports and you need to specify which one to use.
void add_dynamic_flow(const std::string& curr_output_port,
const std::shared_ptr<Operator>& next_op,
const std::string& next_input_port = "");
/// Connection using a FlowInfo object, which encapsulates all connection details including:
/// - Source operator and its output port specification
/// - Destination operator and its input port specification
/// - Port names and associated IOSpecs
void add_dynamic_flow(const std::shared_ptr<FlowInfo>& flow);
/// Batch connection using multiple FlowInfo objects. Use this to set up multiple
/// connections in a single call, which is more efficient than making multiple
/// individual connections.
void add_dynamic_flow(const std::vector<std::shared_ptr<FlowInfo>>& flows);
# 1. Basic connection using default output port. This is the simplest form for connecting
# two operators when you only need to specify the destination.
op.add_dynamic_flow(next_op: Operator, next_input_port_name: str = '')
# 2. Connection with explicit output port specification. Use this when the source operator
# has multiple output ports and you need to specify which one to use.
op.add_dynamic_flow(curr_output_port_name: str, next_op: Operator, next_input_port_name: str = '')
# 3. Connection using a FlowInfo object, which encapsulates all connection details including:
# - Source operator and its output port specification
# - Destination operator and its input port specification
# - Port names and associated IOSpecs
#
# This is useful for complex connections or when reusing connection patterns.
op.add_dynamic_flow(flow: FlowInfo)
# 4. Batch connection using multiple FlowInfo objects. Use this to set up multiple
# connections in a single call, which is more efficient than making multiple
# individual connections.
op.add_dynamic_flow(flows: list[FlowInfo])
The simple form of
add_dynamic_flow() (
C++/
Python) is passing just the next operator (and optionally the next input port name).
If the next operator does not have any explicit input, you can omit the next input port name. In this case, current operator’s implicit output execution port will be connected to the next operator’s implicit input execution port.
Flow Information
The
FlowInfo (
C++/
Python) class represents information about a connection between operators and takes the following arguments in the constructor:
curr_operator: The source operator of the flow connection
curr_output_port: The name of the output port on the source operator
next_operator: The destination operator of the flow connection
next_input_port: The name of the input port on the destination operator
Inside the callback function, you can use the
find_flow_info() (
C++/
Python) method and
find_all_flow_info() (
C++/
Python) method to find the
FlowInfo object(s) that matches the predicate.
The following example shows how to find the
FlowInfo object(s) that matches the predicate:
class ConditionalExecutionApp : public holoscan::Application {
public:
void compose() override {
using namespace holoscan;
// Define the operators
auto node1 = make_operator<SimpleOp>("node1", make_condition<CountCondition>(2));
auto node2 = make_operator<SimpleOp>("node2");
auto node3 = make_operator<SimpleOp>("node3");
auto node4 = make_operator<SimpleOp>("node4");
auto node5 = make_operator<SimpleOp>("node5");
add_flow(node1, node2);
add_flow(node2, node3);
add_flow(node1, node4);
add_flow(node4, node5);
// // If you want to add all the next flows, you can use the following code:
// set_dynamic_flows(
// node1, [](const std::shared_ptr<Operator>& op) { op->add_dynamic_flow(op->next_flows());
// });
set_dynamic_flows(node1, [](const std::shared_ptr<Operator>& op) {
auto simple_op = std::static_pointer_cast<SimpleOp>(op);
static const auto& node2_flow = op->find_flow_info(
[](const auto& flow) { return flow->next_operator->name() == "node2"; });
static const auto& node4_flow = op->find_flow_info(
[](const auto& flow) { return flow->next_operator->name() == "node4"; });
//static const auto& all_next_flows = op->find_all_flow_info(
// [](const auto& flow) { return true; });
//std::cout << "All next flows: ";
//for (const auto& flow : all_next_flows) {
// std::cout << flow->next_operator->name() << " ";
//}
//std::cout << std::endl;
if (simple_op->get_value() % 2 == 1) {
simple_op->add_dynamic_flow(node2_flow);
} else {
simple_op->add_dynamic_flow(node4_flow);
}
});
}
};
class ConditionalExecutionApp(Application):
def compose(self):
# Define the operators
node1 = SimpleOp(self, CountCondition(self, count=2), name="node1")
node2 = SimpleOp(self, name="node2")
node3 = SimpleOp(self, name="node3")
node4 = SimpleOp(self, name="node4")
node5 = SimpleOp(self, name="node5")
self.add_flow(node1, node2)
self.add_flow(node2, node3)
self.add_flow(node1, node4)
self.add_flow(node4, node5)
# # If you want to add all the next flows, you can use the following code:
# self.set_dynamic_flows(node1, lambda op: op.add_dynamic_flow(op.next_flows))
# This is another way to add dynamic flows based on the next operator name
def dynamic_flow_callback(op):
node2_flow = op.find_flow_info(lambda flow: flow.next_operator.name == "node2")
node4_flow = op.find_flow_info(lambda flow: flow.next_operator.name == "node4")
# all_next_flows = op.find_all_flow_info(lambda flow: True)
# print(f"All next flows: {[flow.next_operator.name for flow in all_next_flows]}")
if op.value % 2 == 1:
op.add_dynamic_flow(node2_flow)
else:
op.add_dynamic_flow(node4_flow)
self.set_dynamic_flows(node1, dynamic_flow_callback)
In the above example, instead of using
op.add_dynamic_flow(node2) or
op.add_dynamic_flow(node4), we use
op.add_dynamic_flow(node2_flow) or
op.add_dynamic_flow(node4_flow).
And the
node2_flow and
node4_flow are
FlowInfo objects that are found using the
find_flow_info() method.
The
find_flow_info() (
C++/
Python) method takes a predicate as an argument and returns a
FlowInfo object that matches the predicate.
The
find_all_flow_info() (
C++/
Python) method takes a predicate as an argument and returns a vector (list) of
FlowInfo objects that match the predicate.
If you want to get a vector of all the next flows, you can use
op->next_flows() in C++ or
op.next_flows in Python.
Using Dynamic Flow Control with Subgraphs
Subgraphs can expose execution interface ports that work seamlessly with dynamic flow control features. This allows you to encapsulate sequences of operators and treat them as single units in your control flow.
Subgraphs support both data and execution interface ports:
Data interface ports: Expose operator data input/output ports using
add_input_interface_port()and
add_output_interface_port()
Execution interface ports: Expose operator execution ports using
add_input_exec_interface_port()and
add_output_exec_interface_port()
When connecting Subgraphs in control flow:
If a Subgraph has only one execution interface port (input or output),
add_flow()will automatically resolve to that port
Subgraphs can be used with
start_op(), just like regular operators
Dynamic flow routing works with Subgraphs through their interface ports
Nested Subgraphs are supported, allowing hierarchical control flow composition
For a complete example, see sequential_with_subgraph, which demonstrates a workflow where
node2 and
node3 are encapsulated in a Subgraph with execution interface ports.
Here’s when to choose different flow control patterns:
start_op() + Cyclic Flow
Best for: Dynamic routing, feedback loops, runtime-adaptive flows
Use when: Flow patterns depend on data content or need to change during execution
Advantages: Flexible, handles complex routing
Trade-offs: More complex to debug, slightly higher runtime overhead
Generator (root operator) with condition (CountCondition, PeriodicCondition, etc.)
Best for: Fixed iteration counts, simple linear flows
Use when: Number of iterations is known in advance (or infinite), static flow patterns
Advantages: Simple to implement, better performance, easier to debug
Trade-offs: Less flexible, cannot adapt to runtime conditions
Please see full examples under the
examples/flow_control folder in the Holoscan SDK repository for more detailed implementations and use cases.
Note that the execution control examples are also related to the dynamic behavior of operators, and are available in the execution_control directory.
Clear Flow Logic: Keep dynamic flow logic clear and well-documented. Use meaningful names for operators and document the conditions that trigger different flow paths.
Error Handling: Handle edge cases in flow callbacks. Consider what happens if expected operators are not available or if flow conditions are invalid.
State Management: Be careful with shared state in dynamic flows. Ensure thread safety when multiple operators access shared resources.
Performance: Consider the overhead of frequent flow changes. Cache flow information when possible and avoid unnecessary flow modifications.
Testing: Test all possible flow paths thoroughly. Create unit tests that verify both normal operation and edge cases for each flow pattern.
The dynamic flow control feature has the following limitations:
It can only be used to connect operators within the same fragment. For inter-fragment flows (connecting operators across different fragments), explicit non-execution ports must be used instead of dynamic flows.
Both the source and target operators must be native Holoscan operators (not GXF Operators).