What can I help you with?
NVIDIA Holoscan SDK v3.2.0

Program Listing for File operator_runner.hpp

Return to documentation for file (include/holoscan/utils/operator_runner.hpp)

Copy
Copied!
            

/* * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef HOLOSCAN_UTILS_OPERATOR_RUNNER_HPP #define HOLOSCAN_UTILS_OPERATOR_RUNNER_HPP #include <memory> #include <string> #include <unordered_map> #include <utility> #include "holoscan/core/conditions/gxf/asynchronous.hpp" #include "holoscan/core/domain/tensor_map.hpp" #include "holoscan/core/gxf/gxf_wrapper.hpp" #include "holoscan/core/operator.hpp" namespace holoscan::ops { class OperatorRunner { public: explicit OperatorRunner(const std::shared_ptr<holoscan::Operator>& op); const std::shared_ptr<holoscan::Operator>& op() const; holoscan::expected<void, holoscan::RuntimeError> push_input(const std::string& port_name, nvidia::gxf::Entity& entity); holoscan::expected<void, holoscan::RuntimeError> push_input(const std::string& port_name, nvidia::gxf::Entity&& entity); template <typename DataT, typename = std::enable_if_t<!holoscan::is_one_of_derived_v<DataT, nvidia::gxf::Entity>>> holoscan::expected<void, holoscan::RuntimeError> push_input(const std::string& port_name, DataT data) { // Create an Entity object and add a Message object to it. auto gxf_entity = nvidia::gxf::Entity::New(gxf_context_); auto buffer = gxf_entity.value().add<Message>(); // Set the data to the value of the Message object. buffer.value()->set_value(std::move(data)); auto entity = gxf_entity.value(); return push_input(port_name, entity); } holoscan::expected<void, holoscan::RuntimeError> push_input(const std::string& port_name, const holoscan::TensorMap& data); void run(); holoscan::expected<holoscan::gxf::Entity, holoscan::RuntimeError> pop_output( const std::string& port_name); template <typename DataT> holoscan::expected<DataT, holoscan::RuntimeError> pop_output(const std::string& port_name) { // Find the transmitter for the given port auto it = double_buffer_transmitters_.find(port_name); if (it == double_buffer_transmitters_.end()) { return create_error( holoscan::ErrorCode::kNotFound, "The output port ('{}') with DoubleBufferTransmitter not found in the operator '{}'", port_name, op_->name()); } // Sync the output port auto result = it->second->sync(); if (!result) { return create_error( holoscan::ErrorCode::kFailure, "Failed to sync output port {} with DoubleBufferTransmitter in operator {}", port_name, op_->name(), result.get_error_message()); } // Pop the message from the transmitter auto maybe_message = it->second->pop(); if (!maybe_message) { return create_error(holoscan::ErrorCode::kReceiveError, "Failed to pop message from the output port ('{}') with " "DoubleBufferTransmitter in operator '{}'", port_name, op_->name()); } auto& entity = maybe_message.value(); // Handle std::any type if constexpr (std::is_same_v<DataT, std::any>) { auto message = entity.get<holoscan::Message>(); if (!message) { return holoscan::gxf::Entity(entity); // handle gxf::Entity as is } return message.value()->value(); } // Handle GXF Entity types if constexpr (is_one_of_derived_v<DataT, nvidia::gxf::Entity>) { return DataT(entity); } // Handle TensorMap type if constexpr (is_one_of_derived_v<DataT, holoscan::TensorMap>) { return handle_tensor_map_output(holoscan::gxf::Entity(entity), port_name); } // Handle all other types return handle_message_output<DataT>(entity, port_name); } protected: bool populate_tensor_map(const holoscan::gxf::Entity& gxf_entity, holoscan::TensorMap& tensor_map); std::shared_ptr<holoscan::Operator> op_; void* gxf_context_ = nullptr; holoscan::gxf::GXFWrapper* gxf_wrapper_ = nullptr; std::shared_ptr<holoscan::AsynchronousCondition> async_condition_; std::unordered_map<std::string, nvidia::gxf::DoubleBufferReceiver*> double_buffer_receivers_; std::unordered_map<std::string, nvidia::gxf::DoubleBufferTransmitter*> double_buffer_transmitters_; private: template <typename DataT> holoscan::expected<DataT, holoscan::RuntimeError> handle_message_output( const nvidia::gxf::Entity& entity, const std::string& port_name) { auto message = entity.get<holoscan::Message>(); if (!message) { return create_error( holoscan::ErrorCode::kReceiveError, "Unable to get the holoscan::Message from the received GXF Entity for output '{}'", port_name); } try { return std::any_cast<DataT>(message.value()->value()); } catch (const std::bad_any_cast& e) { return create_error( holoscan::ErrorCode::kReceiveError, "Unable to cast the received data to the specified type for output '{}' of type {}", port_name, message.value()->value().type().name()); } } holoscan::expected<holoscan::TensorMap, holoscan::RuntimeError> handle_tensor_map_output( const holoscan::gxf::Entity& entity, const std::string& port_name); template <typename... Args> holoscan::unexpected<holoscan::RuntimeError> create_error(holoscan::ErrorCode code, const char* format, Args&&... args) { auto message = fmt::format(format, std::forward<Args>(args)...); HOLOSCAN_LOG_DEBUG(message); return holoscan::unexpected<holoscan::RuntimeError>( holoscan::RuntimeError(code, message.c_str())); } }; } // namespace holoscan::ops #endif/* HOLOSCAN_UTILS_OPERATOR_RUNNER_HPP */

© Copyright 2022-2025, NVIDIA. Last updated on May 29, 2025.