holoscan::gxf::GXFExecutor

Beta

Executor for GXF.

#include <holoscan/gxf/gxf_executor.hpp>

Inherits from: holoscan::Executor (public)


Constructors

GXFExecutor

holoscan::gxf::GXFExecutor::GXFExecutor(
holoscan::Fragment *app,
bool create_gxf_context = true
)

Destructor

~GXFExecutor

holoscan::gxf::GXFExecutor::~GXFExecutor() override

Methods

run

void holoscan::gxf::GXFExecutor::run(
OperatorFlowGraph &graph
) override

Initialize the graph and run the graph.

This method calls compose() to compose the graph, and runs the graph.

Parameters

graph
OperatorFlowGraph &

The reference to the graph.

run_async

std::future<void> holoscan::gxf::GXFExecutor::run_async(
OperatorFlowGraph &graph
) override

Initialize the graph and run the graph asynchronously.

This method calls compose() to compose the graph, and runs the graph asynchronously. The graph is executed in a separate thread, and the method returns a future object.

Returns: The future object.

Parameters

graph
OperatorFlowGraph &

The reference to the graph.
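
The future-based contract described for run_async() can be illustrated with the standard library alone. The sketch below does not use Holoscan: `fake_graph_work`, `run_graph_async`, and `wait_for_completion` are hypothetical stand-ins that show how a caller typically consumes a `std::future<void>` produced by `std::async`. With a standard future, `get()` blocks until the background work finishes and rethrows any exception raised there; whether GXFExecutor surfaces errors the same way is not stated in this reference.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>

// Hypothetical stand-in for the work a graph run performs.
inline void fake_graph_work(bool fail) {
  if (fail) { throw std::runtime_error("graph failed"); }
}

// Hypothetical stand-in for an asynchronous run: starts the work on a
// separate thread and hands back a future, similar in shape to run_async().
inline std::future<void> run_graph_async(bool fail) {
  return std::async(std::launch::async, fake_graph_work, fail);
}

// Waits for completion; returns false if the background work threw.
inline bool wait_for_completion(std::future<void>& done) {
  try {
    done.get();  // blocks, then rethrows any exception stored in the future
    return true;
  } catch (const std::exception&) {
    return false;
  }
}
```

A caller would keep the returned future alive and call `wait_for_completion` (or `get()` directly) before tearing down anything the background run depends on.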

interrupt

bool holoscan::gxf::GXFExecutor::interrupt() override

Interrupt the execution.

This method calls GxfGraphInterrupt() to interrupt the execution.

Returns: true if the interrupt was successful (graph was running), false if the graph was not running (already stopped or not started).

wait

void holoscan::gxf::GXFExecutor::wait() override

Wait for the execution to complete.

This method calls GxfGraphWait() to wait for the graph execution to complete. Should be called after interrupt() to ensure the scheduler has fully stopped.

Only call this if interrupt() returned true. Calling wait() when the graph is not running can cause issues with concurrent cleanup.
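
The rule above (call wait() only when interrupt() returned true) can be captured in a small helper. This is a minimal sketch under stated assumptions: `MockExecutor` and `stop_and_wait` are hypothetical names, and the mock only imitates the documented return-value contract, not the real GXF calls.

```cpp
#include <cassert>

// Hypothetical mock exposing the same two calls as the executor;
// it is not the Holoscan class.
struct MockExecutor {
  bool running = false;
  int wait_calls = 0;

  // Mirrors the documented contract: true only if a run was in progress.
  bool interrupt() {
    const bool was_running = running;
    running = false;
    return was_running;
  }

  // Stand-in for blocking until the scheduler has stopped.
  void wait() { ++wait_calls; }
};

// Calls wait() only when interrupt() reports that the graph was running.
template <typename ExecutorT>
bool stop_and_wait(ExecutorT& executor) {
  if (!executor.interrupt()) { return false; }  // nothing to wait for
  executor.wait();
  return true;
}
```

Because the helper is a template, the same guard could in principle be applied to any object that exposes `interrupt()` and `wait()` with these semantics.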

reset_execution_state

void holoscan::gxf::GXFExecutor::reset_execution_state()

Reset execution state to allow for multiple runs.

Resets internal flags related to graph initialization and activation to allow for multiple consecutive executions.

destroy_context

void holoscan::gxf::GXFExecutor::destroy_context()

Destroy the GXF context.

Releases resources associated with the GXF context if it exists. No operation is performed if the context is null or the context is not owned by this executor.
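
The ownership rule can be sketched as a guard clause. `ContextHolder` below is a hypothetical model, not the executor itself: it counts release attempts instead of calling GXF, and clearing the pointer after release is an assumption made for the illustration.

```cpp
#include <cassert>

// Hypothetical holder modelling only the ownership rule; real GXF context
// creation and destruction are not shown.
struct ContextHolder {
  void* context = nullptr;
  bool owns_context = false;
  int destroy_calls = 0;

  void destroy_context() {
    // Documented no-op cases: null context, or context owned by someone else.
    if (context == nullptr || !owns_context) { return; }
    ++destroy_calls;    // stand-in for releasing the context
    context = nullptr;  // assumption: the pointer is cleared after release
  }
};
```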

context

virtual
void holoscan::gxf::GXFExecutor::context(
void *context
) override

Set the context.

For GXF, the GXFExtensionManager (extension_manager_) is initialized with the context.

Parameters

context
void *

The context.

extension_manager

std::shared_ptr<ExtensionManager> holoscan::gxf::GXFExecutor::extension_manager() override

Get GXF extension manager.

Returns: The GXF extension manager.

See also: GXFExtensionManager

op_eid

void holoscan::gxf::GXFExecutor::op_eid(
gxf_uid_t eid
)

Set the GXF entity ID of the operator initialized by this executor.

If this is 0, a new entity is created for the operator. Otherwise, the operator, as a codelet, will be added to the existing entity specified by this ID.

This is useful when initializing operators within an existing entity, e.g., when initializing an operator from the holoscan::gxf::OperatorWrapper class.

Parameters

eid
gxf_uid_t

The GXF entity ID.

op_cid

void holoscan::gxf::GXFExecutor::op_cid(
gxf_uid_t cid
)

Set the GXF component ID of the operator initialized by this executor.

If this is 0, a new component is created for the operator.

This is useful when initializing operators using an existing component within an existing entity, e.g., when initializing an operator from the holoscan::gxf::OperatorWrapper class.

Parameters

cid
gxf_uid_t

The GXF component ID.

entity_prefix

const std::string & holoscan::gxf::GXFExecutor::entity_prefix()

Get the entity prefix string.

Returns: The entity prefix string.

setup_signal_handlers

std::function<void(
void *,
int
)

Set up signal handlers for graceful shutdown.

fragment

void holoscan::gxf::GXFExecutor::fragment(
Fragment *fragment
)

Set the pointer to the fragment of the executor.

Parameters

fragment
Fragment *

The pointer to the fragment of the executor.

owns_context

bool holoscan::gxf::GXFExecutor::owns_context()

Get whether the context is owned by the executor.

Returns: true if the context is owned by the executor. Otherwise, false.

context_uint64

void holoscan::gxf::GXFExecutor::context_uint64(
uint64_t context
)

exception

void holoscan::gxf::GXFExecutor::exception(
const std::exception_ptr &e
)

Set the exception.

This method is called by the framework to store the exception that occurred during the execution of the fragment. If the exception is set, this exception is rethrown by the framework after the execution of the fragment.

Parameters

e
const std::exception_ptr &

The exception to store.
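
The store-then-rethrow behavior described above follows a common `std::exception_ptr` pattern. The sketch below uses only the standard library; `ExceptionStore`, `rethrow_if_set`, and `run_and_report` are hypothetical names and do not reflect how the framework is implemented internally.

```cpp
#include <cassert>
#include <exception>
#include <stdexcept>
#include <string>

// Hypothetical holder; not the Holoscan executor.
class ExceptionStore {
 public:
  // Stores the exception, mirroring the shape of the setter documented above.
  void exception(const std::exception_ptr& e) { exception_ = e; }

  // Rethrows the stored exception, if any, once execution has finished.
  void rethrow_if_set() const {
    if (exception_) { std::rethrow_exception(exception_); }
  }

 private:
  std::exception_ptr exception_;
};

// Runs a failing task, stores its exception, and reports the message that
// surfaces when the stored exception is rethrown afterwards.
inline std::string run_and_report() {
  ExceptionStore store;
  try {
    throw std::runtime_error("compute failed");
  } catch (...) {
    store.exception(std::current_exception());
  }
  try {
    store.rethrow_if_set();
  } catch (const std::exception& err) {
    return err.what();
  }
  return "no error";
}
```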

initialize_fragment

bool holoscan::gxf::GXFExecutor::initialize_fragment() override

Initialize the fragment_ in this Executor.

This method is called by run() to initialize the fragment and the graph of operators in the fragment before execution.

Returns: true if fragment initialization is successful. Otherwise, false.

initialize_operator

bool holoscan::gxf::GXFExecutor::initialize_operator(
Operator *op
) override

Initialize the given operator.

This method is called by Operator::initialize() to initialize the operator.

Depending on the type of the operator, this method may be overridden to initialize the operator. For example, the default executor (GXFExecutor) initializes the operator using the GXF API and sets the operator’s ID to the ID of the GXF codelet.

Returns: true if the operator is initialized successfully. Otherwise, false.

Parameters

op
Operator *

The pointer to the operator.

initialize_scheduler

bool holoscan::gxf::GXFExecutor::initialize_scheduler(
Scheduler *sch
) override

Initialize the given scheduler.

This method is called by Scheduler::initialize() to initialize the scheduler.

Depending on the type of the scheduler, this method may be overridden to initialize the scheduler. For example, the default executor (GXFExecutor) initializes the scheduler using the GXF API and sets the scheduler’s ID to the ID of the GXF scheduler.

Returns: true if the scheduler is initialized successfully. Otherwise, false.

Parameters

sch
Scheduler *

The pointer to the scheduler.

initialize_network_context

bool holoscan::gxf::GXFExecutor::initialize_network_context(
NetworkContext *network_context
) override

Initialize the given network context.

This method is called by NetworkContext::initialize() to initialize the network context.

Depending on the type of the network context, this method may be overridden to initialize the network context. For example, the default executor (GXFExecutor) initializes the network context using the GXF API and sets the network context’s ID to the ID of the GXF network context.

Returns: true if the network context is initialized successfully. Otherwise, false.

Parameters

network_context
NetworkContext *

The pointer to the network context.

initialize_fragment_services

bool holoscan::gxf::GXFExecutor::initialize_fragment_services() override

Initialize the fragment services for the executor.

This method is called during executor initialization to set up any required fragment services.

Depending on the type of executor, this method may be overridden to initialize specific fragment services. For example, the default executor (GXFExecutor) may initialize fragment services using the GXF API.

Returns: true if the fragment services are initialized successfully. Otherwise, false.

add_receivers

bool holoscan::gxf::GXFExecutor::add_receivers(
const std::shared_ptr<Operator> &op,
const std::string &receivers_name,
std::vector<std::string> &new_input_labels,
std::vector<holoscan::IOSpec *> &iospec_vector
) override

Add the receivers as input ports of the given operator.

This method is called by Fragment::add_flow() to support the case where the destination input port label refers to a parameter name of the downstream operator and the parameter type is ‘std::vector<holoscan::IOSpec*>’. It finds a parameter of type ‘std::vector<holoscan::IOSpec*>’ and creates a new input port with a specific label (‘parameter name:index’, e.g., ‘receivers:0’).

Returns: true if the receivers are added successfully. Otherwise, false.

Parameters

op
const std::shared_ptr<Operator> &

The reference to the shared pointer of the operator.

receivers_name
const std::string &

The name of the receivers whose parameter type is ‘std::vector<holoscan::IOSpec*>’.

new_input_labels
std::vector<std::string> &

The reference to the vector of input port labels to which the input port labels are added. In the case of multiple receivers, the input port label is updated to ‘parameter name:index’ (e.g. ‘receivers’ => ‘receivers:0’).

iospec_vector
std::vector<holoscan::IOSpec *> &

The reference to the vector of IOSpec pointers.
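
The ‘parameter name:index’ labelling scheme can be sketched as follows. Both helper names are hypothetical, and the choice of index (the number of ports already created for the parameter) is an assumption made for illustration; this reference does not specify how the executor picks it.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Builds a label of the form "<parameter name>:<index>".
inline std::string make_receiver_label(const std::string& receivers_name, std::size_t index) {
  return receivers_name + ":" + std::to_string(index);
}

// Appends the next indexed label to new_input_labels and returns it.
// Assumption: the index is the number of ports already created for the parameter.
inline std::string append_receiver_label(const std::string& receivers_name,
                                         std::size_t existing_port_count,
                                         std::vector<std::string>& new_input_labels) {
  new_input_labels.push_back(make_receiver_label(receivers_name, existing_port_count));
  return new_input_labels.back();
}
```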

add_control_flow

bool holoscan::gxf::GXFExecutor::add_control_flow(
const std::shared_ptr<Operator> &upstream_op,
const std::shared_ptr<Operator> &downstream_op
) override

Add a control flow between two operators.

This method is called by Fragment::add_flow() to add a control flow between two operators.

Returns: true if the control flow is added successfully. Otherwise, false.

Parameters

upstream_op
const std::shared_ptr<Operator> &

The shared pointer to the upstream operator.

downstream_op
const std::shared_ptr<Operator> &

The shared pointer to the downstream operator.

initialize_gxf_graph

bool holoscan::gxf::GXFExecutor::initialize_gxf_graph(
OperatorFlowGraph &graph
)

activate_gxf_graph

void holoscan::gxf::GXFExecutor::activate_gxf_graph()

run_gxf_graph

void holoscan::gxf::GXFExecutor::run_gxf_graph()

connection_items

bool holoscan::gxf::GXFExecutor::connection_items(
std::vector<std::shared_ptr<holoscan::ConnectionItem>> &connection_items
)

add_operator_to_entity_group

void holoscan::gxf::GXFExecutor::add_operator_to_entity_group(
gxf_context_t context,
gxf_uid_t entity_group_gid,
std::shared_ptr<Operator> op
)

register_extensions

void holoscan::gxf::GXFExecutor::register_extensions()

initialize_gxf_resources

void holoscan::gxf::GXFExecutor::initialize_gxf_resources(
std::unordered_map<std::string, std::shared_ptr<Resource>> &resources,
gxf_uid_t eid,
const std::shared_ptr<nvidia::gxf::GraphEntity> &graph_entity
)

Initialize all GXF Resources in the map and assign them to graph_entity.

Utility function grouping common code across initialize_network_context and initialize_scheduler.

Parameters

resources
std::unordered_map<std::string, std::shared_ptr<Resource>> &

Unordered map of GXF resources.

eid
gxf_uid_t

The entity to which the resources will be assigned.

graph_entity
const std::shared_ptr<nvidia::gxf::GraphEntity> &

nvidia::gxf::GraphEntity pointer for the resources.

add_connection

gxf_result_t holoscan::gxf::GXFExecutor::add_connection(
gxf_uid_t source_cid,
gxf_uid_t target_cid
)

Create a GXF Connection component between a transmitter and receiver.

The Connection object created will belong to connections_entity_.

Returns: The GXF status code.

Parameters

source_cid
gxf_uid_t

The GXF Transmitter component ID.

target_cid
gxf_uid_t

The GXF Receiver component ID.

create_broadcast_components

void holoscan::gxf::GXFExecutor::create_broadcast_components(
const holoscan::OperatorFlowGraph::NodeType &op,
BroadcastEntityMapType &broadcast_entities,
const TargetConnectionsMapType &connections
)

Create Broadcast components and add their nvidia::gxf::GraphEntity to broadcast_entities.

This is a helper method that gets called by initialize_fragment.

Creates broadcast components for any output ports of op that connect to more than one input port.

Does not add any transmitter to the Broadcast entity. The transmitters will be added later when the incoming edges to the respective operators are processed.

Any connected ports of the operator are removed from port_map_val.

Parameters

op
const holoscan::OperatorFlowGraph::NodeType &

The operator to create broadcast components for.

broadcast_entities
BroadcastEntityMapType &

The mapping of broadcast graph entities.

connections
const TargetConnectionsMapType &

TODO
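
The selection rule stated above (a broadcast is needed for any output port that connects to more than one input port) can be sketched with simplified types. `SimpleTargetPort`, `SimpleConnections`, and `ports_needing_broadcast` are hypothetical; they are keyed by port name and are deliberately simpler than the TargetPort and TargetConnectionsMapType typedefs of this class.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// (target operator name, target input port name); simplified stand-in for TargetPort.
using SimpleTargetPort = std::pair<std::string, std::string>;
// Output port name -> set of connected input ports (hypothetical layout).
using SimpleConnections = std::map<std::string, std::set<SimpleTargetPort>>;

// Returns the output ports that fan out to more than one input port.
inline std::vector<std::string> ports_needing_broadcast(const SimpleConnections& connections) {
  std::vector<std::string> result;
  for (const auto& entry : connections) {
    if (entry.second.size() > 1) { result.push_back(entry.first); }
  }
  return result;
}
```

Ports with a single target are skipped, which matches the description that broadcast components are created only for one-to-many connections.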

connect_broadcast_to_previous_op

void holoscan::gxf::GXFExecutor::connect_broadcast_to_previous_op(
const BroadcastEntityMapType &broadcast_entities,
const holoscan::OperatorFlowGraph::NodeType &op,
const holoscan::OperatorFlowGraph::NodeType &prev_op,
const holoscan::OperatorFlowGraph::EdgeDataType &port_map_val
)

Add connection between the prior Broadcast component and the current operator’s input port(s).

Creates a transmitter on the broadcast component and connects it to the input port of op.

Any connected ports of the operator are removed from port_map_val.

Parameters

broadcast_entities
const BroadcastEntityMapType &

The mapping of broadcast graph entities.

op
const holoscan::OperatorFlowGraph::NodeType &

The broadcast entity’s output will connect to the input port of this operator.

prev_op
const holoscan::OperatorFlowGraph::NodeType &

The operator connected to the input of the broadcast entity. The capacity and policy of the transmitter added to the broadcast entity will be copied from the transmitter on the broadcasted output port of this operator.

port_map_val
const holoscan::OperatorFlowGraph::EdgeDataType &

The port mapping between prev_op and op.

add_condition_to_graph_entity

bool holoscan::gxf::GXFExecutor::add_condition_to_graph_entity(
const std::shared_ptr<Condition> &condition,
std::shared_ptr<nvidia::gxf::GraphEntity> graph_entity
)

Helper function that adds a GXF Condition to the specified graph entity.

add_resource_to_graph_entity

bool holoscan::gxf::GXFExecutor::add_resource_to_graph_entity(
const std::shared_ptr<Resource> &resource,
std::shared_ptr<nvidia::gxf::GraphEntity> graph_entity
)

Helper function that adds a GXF Resource to the specified graph entity.

add_iospec_to_graph_entity

bool holoscan::gxf::GXFExecutor::add_iospec_to_graph_entity(
IOSpec *io_spec,
const std::shared_ptr<nvidia::gxf::GraphEntity> &graph_entity
)

add_component_args_to_graph_entity

void holoscan::gxf::GXFExecutor::add_component_args_to_graph_entity(
std::vector<Arg> &args,
const std::shared_ptr<nvidia::gxf::GraphEntity> &graph_entity
)

connect_ucx_transmitters_to_virtual_ops

void holoscan::gxf::GXFExecutor::connect_ucx_transmitters_to_virtual_ops(
Fragment *fragment,
std::vector<std::shared_ptr<ops::VirtualOperator>> &virtual_ops
)

add_gpu_device_to_graph_entity

std::shared_ptr<GPUDevice> holoscan::gxf::GXFExecutor::add_gpu_device_to_graph_entity(
const std::string &device_name,
const std::shared_ptr<nvidia::gxf::GraphEntity> &graph_entity,
std::optional<int32_t> device_id = std::nullopt
)

get_operator_port_iospec

static std::shared_ptr<IOSpec> holoscan::gxf::GXFExecutor::get_operator_port_iospec(
const std::shared_ptr<Operator> &op,
const std::string &port_name,
IOSpec::IOType io_type
)

get_operator_port_cid

static gxf_uid_t holoscan::gxf::GXFExecutor::get_operator_port_cid(
const std::shared_ptr<Operator> &op,
const std::string &port_name,
IOSpec::IOType io_type
)

Static methods

create_input_port

static void holoscan::gxf::GXFExecutor::create_input_port(
Fragment *fragment,
IOSpec *io_spec,
Operator *op = nullptr
)

Create and setup GXF components for input port.

For a given input port specification, create a GXF Receiver component for the port and a GXF SchedulingTerm component that corresponds to the Condition of the port.

If there is no condition specified for the port, a default condition (MessageAvailableCondition) is created. It currently supports ConditionType::kMessageAvailable and ConditionType::kNone condition types.

This function is a static function so that it can be called from other classes without dependency on this class.

Parameters

fragment
Fragment *

The fragment that this operator belongs to.

io_spec
IOSpec *

The input port specification.

op
Operator *
Defaults to nullptr

The operator to which this port is being added. A new GXF Receiver component is created for the port.

create_output_port

static void holoscan::gxf::GXFExecutor::create_output_port(
Fragment *fragment,
IOSpec *io_spec,
Operator *op = nullptr
)

Create and setup GXF components for output port.

For a given output port specification, create a GXF Transmitter component for the port and a GXF SchedulingTerm component that corresponds to the Condition of the port.

If there is no condition specified for the port, a default condition (DownstreamMessageAffordableCondition) is created. It currently supports ConditionType::kDownstreamMessageAffordable and ConditionType::kNone condition types.

This function is a static function so that it can be called from other classes without dependency on this class.

Parameters

fragment
Fragment *

The fragment that this operator belongs to.

io_spec
IOSpec *

The output port specification.

op
Operator *
Defaults to nullptr

The operator to which this port is being added. A new GXF Transmitter component is created for the port.

reset_interrupt_flags

static void holoscan::gxf::GXFExecutor::reset_interrupt_flags()

Reset the static interrupt flags and cancel any active force-exit countdown threads.

This method is needed for unit testing of interrupt handling to ensure clean state between test cases. It cancels any lingering countdown threads from previous tests and resets all static interrupt-related flags.

register_codec

template <typename typeT>
static void holoscan::gxf::GXFExecutor::register_codec(
const std::string &codec_name,
bool overwrite = true
)

Register the codec for serialization/deserialization of a custom type.

If any operator has an argument with a custom type, the codec must be registered using this method.

For example, suppose we want to emit the custom struct type shown in the first example below.

We can then define codec<Coordinate> as in the second example, where the serialize and deserialize methods are used for serialization and deserialization of this type, respectively.

In this case, since this is a simple struct with a static size, we can use the existing serialize_trivial_type and deserialize_trivial_type implementations.

Finally, to register this custom codec at runtime, we make the call shown in the third example within the setup method of our Operator.

Template parameters

typeT
typename

The type of the argument to register.

Parameters

codec_name
const std::string &

The name of the codec (must be unique unless overwrite is true).

overwrite
bool
Defaults to true

If true and codec_name already exists, the codec will be overwritten.

Example

namespace holoscan {
struct Coordinate {
  int16_t x;
  int16_t y;
  int16_t z;
};
}  // namespace holoscan

Example

namespace holoscan {
template <>
struct codec<Coordinate> {
  static expected<size_t, RuntimeError> serialize(const Coordinate& value, Endpoint* endpoint) {
    return serialize_trivial_type<Coordinate>(value, endpoint);
  }
  static expected<Coordinate, RuntimeError> deserialize(Endpoint* endpoint) {
    return deserialize_trivial_type<Coordinate>(endpoint);
  }
};
}  // namespace holoscan

Example

GXFExecutor::register_codec<Coordinate>("Coordinate");
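
To show why a fixed-size struct such as Coordinate suits trivial serialization, the sketch below copies a trivially copyable value into a byte buffer and back. It is a standalone illustration under stated assumptions: `write_trivial` and `read_trivial` are hypothetical helpers, a `std::vector` stands in for an endpoint, and nothing here is the actual implementation of serialize_trivial_type or deserialize_trivial_type.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

// Local copy of the example struct so the sketch compiles on its own.
struct Coordinate {
  std::int16_t x;
  std::int16_t y;
  std::int16_t z;
};

// Copies the object representation of a trivially copyable value into a buffer.
template <typename T>
std::size_t write_trivial(const T& value, std::vector<std::uint8_t>& buffer) {
  static_assert(std::is_trivially_copyable<T>::value, "T must be trivially copyable");
  const auto* bytes = reinterpret_cast<const std::uint8_t*>(&value);
  buffer.insert(buffer.end(), bytes, bytes + sizeof(T));
  return sizeof(T);
}

// Rebuilds a value from the first sizeof(T) bytes of the buffer.
template <typename T>
T read_trivial(const std::vector<std::uint8_t>& buffer) {
  static_assert(std::is_trivially_copyable<T>::value, "T must be trivially copyable");
  assert(buffer.size() >= sizeof(T));
  T value{};
  std::memcpy(&value, buffer.data(), sizeof(T));
  return value;
}
```

A type with pointers, dynamically sized members, or a non-trivial copy constructor would not satisfy the `static_assert` and would need hand-written serialize and deserialize methods instead.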

Types

Typedefs

| Name | Definition |
| --- | --- |
| TargetPort | std::pair< holoscan::OperatorFlowGraph::NodeType, std::string > |
| TargetsInfo | std::tuple< std::string, IOSpec::ConnectorType, std::set< TargetPort > > |
| TargetConnectionsMapType | std::unordered_map< gxf_uid_t, TargetsInfo > |
| BroadcastEntityMapType | std::unordered_map< holoscan::OperatorFlowGraph::NodeType, std::unordered_map< std::string, std::shared_ptr< nvidia::gxf::GraphEntity > > > |

Member variables

| Name | Type | Description |
| --- | --- | --- |
| op_eid_ | gxf_uid_t | The GXF entity ID of the operator. |
| op_cid_ | gxf_uid_t | The GXF component ID of the operator. |
| gxf_holoscan_extension_ | std::shared_ptr< nvidia::gxf::Extension > | The GXF holoscan extension. |
| is_gxf_graph_initialized_ | bool | The flag to indicate whether the GXF graph is initialized. |
| is_gxf_graph_activated_ | bool | The flag to indicate whether the GXF graph is activated. |
| is_run_called_ | bool | The flag to indicate whether run() or run_async() has been invoked. |
| is_fragment_services_initialized_ | bool | The flag to indicate whether the GXF fragment services are initialized. |
| entity_prefix_ | std::string | The entity prefix for the fragment. |
| connection_items_ | std::vector< std::shared_ptr< holoscan::ConnectionItem > > | The connection items for virtual operators. |
| implicit_broadcast_entities_ | std::list< std::shared_ptr< nvidia::gxf::GraphEntity > > | The list of implicit broadcast entities to be added to the network entity group. |
| util_entity_ | std::shared_ptr< nvidia::gxf::GraphEntity > | |
| gpu_device_entity_ | std::shared_ptr< nvidia::gxf::GraphEntity > | |
| scheduler_entity_ | std::shared_ptr< nvidia::gxf::GraphEntity > | |
| network_context_entity_ | std::shared_ptr< nvidia::gxf::GraphEntity > | |
| connections_entity_ | std::shared_ptr< nvidia::gxf::GraphEntity > | |
| fragment_services_entity_ | std::shared_ptr< nvidia::gxf::GraphEntity > | |
| fragment_ | Fragment * | The fragment of the executor. |
| context_ | void * | The context. |
| owns_context_ | bool | Whether the context is owned by the executor. |
| extension_manager_ | std::shared_ptr< ExtensionManager > | The extension manager. |
| exception_ | std::exception_ptr | The stored exception. |
| interrupt_requested_ (static) | std::atomic< bool > | |
| force_exit_countdown_started_ (static) | std::atomic< bool > | |
| first_interrupt_time_ms_ (static) | std::atomic< int64_t > | |
| active_countdown_flag_ (static) | std::shared_ptr< std::atomic< bool > > | |
| countdown_flag_mutex_ (static) | std::mutex | |
| wait_in_progress_ | std::atomic< bool > | |