Program Listing for File io_context.hpp
↰ Return to documentation for file (include/holoscan/core/io_context.hpp)
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HOLOSCAN_CORE_IO_CONTEXT_HPP
#define HOLOSCAN_CORE_IO_CONTEXT_HPP
#include <cuda_runtime.h>
#include <any>
#include <map>
#include <memory>
#include <string>
#include <string_view>
#include <typeinfo>
#include <unordered_map>
#include <utility>
#include <vector>
#include <common/type_name.hpp>
#include "./common.hpp"
#include "./cuda_object_handler.hpp"
#include "./data_logger.hpp"
#include "./domain/tensor_map.hpp"
#include "./errors.hpp"
#include "./expected.hpp"
#include "./fragment.hpp"
#include "./gxf/entity.hpp"
#include "./message.hpp"
#include "./operator.hpp"
#include "./type_traits.hpp"
#include "holoscan/profiler/profiler.hpp"
// IO Context specific profiling events
PROF_DEFINE_EVENT(event_receive, "receive", 0x33, 0xDD, 0xCC);
PROF_DEFINE_EVENT(event_receive_impl, "receive_impl", 0x55, 0xBB, 0xAA);
PROF_DEFINE_EVENT(event_receive_metadata, "receive_metadata", 0xFF, 0x99, 0x00);
PROF_DEFINE_EVENT(event_receive_streams, "receive_streams", 0x99, 0xFF, 0x00);
// Data logging profiling events (all use purple color for consistency)
PROF_DEFINE_EVENT(event_data_logging, "data_logging", 0x99, 0x33, 0xFF);
PROF_DEFINE_EVENT(event_log_data, "log_data", 0x99, 0x33, 0xFF);
PROF_DEFINE_EVENT(event_log_tensor, "log_tensor", 0x99, 0x33, 0xFF);
PROF_DEFINE_EVENT(event_log_tensormap, "log_tensormap", 0x99, 0x33, 0xFF);
// OutputContext emit profiling events (all use cyan/blue color variants)
PROF_DEFINE_EVENT(event_emit, "emit", 0x00, 0x99, 0xFF);
PROF_DEFINE_EVENT(event_emit_impl, "emit_impl", 0x33, 0x77, 0xDD);
PROF_DEFINE_EVENT(event_emit_metadata, "emit_metadata", 0xFF, 0x99, 0x00);
PROF_DEFINE_EVENT(event_emit_streams, "emit_streams", 0x99, 0xFF, 0x00);
// CUDA stream profiling events (all use same green color for consistency)
PROF_DEFINE_EVENT(event_receive_cuda_stream, "receive_cuda_stream", 0x99, 0xFF, 0x00);
PROF_DEFINE_EVENT(event_receive_cuda_streams, "receive_cuda_streams", 0x99, 0xFF, 0x00);
PROF_DEFINE_EVENT(event_set_cuda_stream, "set_cuda_stream", 0x99, 0xFF, 0x00);
namespace holoscan {
// To indicate that data is not available for the input port
struct NoMessageType {};
constexpr NoMessageType kNoReceivedMessage;
// To indicate that input port is not accessible
struct NoAccessibleMessageType : public std::string {
NoAccessibleMessageType() : std::string("Port is not accessible") {}
explicit NoAccessibleMessageType(const std::string& message) : std::string(message) {}
explicit NoAccessibleMessageType(const char* message) : std::string(message) {}
explicit NoAccessibleMessageType(std::string&& message) : std::string(std::move(message)) {}
};
static inline std::string get_well_formed_name(
const char* name, const std::unordered_map<std::string, std::shared_ptr<IOSpec>>& io_list) {
// If name is provided, use it directly
if (name != nullptr && name[0] != '\0') {
return name;
}
// Handle empty name with execution port case (relatively common) -> use the only port
if (io_list.size() == 1) {
return io_list.begin()->first;
}
return "";
}
class InputContext {
public:
InputContext(ExecutionContext* execution_context, Operator* op,
std::unordered_map<std::string, std::shared_ptr<IOSpec>>& inputs)
: execution_context_(execution_context), op_(op), inputs_(inputs) {}
InputContext(ExecutionContext* execution_context, Operator* op)
: execution_context_(execution_context), op_(op), inputs_(op->spec()->inputs()) {}
virtual ~InputContext() = default;
ExecutionContext* execution_context() const { return execution_context_; }
Operator* op() const { return op_; }
std::unordered_map<std::string, std::shared_ptr<IOSpec>>& inputs() const { return inputs_; }
bool empty(const char* name = nullptr) {
// First see if the name could be found in the inputs
auto& inputs = op_->spec()->inputs();
auto it = inputs.find(std::string(name));
if (it != inputs.end()) {
return empty_impl(name);
}
// Then see if it is in the parameters
auto& params = op_->spec()->params();
auto it2 = params.find(std::string(name));
if (it2 != params.end()) {
auto& param_wrapper = it2->second;
auto& arg_type = param_wrapper.arg_type();
if ((arg_type.element_type() != ArgElementType::kIOSpec) ||
(arg_type.container_type() != ArgContainerType::kVector)) {
HOLOSCAN_LOG_ERROR("Input parameter with name '{}' is not of type 'std::vector<IOSpec*>'",
name);
return true;
}
std::any& any_param = param_wrapper.value();
// Note that the type of any_param is Parameter<typeT>*, not Parameter<typeT>.
auto& param = *std::any_cast<Parameter<std::vector<IOSpec*>>*>(any_param);
int num_inputs = param.get().size();
for (int i = 0; i < num_inputs; ++i) {
// if any of them is not empty return false
if (!empty_impl(fmt::format("{}:{}", name, i).c_str())) {
return false;
}
}
return true; // all of them are empty, so return true.
}
HOLOSCAN_LOG_ERROR("Input port '{}' not found", name);
return true;
}
protected:
enum class InputType {
kGXFEntity,
kAny,
};
public:
template <typename DataT>
holoscan::expected<DataT, holoscan::RuntimeError> receive(const char* name = nullptr) {
std::string input_name = holoscan::get_well_formed_name(name, inputs_);
auto input_it = inputs_.find(input_name);
std::string unique_id;
if (input_it != inputs_.end()) {
unique_id = input_it->second->unique_id();
} else {
unique_id = fmt::format("{}.{}", op_->name(), name == nullptr ? "<unknown>" : name);
}
PROF_SCOPED_PORT_EVENT(op_->id(), unique_id, event_receive::color);
auto& data_loggers = op_->fragment()->data_loggers();
HOLOSCAN_LOG_TRACE("InputContext::receive for op: {}, name: {}",
op_->name(),
name == nullptr ? "nullptr" : name);
// Special case handling for std::shared_ptr<holoscan::Tensor>
if constexpr (std::is_same_v<DataT, std::shared_ptr<holoscan::Tensor>>) {
HOLOSCAN_LOG_TRACE("\tstd::shared_ptr<Tensor> code path");
// Omit logging TensorMap here as we want to log use log_tensor_data instead for this single
// tensor case.
bool omit_tensormap_logging = true;
auto maybe_tensormap = receive_single_value<holoscan::TensorMap>(
input_name.c_str(), InputType::kAny, omit_tensormap_logging);
if (maybe_tensormap.has_value()) {
HOLOSCAN_LOG_TRACE("\t\tTensorMap code path");
auto& tensor_map = maybe_tensormap.value();
if (tensor_map.size() == 1) {
// Return the shared_ptr directly from the map
auto tensor_ptr = tensor_map.begin()->second;
if (!data_loggers.empty()) {
HOLOSCAN_LOG_TRACE("[receive] logging single Tensor from TensorMap");
log_tensor(tensor_ptr, input_name.c_str(), IOSpec::IOType::kInput);
}
return tensor_ptr;
}
return make_unexpected<holoscan::RuntimeError>(create_receive_error(
name,
"Data received was a TensorMap containing more than one tensor. Please use "
"receive<holoscan::TensorMap>(name) instead."));
} else {
HOLOSCAN_LOG_TRACE("\t\tsingle Tensor code path");
std::string input_name = holoscan::get_well_formed_name(name, inputs_);
auto maybe_tensor = receive_single_value<std::shared_ptr<holoscan::Tensor>>(
input_name.c_str(), InputType::kAny);
if (maybe_tensor.has_value() && !data_loggers.empty()) {
HOLOSCAN_LOG_TRACE("[receive] logging single Tensor");
log_tensor(maybe_tensor.value(), input_name.c_str(), IOSpec::IOType::kInput);
}
return maybe_tensor;
}
}
// Original implementation for other types
auto& params = op_->spec()->params();
auto param_it = params.find(input_name);
InputType in_type = InputType::kAny;
// Check if the type is a GXF entity or a vector of GXF entities
if constexpr (is_one_of_derived_v<typename holoscan::type_info<DataT>::element_type,
nvidia::gxf::Entity>) {
in_type = InputType::kGXFEntity;
}
HOLOSCAN_LOG_TRACE("\tin_type: {}", magic_enum::enum_name(in_type));
if constexpr (holoscan::is_vector_v<DataT>) {
DataT input_vector;
std::string error_message;
HOLOSCAN_LOG_TRACE("\tin vector<DataT> code path");
if (param_it != params.end()) {
auto& param_wrapper = param_it->second;
if (!is_valid_param_type(param_wrapper.arg_type())) {
return make_unexpected<holoscan::RuntimeError>(create_receive_error(
input_name.c_str(), "Input parameter is not of type 'std::vector<IOSpec*>'"));
}
if (!fill_input_vector_from_params(
param_wrapper, input_name.c_str(), input_vector, in_type, error_message)) {
return make_unexpected<holoscan::RuntimeError>(
create_receive_error(input_name.c_str(), error_message.c_str()));
}
} else {
if (!fill_input_vector_from_inputs(
input_name.c_str(), input_vector, in_type, error_message)) {
return make_unexpected<holoscan::RuntimeError>(
create_receive_error(input_name.c_str(), error_message.c_str()));
}
}
return input_vector;
} else {
HOLOSCAN_LOG_TRACE("\tin non-vector DataT code path");
return receive_single_value<DataT>(input_name.c_str(), in_type);
}
}
std::shared_ptr<CudaObjectHandler> cuda_object_handler() { return cuda_object_handler_; }
void cuda_object_handler(std::shared_ptr<CudaObjectHandler> handler) {
cuda_object_handler_ = std::move(handler);
}
virtual cudaStream_t receive_cuda_stream(const char* input_port_name = nullptr,
bool allocate = true, bool sync_to_default = false) = 0;
virtual std::vector<std::optional<cudaStream_t>> receive_cuda_streams(
const char* input_port_name = nullptr) = 0;
protected:
virtual bool empty_impl([[maybe_unused]] const char* name = nullptr) { return true; }
virtual std::any receive_impl([[maybe_unused]] const char* name = nullptr,
[[maybe_unused]] InputType in_type = InputType::kAny,
[[maybe_unused]] bool no_error_message = false,
[[maybe_unused]] bool omit_data_logging = false) {
return nullptr;
}
// --------------- Start of helper functions for the receive method ---------------
inline bool is_valid_param_type(const ArgType& arg_type) {
return (arg_type.element_type() == ArgElementType::kIOSpec) &&
(arg_type.container_type() == ArgContainerType::kVector);
}
template <typename DataT>
inline bool fill_input_vector_from_params(ParameterWrapper& param_wrapper, const char* name,
DataT& input_vector, InputType in_type,
std::string& error_message) {
auto& param = *std::any_cast<Parameter<std::vector<IOSpec*>>*>(param_wrapper.value());
int num_inputs = param.get().size();
input_vector.reserve(num_inputs);
for (int index = 0; index < num_inputs; ++index) {
std::string port_name = fmt::format("{}:{}", name, index);
auto value = receive_impl(port_name.c_str(), in_type, true);
const std::type_info& value_type = value.type();
if (value_type == typeid(kNoReceivedMessage)) {
error_message =
fmt::format("No data is received from the input port with name '{}'", port_name);
return false;
}
if (!process_received_value(
value, value_type, port_name.c_str(), input_vector, error_message)) {
return false;
}
}
return true;
}
template <typename DataT>
inline bool fill_input_vector_from_inputs(const char* name, DataT& input_vector,
InputType in_type, std::string& error_message) {
const auto& inputs = op_->spec()->inputs();
const auto input_it = inputs.find(std::string(name));
if (input_it == inputs.end()) {
return false;
}
int index = 0;
while (true) {
auto value = receive_impl(name, in_type);
const std::type_info& value_type = value.type();
if (value_type == typeid(kNoReceivedMessage)) {
if (index == 0) {
error_message =
fmt::format("No data is received from the input port with name '{}'", name);
return false;
}
break;
}
if (index == 0 && value_type == typeid(DataT)) {
// If the first input is of type DataT (such as `std::vector<bool>`), then return the value
// directly
input_vector = std::move(std::any_cast<DataT>(value));
return true;
}
if (!process_received_value(value, value_type, name, input_vector, error_message)) {
return false;
}
index++;
}
return true;
}
// Get unique_id for an input or output port
std::string get_unique_id(Operator* op, const std::string& port_name, IOSpec::IOType io_type) {
if (!op->spec()) {
throw std::runtime_error("Operator spec is not available");
}
auto& ports = io_type == IOSpec::IOType::kInput ? op->spec()->inputs() : op->spec()->outputs();
auto it = ports.find(port_name);
if (it != ports.end()) {
return it->second->unique_id();
}
HOLOSCAN_LOG_WARN("Input port '{}' not found", port_name);
return fmt::format("{}.{}", op->qualified_name(), port_name);
}
inline bool log_tensor(const std::shared_ptr<Tensor>& tensor, const char* port_name,
IOSpec::IOType io_type) {
PROF_SCOPED_EVENT(op_->id(), event_data_logging);
const std::string unique_id{get_unique_id(op_, port_name, io_type)};
auto metadata_ptr = op_->is_metadata_enabled() ? op_->metadata() : nullptr;
for (auto& data_logger : op_->fragment()->data_loggers()) {
if (data_logger->should_log_input()) {
PROF_SCOPED_EVENT(op_->id(), event_log_tensor);
data_logger->log_tensor_data(tensor, unique_id, -1, metadata_ptr, io_type);
}
}
return true;
}
inline bool log_tensormap(const holoscan::TensorMap& tensor_map, const char* port_name,
IOSpec::IOType io_type) {
PROF_SCOPED_EVENT(op_->id(), event_data_logging);
const std::string unique_id{get_unique_id(op_, port_name, io_type)};
auto metadata_ptr = op_->is_metadata_enabled() ? op_->metadata() : nullptr;
for (auto& data_logger : op_->fragment()->data_loggers()) {
if (data_logger->should_log_input()) {
PROF_SCOPED_EVENT(op_->id(), event_log_tensormap);
data_logger->log_tensormap_data(tensor_map, unique_id, -1, metadata_ptr, io_type);
}
}
return true;
}
inline bool populate_tensor_map(const holoscan::gxf::Entity& gxf_entity,
holoscan::TensorMap& tensor_map) {
auto tensor_components_expected = gxf_entity.findAllHeap<nvidia::gxf::Tensor>();
for (const auto& gxf_tensor : tensor_components_expected.value()) {
// Do zero-copy conversion to holoscan::Tensor (as in gxf_entity.get<holoscan::Tensor>())
auto maybe_dl_ctx = (*gxf_tensor->get()).toDLManagedTensorContext();
if (!maybe_dl_ctx) {
HOLOSCAN_LOG_ERROR(
"Failed to get std::shared_ptr<DLManagedTensorContext> from nvidia::gxf::Tensor");
return false;
}
auto holoscan_tensor = std::make_shared<Tensor>(maybe_dl_ctx.value());
tensor_map.insert({gxf_tensor->name(), holoscan_tensor});
}
return true;
}
template <typename DataT>
inline bool process_received_value(std::any& value, const std::type_info& value_type,
const char* port_name, DataT& input_vector,
std::string& error_message) {
// Assume that the received data is not of type NoMessageType
// (this case should be handled by the caller)
if (value_type == typeid(NoAccessibleMessageType)) {
auto casted_value = std::any_cast<NoAccessibleMessageType>(value);
HOLOSCAN_LOG_ERROR(static_cast<std::string>(casted_value));
error_message = std::move(static_cast<std::string>(casted_value));
return false;
}
if constexpr (std::is_same_v<typename DataT::value_type, std::any>) {
input_vector.push_back(std::move(value));
} else if (value_type == typeid(std::nullptr_t)) {
handle_null_value<DataT>(input_vector);
} else {
try {
if constexpr (is_one_of_v<typename DataT::value_type, nvidia::gxf::Entity>) {
// receive_impl returns a holoscan::gxf::Entity so we need to cast it to the correct type
auto casted_value = std::any_cast<holoscan::gxf::Entity>(value);
input_vector.push_back(casted_value);
} else {
auto casted_value = std::any_cast<typename DataT::value_type>(value);
input_vector.push_back(casted_value);
}
} catch (const std::bad_any_cast& e) {
return handle_bad_any_cast<DataT>(value, port_name, input_vector, error_message);
} catch (const std::exception& e) {
error_message = fmt::format(
"Unable to cast the received data to the specified type for input '{}' of "
"type {}: {}",
port_name,
value_type.name(),
e.what());
return false;
}
}
return true;
}
template <typename DataT>
inline void handle_null_value(DataT& input_vector) {
if constexpr (holoscan::is_shared_ptr_v<typename DataT::value_type> ||
std::is_pointer_v<typename DataT::value_type>) {
input_vector.push_back(typename DataT::value_type{nullptr});
}
}
template <typename DataT>
inline bool handle_bad_any_cast(std::any& value, const char* port_name, DataT& input_vector,
std::string& error_message) {
if constexpr (is_one_of_derived_v<typename DataT::value_type, nvidia::gxf::Entity>) {
error_message = fmt::format(
"Unable to cast the received data to the specified type (holoscan::gxf::Entity) for "
"input "
"'{}'",
port_name);
HOLOSCAN_LOG_DEBUG(error_message);
return false;
} else if constexpr (is_one_of_derived_v<typename DataT::value_type, holoscan::TensorMap>) {
TensorMap tensor_map;
try {
auto gxf_entity = std::any_cast<holoscan::gxf::Entity>(value);
bool is_tensor_map_populated = populate_tensor_map(gxf_entity, tensor_map);
if (!is_tensor_map_populated) {
error_message = fmt::format(
"Unable to populate the TensorMap from the received GXF Entity for input '{}'",
port_name);
HOLOSCAN_LOG_DEBUG(error_message);
return false;
}
HOLOSCAN_LOG_TRACE("[receive] logging tensor map");
auto& data_loggers = op_->fragment()->data_loggers();
if (tensor_map.size() > 0 && !data_loggers.empty()) {
log_tensormap(tensor_map, port_name, IOSpec::IOType::kInput);
}
} catch (const std::bad_any_cast& e) {
error_message = fmt::format(
"Unable to cast the received data to the specified type (holoscan::TensorMap) for "
"input "
"'{}'",
port_name);
HOLOSCAN_LOG_DEBUG(error_message);
return false;
}
input_vector.push_back(std::move(tensor_map));
} else {
error_message = fmt::format(
"Unable to cast the received data to the specified type for input '{}' of type {}: {}",
port_name,
value.type().name(),
error_message);
HOLOSCAN_LOG_DEBUG(error_message);
return false;
}
return true;
}
template <typename DataT>
inline holoscan::expected<DataT, holoscan::RuntimeError> receive_single_value(
const char* name, InputType in_type, bool omit_tensormap_logging = false) {
bool omit_data_logging = false;
if constexpr (is_one_of_derived_v<DataT, holoscan::TensorMap>) {
omit_data_logging = true;
}
auto value = receive_impl(name, in_type, false, omit_data_logging);
const std::type_info& value_type = value.type();
if (value_type == typeid(NoMessageType)) {
return make_unexpected<holoscan::RuntimeError>(
create_receive_error(name, "No message received from the input port"));
} else if (value_type == typeid(NoAccessibleMessageType)) {
auto casted_value = std::any_cast<NoAccessibleMessageType>(value);
HOLOSCAN_LOG_ERROR(static_cast<std::string>(casted_value));
auto error_message = static_cast<std::string>(casted_value);
return make_unexpected<holoscan::RuntimeError>(
create_receive_error(name, error_message.c_str()));
}
try {
if constexpr (std::is_same_v<DataT, std::any>) {
return value;
} else if (value_type == typeid(std::nullptr_t)) {
return handle_null_value<DataT>();
} else if constexpr (is_one_of_derived_v<DataT, nvidia::gxf::Entity>) {
// Handle nvidia::gxf::Entity
// receive_impl returns a holoscan::gxf::Entity so we need to cast it to the correct type
return std::any_cast<holoscan::gxf::Entity>(value);
} else if constexpr (is_one_of_derived_v<DataT, holoscan::TensorMap>) {
// Handle holoscan::TensorMap
TensorMap tensor_map;
bool is_tensor_map_populated =
populate_tensor_map(std::any_cast<holoscan::gxf::Entity>(value), tensor_map);
if (!is_tensor_map_populated) {
auto error_message = fmt::format(
"Unable to populate the TensorMap from the received GXF Entity for input '{}'", name);
HOLOSCAN_LOG_DEBUG(error_message);
return make_unexpected<holoscan::RuntimeError>(
create_receive_error(name, error_message.c_str()));
}
if (!omit_tensormap_logging) {
HOLOSCAN_LOG_TRACE("[receive] logging tensor map");
auto& data_loggers = op_->fragment()->data_loggers();
if (tensor_map.size() > 0 && !data_loggers.empty()) {
log_tensormap(tensor_map, name, IOSpec::IOType::kInput);
}
}
return tensor_map;
} else {
return std::any_cast<DataT>(value);
}
} catch (const std::bad_any_cast& e) {
auto error_message = fmt::format(
"Unable to cast the received data to the specified type for input '{}' of type {}",
name,
value.type().name());
HOLOSCAN_LOG_DEBUG(error_message);
return make_unexpected<holoscan::RuntimeError>(
create_receive_error(name, error_message.c_str()));
}
}
inline holoscan::RuntimeError create_receive_error(const char* name, const char* message) {
auto error_message =
fmt::format("Failure receiving message from input port '{}': {}", name, message);
HOLOSCAN_LOG_TRACE(error_message);
return holoscan::RuntimeError(holoscan::ErrorCode::kReceiveError, error_message.c_str());
}
template <typename DataT>
inline holoscan::expected<DataT, holoscan::RuntimeError> handle_null_value() {
if constexpr (holoscan::is_shared_ptr_v<DataT> || std::is_pointer_v<DataT>) {
return DataT{nullptr};
} else {
auto error_message = "Received nullptr for a non-pointer type";
return make_unexpected<holoscan::RuntimeError>(create_receive_error("input", error_message));
}
}
// --------------- End of helper functions for the receive method ---------------
ExecutionContext* execution_context_ =
nullptr;
Operator* op_ = nullptr;
std::unordered_map<std::string, std::shared_ptr<IOSpec>>& inputs_;
protected:
std::shared_ptr<CudaObjectHandler> cuda_object_handler_{};
};
class OutputContext {
public:
OutputContext(ExecutionContext* execution_context, Operator* op)
: execution_context_(execution_context), op_(op), outputs_(op->spec()->outputs()) {}
OutputContext(ExecutionContext* execution_context, Operator* op,
std::unordered_map<std::string, std::shared_ptr<IOSpec>>& outputs)
: execution_context_(execution_context), op_(op), outputs_(outputs) {}
virtual ~OutputContext() = default;
ExecutionContext* execution_context() const { return execution_context_; }
Operator* op() const { return op_; }
std::unordered_map<std::string, std::shared_ptr<IOSpec>>& outputs() const { return outputs_; }
protected:
enum class OutputType {
kGXFEntity,
kAny,
};
public:
template <typename DataT,
typename = std::enable_if_t<holoscan::is_one_of_derived_v<DataT, nvidia::gxf::Entity>>>
void emit(DataT& data, const char* name = nullptr, const int64_t acq_timestamp = -1) {
HOLOSCAN_LOG_TRACE("OutputContext::emit (Entity DataT) for op: {}, name: {}",
op_->name(),
name == nullptr ? "nullptr" : name);
std::string output_name = holoscan::get_well_formed_name(name, outputs_);
auto output_it = outputs_.find(output_name);
std::string unique_id;
if (output_it != outputs_.end()) {
unique_id = output_it->second->unique_id();
} else {
unique_id = fmt::format("{}.{}", op_->name(), name == nullptr ? "<unknown>" : name);
}
PROF_SCOPED_PORT_EVENT(op_->id(), unique_id, event_emit::color);
// if it is the same as nvidia::gxf::Entity then just pass it to emit_impl
if constexpr (holoscan::is_one_of_v<DataT, nvidia::gxf::Entity>) {
emit_impl(data, name, OutputType::kGXFEntity, acq_timestamp);
} else {
// Convert it to nvidia::gxf::Entity and then pass it to emit_impl
// Otherwise, we will lose the type information and cannot cast appropriately in emit_impl
emit_impl(nvidia::gxf::Entity(data), name, OutputType::kGXFEntity, acq_timestamp);
}
}
template <typename DataT,
typename = std::enable_if_t<!holoscan::is_one_of_derived_v<DataT, nvidia::gxf::Entity>>>
void emit(DataT data, const char* name = nullptr, const int64_t acq_timestamp = -1) {
HOLOSCAN_LOG_TRACE("OutputContext::emit (non-Entity DataT) for op: {}, name: {}",
op_->name(),
name == nullptr ? "nullptr" : name);
std::string output_name = holoscan::get_well_formed_name(name, outputs_);
auto output_it = outputs_.find(output_name);
std::string unique_id;
if (output_it != outputs_.end()) {
unique_id = output_it->second->unique_id();
} else {
unique_id = fmt::format("{}.{}", op_->name(), name == nullptr ? "<unknown>" : name);
}
PROF_SCOPED_PORT_EVENT(op_->id(), unique_id, event_emit::color);
emit_impl(std::move(data), name, OutputType::kAny, acq_timestamp);
}
void emit(holoscan::TensorMap& data, const char* name = nullptr,
const int64_t acq_timestamp = -1) {
HOLOSCAN_LOG_TRACE("OutputContext::emit (TensorMap) for op: {}, name: {}",
op_->name(),
name == nullptr ? "nullptr" : name);
std::string output_name = holoscan::get_well_formed_name(name, outputs_);
auto output_it = outputs_.find(output_name);
std::string unique_id;
if (output_it != outputs_.end()) {
unique_id = output_it->second->unique_id();
} else {
unique_id = fmt::format("{}.{}", op_->name(), name == nullptr ? "<unknown>" : name);
}
PROF_SCOPED_PORT_EVENT(op_->id(), unique_id, event_emit::color);
auto out_message = holoscan::gxf::Entity::New(execution_context_);
for (auto& [key, tensor] : data) {
out_message.add(tensor, key.c_str());
}
emit(out_message, name, acq_timestamp);
}
void emit(std::shared_ptr<holoscan::Tensor> data, const char* name = nullptr,
const int64_t acq_timestamp = -1) {
HOLOSCAN_LOG_TRACE(
"OutputContext::emit (std::shared_ptr<holoscan::Tensor>) for op: {}, name: {}",
op_->name(),
name == nullptr ? "nullptr" : name);
std::string output_name = holoscan::get_well_formed_name(name, outputs_);
auto output_it = outputs_.find(output_name);
std::string unique_id;
if (output_it != outputs_.end()) {
unique_id = output_it->second->unique_id();
} else {
unique_id = fmt::format("{}.{}", op_->name(), name == nullptr ? "<unknown>" : name);
}
PROF_SCOPED_PORT_EVENT(op_->id(), unique_id, event_emit::color);
auto out_message = holoscan::gxf::Entity::New(execution_context_);
out_message.add(data, "");
emit(out_message, name, acq_timestamp);
}
virtual void set_cuda_stream(const cudaStream_t stream,
const char* output_port_name = nullptr) = 0;
std::shared_ptr<CudaObjectHandler> cuda_object_handler() { return cuda_object_handler_; }
void cuda_object_handler(std::shared_ptr<CudaObjectHandler> handler) {
cuda_object_handler_ = std::move(handler);
}
protected:
virtual void emit_impl([[maybe_unused]] std::any data,
[[maybe_unused]] const char* name = nullptr,
[[maybe_unused]] OutputType out_type = OutputType::kAny,
[[maybe_unused]] const int64_t acq_timestamp = -1) {}
ExecutionContext* execution_context_ =
nullptr;
Operator* op_ = nullptr;
std::unordered_map<std::string, std::shared_ptr<IOSpec>>& outputs_;
std::shared_ptr<CudaObjectHandler> cuda_object_handler_{};
};
} // namespace holoscan
#endif/* HOLOSCAN_CORE_IO_CONTEXT_HPP */