Before Holoscan SDK v3.0, operators could only be connected via add_flow() through their input and output ports, and there was no way to specify an execution dependency for an operator that had no input or output ports.

However, in some cases, the requirements were different:

An ‘execution order dependency’ was needed instead of a ‘data flow dependency’.

Execution control was required rather than keeping a node running continuously.

The pipeline should run only once unless explicitly specified to loop.

To address these needs, Holoscan SDK v3.0 introduced implicit input/output ‘execution ports’ ( __input_exec__ / __output_exec__ ), inspired by Unreal Engine’s Blueprints (particularly execution pins).

The output execution port (__output_exec__, holoscan::Operator::kOutputExecPortName in C++ and holoscan.core.Operator.OUTPUT_EXEC_PORT_NAME in Python) of a source operator and the input execution port (__input_exec__, holoscan::Operator::kInputExecPortName in C++ and holoscan.core.Operator.INPUT_EXEC_PORT_NAME in Python) of a target operator are implicitly added when both of the following are true:

Two operators are connected using add_flow() without specifying a port map.

The target operator does not have an explicit input port.

Note: Both the source and target operators must be native Holoscan operators. Attempting to connect a GXF Operator to a native Holoscan operator, or vice versa, with an empty port map will result in an error.

During execution, after the source operator’s compute() method is called, the Holoscan executor emits an empty message ( Entity ) to the implicit output execution port as a signal. This can then trigger the target operator’s execution, as the Holoscan executor attaches a MessageAvailableCondition to the target operator’s implicit input execution port. Before the target operator’s compute() method runs, the executor collects (pops) all messages from the implicit input execution port, enabling execution dependencies without requiring explicit input and output execution ports.
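The signalling described above can be pictured with a small toy model. This is only a sketch in plain Python, not Holoscan code: ToyOperator, emit_signal(), and run() are hypothetical names, the queue stands in for the implicit __input_exec__ port, and the "is the queue non-empty" check plays the role of MessageAvailableCondition.

```python
from collections import deque


class ToyOperator:
    """Hypothetical stand-in for an operator without explicit ports (not holoscan.core.Operator)."""

    def __init__(self, name):
        self.name = name
        self.input_exec = deque()  # models the implicit __input_exec__ queue
        self.targets = []          # operators reached through the implicit __output_exec__ port

    def compute(self, trace):
        trace.append(self.name)


def emit_signal(op):
    """After compute(), push one empty signal to every connected target."""
    for target in op.targets:
        target.input_exec.append(None)


def run(root, operators):
    """Run the root once, then run any operator that has a pending signal."""
    trace = []
    root.compute(trace)
    emit_signal(root)
    progressed = True
    while progressed:
        progressed = False
        for op in operators:
            if op.input_exec:          # plays the role of MessageAvailableCondition
                op.input_exec.clear()  # collect (pop) all signals before compute()
                op.compute(trace)
                emit_signal(op)
                progressed = True
    return trace


# Chain a -> b -> c, connected only through the modelled execution ports.
a, b, c = ToyOperator("a"), ToyOperator("b"), ToyOperator("c")
a.targets = [b]
b.targets = [c]
trace = run(a, [a, b, c])
print(trace)
```

None of the toy operators carries data; the empty signal alone is enough to order their execution, which is the point of an execution dependency as opposed to a data flow dependency.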

Starting with Holoscan SDK v3.0, operators can be connected via add_flow() without the need for explicit input and output execution ports, allowing for more flexible and dynamic operator connections.

Note: These execution port concepts also apply to Subgraph interface ports. Subgraphs can expose execution interface ports using add_input_exec_interface_port() and add_output_exec_interface_port(), allowing Subgraphs to participate in dynamic flow control just like operators. See the Subgraph with Dynamic Flow Control section below for details.

In Holoscan, when the workflow graph is executed, root operators (operators that do not have any input ports) are executed first. Unless a condition such as CountCondition or PeriodicCondition is specified on a root operator, it will execute continuously.

Inspired by LangGraph’s start node (langgraph.graph.START), Holoscan SDK v3.0 introduces a new concept of the start operator.

The start operator is the first operator in an application fragment, serving as the entry point to the workflow. It is simply the first operator added to the fragment.

This operator is named <|start|> and has a condition of CountCondition(1) , ensuring it executes only once. Other entry operators that initiate fragment execution should connect to this operator.

In Holoscan, you can retrieve the start operator by calling start_op() ( C++ / Python ) within the compose() method. If this method is called multiple times, it returns the same start operator.

This API is available in both C++ and Python (see flow_control/sequential for an example):

C++

```cpp
class SequentialExecutionApp : public holoscan::Application {
 public:
  void compose() override {
    using namespace holoscan;

    // Define the operators
    auto node1 = make_operator<SimpleOp>("node1");
    auto node2 = make_operator<SimpleOp>("node2");
    auto node3 = make_operator<SimpleOp>("node3");

    // Define the three-operator workflow
    add_flow(start_op(), node1);
    add_flow(node1, node2);
    add_flow(node2, node3);
  }
};
```

Python

```python
class SequentialExecutionApp(Application):
    def compose(self):
        # Define the operators
        node1 = SimpleOp(self, name="node1")
        node2 = SimpleOp(self, name="node2")
        node3 = SimpleOp(self, name="node3")

        # Define the three-operator workflow
        self.add_flow(self.start_op(), node1)
        self.add_flow(node1, node2)
        self.add_flow(node2, node3)
```

In this example, start_op() retrieves the start operator, which is connected to node1, making node1 the first operator to execute. After node1 completes its execution, node2 is triggered, followed by node3. Because the start operator has a condition of CountCondition(1), it triggers only once, so each operator in the sequence runs a single time, maintaining a clear and predictable flow of execution.
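To see why a count of 1 on the start operator bounds the whole chain, here is a rough simulation in plain Python. It does not use the Holoscan API: ToyCountCondition and run_chain() are hypothetical helpers that only mimic the idea that each firing of the start operator sends one signal down the chain.

```python
class ToyCountCondition:
    """Hypothetical stand-in for a count-limited condition (not holoscan's CountCondition)."""

    def __init__(self, count):
        self.remaining = count

    def ready(self):
        return self.remaining > 0

    def consume(self):
        self.remaining -= 1


def run_chain(names, start_count=1, max_ticks=10):
    """Simulate a start operator feeding a linear chain of operators."""
    condition = ToyCountCondition(start_count)
    trace = []
    for _ in range(max_ticks):
        if not condition.ready():
            break  # the start operator may not fire again
        condition.consume()
        trace.append("<|start|>")
        trace.extend(names)  # each downstream operator runs once per signal
    return trace


once = run_chain(["node1", "node2", "node3"])
twice = run_chain(["node1", "node2", "node3"], start_count=2)
print(once)
print(len(twice))
```

With start_count=1 the trace contains the start operator followed by each node exactly once; raising the count in this toy repeats the whole sequence, which mirrors how the downstream operators only run when a signal arrives.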

The set_dynamic_flows() ( C++ / Python ) method allows for dynamic flow control in a Holoscan application. This method sets a callback function that determines the flow of execution based on the state of the operator at runtime.

In the example from flow_control/conditional, the set_dynamic_flows() method is used to dynamically control the flow between node1 , node2 , and node4 based on the value of node1 . The callback function checks the value of node1 and adds a dynamic flow to either node2 or node4 :

```
Node Graph:

       node1 (launch twice)
       /   \
   node2   node4
     |       |
   node3   node5
```

C++

```cpp
class ConditionalExecutionApp : public holoscan::Application {
 public:
  void compose() override {
    using namespace holoscan;

    // Define the operators
    auto node1 = make_operator<SimpleOp>("node1", make_condition<CountCondition>(2));
    auto node2 = make_operator<SimpleOp>("node2");
    auto node3 = make_operator<SimpleOp>("node3");
    auto node4 = make_operator<SimpleOp>("node4");
    auto node5 = make_operator<SimpleOp>("node5");

    add_flow(node1, node2);
    add_flow(node2, node3);
    add_flow(node1, node4);
    add_flow(node4, node5);

    set_dynamic_flows(node1, [node2, node4](const std::shared_ptr<Operator>& op) {
      auto simple_op = std::static_pointer_cast<SimpleOp>(op);
      if (simple_op->get_value() % 2 == 1) {
        simple_op->add_dynamic_flow(node2);
      } else {
        simple_op->add_dynamic_flow(node4);
      }
    });
  }
};
```

Python

```python
class ConditionalExecutionApp(Application):
    def compose(self):
        # Define the operators
        node1 = SimpleOp(self, CountCondition(self, count=2), name="node1")
        node2 = SimpleOp(self, name="node2")
        node3 = SimpleOp(self, name="node3")
        node4 = SimpleOp(self, name="node4")
        node5 = SimpleOp(self, name="node5")

        self.add_flow(node1, node2)
        self.add_flow(node2, node3)
        self.add_flow(node1, node4)
        self.add_flow(node4, node5)

        def dynamic_flow_callback(op):
            if op.value % 2 == 1:
                op.add_dynamic_flow(node2)
            else:
                op.add_dynamic_flow(node4)

        self.set_dynamic_flows(node1, dynamic_flow_callback)
```

In the above example, the set_dynamic_flows() ( C++ / Python ) method is used to define and manage dynamic workflows in the application.

The set_dynamic_flows() ( C++ / Python ) method takes an operator and a callback function as arguments. The callback function is called with the operator as an argument. Inside the callback function, the add_dynamic_flow() ( C++ / Python ) method is used to add a dynamic flow to the operator.

In the example, the callback function checks the value of node1 and adds a dynamic flow to either node2 or node4 .
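The routing decision can be traced with a small stand-alone model. The sketch below is plain Python and does not import Holoscan: ToySimpleOp and execute() are hypothetical, and it assumes that the example's SimpleOp increments its value on every compute() call, so node1 holds 1 on its first run and 2 on its second.

```python
class ToySimpleOp:
    """Hypothetical stand-in for SimpleOp; assumes compute() increments a counter."""

    def __init__(self, name):
        self.name = name
        self.value = 0
        self.static_next = []   # flows declared up front (add_flow in the real API)
        self.dynamic_next = []  # flows chosen by the callback for the current run
        self.callback = None

    def add_dynamic_flow(self, next_op):
        self.dynamic_next.append(next_op)

    def compute(self):
        self.value += 1


def execute(op, trace):
    """Run op, then follow either its dynamic flows (if a callback is set) or its static flows."""
    op.compute()
    trace.append(op.name)
    if op.callback is None:
        targets = op.static_next
    else:
        op.dynamic_next = []
        op.callback(op)  # the callback picks the flows for this run
        targets = op.dynamic_next
    for target in targets:
        execute(target, trace)


nodes = {name: ToySimpleOp(name) for name in ("node1", "node2", "node3", "node4", "node5")}
nodes["node1"].static_next = [nodes["node2"], nodes["node4"]]
nodes["node2"].static_next = [nodes["node3"]]
nodes["node4"].static_next = [nodes["node5"]]


def dynamic_flow_callback(op):
    if op.value % 2 == 1:
        op.add_dynamic_flow(nodes["node2"])
    else:
        op.add_dynamic_flow(nodes["node4"])


nodes["node1"].callback = dynamic_flow_callback

trace = []
for _ in range(2):  # models the count of 2 on node1
    execute(nodes["node1"], trace)
print(trace)
```

Under that assumption the first launch of node1 takes the node2/node3 branch and the second takes the node4/node5 branch, even though both branches were declared as static flows.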

The add_dynamic_flow() ( C++ / Python ) method has several overloads to support different ways of adding dynamic flows:

C++

```cpp
/// Basic connection using default output port. This is the simplest form for connecting
/// two operators when you only need to specify the destination.
void add_dynamic_flow(const std::shared_ptr<Operator>& next_op,
                      const std::string& next_input_port = "");

/// Connection with explicit output port specification. Use this when the source operator
/// has multiple output ports and you need to specify which one to use.
void add_dynamic_flow(const std::string& curr_output_port,
                      const std::shared_ptr<Operator>& next_op,
                      const std::string& next_input_port = "");

/// Connection using a FlowInfo object, which encapsulates all connection details including:
/// - Source operator and its output port specification
/// - Destination operator and its input port specification
/// - Port names and associated IOSpecs
void add_dynamic_flow(const std::shared_ptr<FlowInfo>& flow);

/// Batch connection using multiple FlowInfo objects. Use this to set up multiple
/// connections in a single call, which is more efficient than making multiple
/// individual connections.
void add_dynamic_flow(const std::vector<std::shared_ptr<FlowInfo>>& flows);
```

Python

```python
# 1. Basic connection using default output port. This is the simplest form for connecting
#    two operators when you only need to specify the destination.
op.add_dynamic_flow(next_op: Operator, next_input_port_name: str = '')

# 2. Connection with explicit output port specification. Use this when the source operator
#    has multiple output ports and you need to specify which one to use.
op.add_dynamic_flow(curr_output_port_name: str, next_op: Operator, next_input_port_name: str = '')

# 3. Connection using a FlowInfo object, which encapsulates all connection details including:
#    - Source operator and its output port specification
#    - Destination operator and its input port specification
#    - Port names and associated IOSpecs
#
#    This is useful for complex connections or when reusing connection patterns.
op.add_dynamic_flow(flow: FlowInfo)

# 4. Batch connection using multiple FlowInfo objects. Use this to set up multiple
#    connections in a single call, which is more efficient than making multiple
#    individual connections.
op.add_dynamic_flow(flows: list[FlowInfo])
```

The simplest form of add_dynamic_flow() ( C++ / Python ) takes just the next operator (and, optionally, the next input port name).

If the next operator does not have any explicit input port, you can omit the next input port name. In this case, the current operator’s implicit output execution port is connected to the next operator’s implicit input execution port.

The FlowInfo ( C++ / Python ) class represents information about a connection between operators and takes the following arguments in the constructor:

curr_operator : The source operator of the flow connection

curr_output_port : The name of the output port on the source operator

next_operator : The destination operator of the flow connection

next_input_port : The name of the input port on the destination operator

Inside the callback function, you can use the find_flow_info() ( C++ / Python ) and find_all_flow_info() ( C++ / Python ) methods to find the FlowInfo object(s) that match a predicate.

The following example shows how to find the FlowInfo object(s) that matches the predicate:

C++

```cpp
class ConditionalExecutionApp : public holoscan::Application {
 public:
  void compose() override {
    using namespace holoscan;

    // Define the operators
    auto node1 = make_operator<SimpleOp>("node1", make_condition<CountCondition>(2));
    auto node2 = make_operator<SimpleOp>("node2");
    auto node3 = make_operator<SimpleOp>("node3");
    auto node4 = make_operator<SimpleOp>("node4");
    auto node5 = make_operator<SimpleOp>("node5");

    add_flow(node1, node2);
    add_flow(node2, node3);
    add_flow(node1, node4);
    add_flow(node4, node5);

    // // If you want to add all the next flows, you can use the following code:
    // set_dynamic_flows(
    //     node1, [](const std::shared_ptr<Operator>& op) { op->add_dynamic_flow(op->next_flows());
    //     });

    set_dynamic_flows(node1, [](const std::shared_ptr<Operator>& op) {
      auto simple_op = std::static_pointer_cast<SimpleOp>(op);

      static const auto& node2_flow = op->find_flow_info(
          [](const auto& flow) { return flow->next_operator->name() == "node2"; });
      static const auto& node4_flow = op->find_flow_info(
          [](const auto& flow) { return flow->next_operator->name() == "node4"; });

      // static const auto& all_next_flows = op->find_all_flow_info(
      //     [](const auto& flow) { return true; });
      // std::cout << "All next flows: ";
      // for (const auto& flow : all_next_flows) {
      //   std::cout << flow->next_operator->name() << " ";
      // }
      // std::cout << std::endl;

      if (simple_op->get_value() % 2 == 1) {
        simple_op->add_dynamic_flow(node2_flow);
      } else {
        simple_op->add_dynamic_flow(node4_flow);
      }
    });
  }
};
```

Python

```python
class ConditionalExecutionApp(Application):
    def compose(self):
        # Define the operators
        node1 = SimpleOp(self, CountCondition(self, count=2), name="node1")
        node2 = SimpleOp(self, name="node2")
        node3 = SimpleOp(self, name="node3")
        node4 = SimpleOp(self, name="node4")
        node5 = SimpleOp(self, name="node5")

        self.add_flow(node1, node2)
        self.add_flow(node2, node3)
        self.add_flow(node1, node4)
        self.add_flow(node4, node5)

        # # If you want to add all the next flows, you can use the following code:
        # self.set_dynamic_flows(node1, lambda op: op.add_dynamic_flow(op.next_flows))

        # This is another way to add dynamic flows based on the next operator name
        def dynamic_flow_callback(op):
            node2_flow = op.find_flow_info(lambda flow: flow.next_operator.name == "node2")
            node4_flow = op.find_flow_info(lambda flow: flow.next_operator.name == "node4")

            # all_next_flows = op.find_all_flow_info(lambda flow: True)
            # print(f"All next flows: {[flow.next_operator.name for flow in all_next_flows]}")

            if op.value % 2 == 1:
                op.add_dynamic_flow(node2_flow)
            else:
                op.add_dynamic_flow(node4_flow)

        self.set_dynamic_flows(node1, dynamic_flow_callback)
```

In the above example, instead of calling op.add_dynamic_flow(node2) or op.add_dynamic_flow(node4), we call op.add_dynamic_flow(node2_flow) or op.add_dynamic_flow(node4_flow), where node2_flow and node4_flow are FlowInfo objects found using the find_flow_info() method.

The find_flow_info() ( C++ / Python ) method takes a predicate as an argument and returns a FlowInfo object that matches the predicate.

The find_all_flow_info() ( C++ / Python ) method takes a predicate as an argument and returns a vector (list) of FlowInfo objects that match the predicate.

If you want to get a vector of all the next flows, you can use op->next_flows() in C++ or op.next_flows in Python.
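The predicate-based lookup can be illustrated without the SDK. The following is a minimal sketch in plain Python: ToyFlowInfo only mirrors the four constructor arguments listed earlier (with operator names as plain strings rather than Operator objects), and the two free functions are hypothetical stand-ins for the operator methods. Returning None when nothing matches is an assumption of this sketch, not a statement about the real API.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass(frozen=True)
class ToyFlowInfo:
    """Hypothetical mirror of the FlowInfo fields; operators are plain strings here."""

    curr_operator: str
    curr_output_port: str
    next_operator: str
    next_input_port: str


Predicate = Callable[[ToyFlowInfo], bool]


def find_flow_info(flows: List[ToyFlowInfo], predicate: Predicate) -> Optional[ToyFlowInfo]:
    """Return the first flow that satisfies the predicate (None if there is none: an assumption)."""
    return next((flow for flow in flows if predicate(flow)), None)


def find_all_flow_info(flows: List[ToyFlowInfo], predicate: Predicate) -> List[ToyFlowInfo]:
    """Return every flow that satisfies the predicate."""
    return [flow for flow in flows if predicate(flow)]


# The two outgoing flows of node1 in the conditional example, via the execution ports.
next_flows = [
    ToyFlowInfo("node1", "__output_exec__", "node2", "__input_exec__"),
    ToyFlowInfo("node1", "__output_exec__", "node4", "__input_exec__"),
]

node4_flow = find_flow_info(next_flows, lambda flow: flow.next_operator == "node4")
all_flows = find_all_flow_info(next_flows, lambda flow: True)
missing = find_flow_info(next_flows, lambda flow: flow.next_operator == "node9")
print(node4_flow.next_operator, len(all_flows), missing)
```

A predicate that always returns True selects the same flows as the full list of next flows, which is why the "add all next flows" variant in the example can pass that list directly.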

Subgraphs can expose execution interface ports that work seamlessly with dynamic flow control features. This allows you to encapsulate sequences of operators and treat them as single units in your control flow.

Subgraphs support both data and execution interface ports:

Data interface ports: Expose operator data input/output ports using add_input_interface_port() and add_output_interface_port().

Execution interface ports: Expose operator execution ports using add_input_exec_interface_port() and add_output_exec_interface_port().

When connecting Subgraphs in control flow:

If a Subgraph has only one execution interface port (input or output), add_flow() will automatically resolve to that port

Subgraphs can be used with start_op() , just like regular operators

Dynamic flow routing works with Subgraphs through their interface ports

Nested Subgraphs are supported, allowing hierarchical control flow composition