Program Listing for File operator_runner.hpp
↰ Return to documentation for file (include/holoscan/utils/operator_runner.hpp)
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HOLOSCAN_UTILS_OPERATOR_RUNNER_HPP
#define HOLOSCAN_UTILS_OPERATOR_RUNNER_HPP
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include "holoscan/core/conditions/gxf/asynchronous.hpp"
#include "holoscan/core/domain/tensor_map.hpp"
#include "holoscan/core/gxf/gxf_wrapper.hpp"
#include "holoscan/core/operator.hpp"
namespace holoscan::ops {
/**
 * @brief Helper that wraps a single holoscan::Operator so that data can be pushed to its input
 * ports, the operator can be run, and data can be popped from its output ports.
 *
 * All fallible operations report failures through holoscan::expected carrying a
 * holoscan::RuntimeError instead of throwing.
 */
class OperatorRunner {
 public:
  /**
   * @brief Construct a runner around the given operator.
   *
   * @param op The operator to wrap (implementation is defined out of line).
   */
  explicit OperatorRunner(const std::shared_ptr<holoscan::Operator>& op);

  /**
   * @brief Access the wrapped operator.
   *
   * @return Reference to the shared pointer of the operator (defined out of line).
   */
  const std::shared_ptr<holoscan::Operator>& op() const;

  /**
   * @brief Push a GXF entity to the named input port.
   *
   * @param port_name Name of the input port.
   * @param entity The entity to push.
   * @return Empty expected on success, RuntimeError otherwise (defined out of line).
   */
  holoscan::expected<void, holoscan::RuntimeError> push_input(const std::string& port_name,
                                                              nvidia::gxf::Entity& entity);

  /**
   * @brief Push a GXF entity (rvalue overload) to the named input port.
   *
   * @param port_name Name of the input port.
   * @param entity The entity to push.
   * @return Empty expected on success, RuntimeError otherwise (defined out of line).
   */
  holoscan::expected<void, holoscan::RuntimeError> push_input(const std::string& port_name,
                                                              nvidia::gxf::Entity&& entity);

  /**
   * @brief Push an arbitrary (non-Entity) value to the named input port.
   *
   * The value is moved into a Message component added to a freshly created GXF entity, and that
   * entity is then forwarded to the Entity overload of push_input().
   *
   * @tparam DataT Type of the value; must not be (derived from) nvidia::gxf::Entity.
   * @param port_name Name of the input port.
   * @param data The value to send (taken by value and moved into the message).
   * @return Empty expected on success; a RuntimeError if the entity or the Message component
   * could not be created, or whatever error the Entity overload reports.
   */
  template <typename DataT,
            typename = std::enable_if_t<!holoscan::is_one_of_derived_v<DataT, nvidia::gxf::Entity>>>
  holoscan::expected<void, holoscan::RuntimeError> push_input(const std::string& port_name,
                                                              DataT data) {
    // Create the carrier entity. Creation can fail, so it must be checked before `.value()`.
    auto maybe_entity = nvidia::gxf::Entity::New(gxf_context_);
    if (!maybe_entity) {
      return create_error(
          holoscan::ErrorCode::kFailure,
          "Failed to create a GXF Entity for the input port ('{}') in operator '{}': {}",
          port_name,
          op_->name(),
          maybe_entity.get_error_message());
    }

    // Add a Message component that will hold the user data; this can fail as well.
    auto maybe_buffer = maybe_entity.value().add<Message>();
    if (!maybe_buffer) {
      return create_error(
          holoscan::ErrorCode::kFailure,
          "Failed to add a Message to the GXF Entity for the input port ('{}') in operator "
          "'{}': {}",
          port_name,
          op_->name(),
          maybe_buffer.get_error_message());
    }

    // Store the payload in the Message component.
    maybe_buffer.value()->set_value(std::move(data));

    // Forward an lvalue so that the `Entity&` overload is selected (as before).
    auto entity = maybe_entity.value();
    return push_input(port_name, entity);
  }

  /**
   * @brief Push a TensorMap to the named input port.
   *
   * @param port_name Name of the input port.
   * @param data The tensor map to send.
   * @return Empty expected on success, RuntimeError otherwise (defined out of line).
   */
  holoscan::expected<void, holoscan::RuntimeError> push_input(const std::string& port_name,
                                                              const holoscan::TensorMap& data);

  /**
   * @brief Run the wrapped operator (defined out of line).
   */
  void run();

  /**
   * @brief Pop the raw entity from the named output port.
   *
   * @param port_name Name of the output port.
   * @return The popped holoscan::gxf::Entity, or a RuntimeError (defined out of line).
   */
  holoscan::expected<holoscan::gxf::Entity, holoscan::RuntimeError> pop_output(
      const std::string& port_name);

  /**
   * @brief Pop a value of type DataT from the named output port.
   *
   * The transmitter registered for `port_name` is synced and one message is popped from it.
   * The message is then converted according to DataT:
   *  - std::any: the value stored in the holoscan::Message, or the entity itself (wrapped as
   *    holoscan::gxf::Entity) when the entity carries no holoscan::Message;
   *  - a type derived from nvidia::gxf::Entity: constructed directly from the popped entity;
   *  - a type derived from holoscan::TensorMap: delegated to handle_tensor_map_output();
   *  - anything else: the holoscan::Message value is std::any_cast to DataT.
   *
   * @tparam DataT Requested result type.
   * @param port_name Name of the output port.
   * @return The converted value, or a RuntimeError with code kNotFound (unknown port),
   * kFailure (sync failed) or kReceiveError (pop or conversion failed).
   */
  template <typename DataT>
  holoscan::expected<DataT, holoscan::RuntimeError> pop_output(const std::string& port_name) {
    // Find the transmitter for the given port
    auto it = double_buffer_transmitters_.find(port_name);
    if (it == double_buffer_transmitters_.end()) {
      return create_error(
          holoscan::ErrorCode::kNotFound,
          "The output port ('{}') with DoubleBufferTransmitter not found in the operator '{}'",
          port_name,
          op_->name());
    }

    // Sync the output port
    auto result = it->second->sync();
    if (!result) {
      // The trailing placeholder surfaces the underlying error text, which was previously
      // passed as an argument but had no placeholder and was therefore dropped.
      return create_error(
          holoscan::ErrorCode::kFailure,
          "Failed to sync output port {} with DoubleBufferTransmitter in operator {}: {}",
          port_name,
          op_->name(),
          result.get_error_message());
    }

    // Pop the message from the transmitter
    auto maybe_message = it->second->pop();
    if (!maybe_message) {
      return create_error(holoscan::ErrorCode::kReceiveError,
                          "Failed to pop message from the output port ('{}') with "
                          "DoubleBufferTransmitter in operator '{}'",
                          port_name,
                          op_->name());
    }

    auto& entity = maybe_message.value();

    // Handle std::any type
    if constexpr (std::is_same_v<DataT, std::any>) {
      auto message = entity.get<holoscan::Message>();
      if (!message) {
        return holoscan::gxf::Entity(entity);  // handle gxf::Entity as is
      }
      return message.value()->value();
    }

    // Handle GXF Entity types
    if constexpr (is_one_of_derived_v<DataT, nvidia::gxf::Entity>) { return DataT(entity); }

    // Handle TensorMap type
    if constexpr (is_one_of_derived_v<DataT, holoscan::TensorMap>) {
      return handle_tensor_map_output(holoscan::gxf::Entity(entity), port_name);
    }

    // Handle all other types
    return handle_message_output<DataT>(entity, port_name);
  }

 protected:
  /**
   * @brief Fill `tensor_map` from the contents of `gxf_entity` (defined out of line).
   *
   * @param gxf_entity Source entity.
   * @param tensor_map Destination map.
   * @return Boolean status; presumably true on success -- confirm against the implementation.
   */
  bool populate_tensor_map(const holoscan::gxf::Entity& gxf_entity,
                           holoscan::TensorMap& tensor_map);

  /// The wrapped operator; its name is used in error messages.
  std::shared_ptr<holoscan::Operator> op_;
  /// GXF context handle used when creating new entities in push_input().
  void* gxf_context_ = nullptr;
  /// GXF wrapper associated with the operator (set outside this header; ownership not shown).
  holoscan::gxf::GXFWrapper* gxf_wrapper_ = nullptr;
  /// Asynchronous condition associated with the operator (set outside this header).
  std::shared_ptr<holoscan::AsynchronousCondition> async_condition_;
  /// Receivers keyed by string; presumably the input port name -- confirm in the implementation.
  std::unordered_map<std::string, nvidia::gxf::DoubleBufferReceiver*> double_buffer_receivers_;
  /// Transmitters keyed by output port name (looked up by pop_output()).
  std::unordered_map<std::string, nvidia::gxf::DoubleBufferTransmitter*>
      double_buffer_transmitters_;

 private:
  /**
   * @brief Extract a DataT from the holoscan::Message stored in `entity`.
   *
   * @tparam DataT Requested type of the message value.
   * @param entity Entity popped from the output port.
   * @param port_name Output port name (used only for error messages).
   * @return The value cast to DataT, or a kReceiveError RuntimeError if the entity has no
   * holoscan::Message or the stored value is not of type DataT.
   */
  template <typename DataT>
  holoscan::expected<DataT, holoscan::RuntimeError> handle_message_output(
      const nvidia::gxf::Entity& entity, const std::string& port_name) {
    auto message = entity.get<holoscan::Message>();
    if (!message) {
      return create_error(
          holoscan::ErrorCode::kReceiveError,
          "Unable to get the holoscan::Message from the received GXF Entity for output '{}'",
          port_name);
    }

    try {
      return std::any_cast<DataT>(message.value()->value());
    } catch (const std::bad_any_cast&) {
      // Report the actual stored type to help diagnose the mismatch.
      return create_error(
          holoscan::ErrorCode::kReceiveError,
          "Unable to cast the received data to the specified type for output '{}' of type {}",
          port_name,
          message.value()->value().type().name());
    }
  }

  /**
   * @brief Convert a popped entity to a TensorMap (defined out of line).
   *
   * @param entity Entity popped from the output port.
   * @param port_name Output port name.
   * @return The TensorMap, or a RuntimeError.
   */
  holoscan::expected<holoscan::TensorMap, holoscan::RuntimeError> handle_tensor_map_output(
      const holoscan::gxf::Entity& entity, const std::string& port_name);

  /**
   * @brief Format an error message, log it at debug level and wrap it as an unexpected
   * RuntimeError.
   *
   * @param code Error code stored in the RuntimeError.
   * @param format fmt-style format string.
   * @param args Arguments substituted into `format`.
   * @return holoscan::unexpected holding the RuntimeError.
   */
  template <typename... Args>
  holoscan::unexpected<holoscan::RuntimeError> create_error(holoscan::ErrorCode code,
                                                            const char* format, Args&&... args) {
    auto message = fmt::format(format, std::forward<Args>(args)...);
    // Log through an explicit "{}" so that braces inside the already-formatted message
    // (e.g. from a port or operator name) are not re-interpreted as format specifiers.
    HOLOSCAN_LOG_DEBUG("{}", message);
    return holoscan::unexpected<holoscan::RuntimeError>(
        holoscan::RuntimeError(code, message.c_str()));
  }
};
} // namespace holoscan::ops
#endif/* HOLOSCAN_UTILS_OPERATOR_RUNNER_HPP */