NVIDIA Holoscan SDK v2.8.0
Holoscan v2.8.0

Program Listing for File io_context.hpp

Return to documentation for file (include/holoscan/core/io_context.hpp)

Copy
Copied!
            

/* * SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef HOLOSCAN_CORE_IO_CONTEXT_HPP #define HOLOSCAN_CORE_IO_CONTEXT_HPP #include <cuda_runtime.h> #include <any> #include <map> #include <memory> #include <string> #include <string_view> #include <typeinfo> #include <unordered_map> #include <utility> #include <vector> #include <common/type_name.hpp> #include <gxf/cuda/cuda_stream.hpp> #include "./common.hpp" #include "./domain/tensor_map.hpp" #include "./errors.hpp" #include "./expected.hpp" #include "./gxf/entity.hpp" #include "./gxf/gxf_cuda.hpp" #include "./message.hpp" #include "./operator.hpp" #include "./type_traits.hpp" namespace holoscan { // To indicate that data is not available for the input port struct NoMessageType {}; constexpr NoMessageType kNoReceivedMessage; // To indicate that input port is not accessible struct NoAccessibleMessageType : public std::string { NoAccessibleMessageType() : std::string("Port is not accessible") {} explicit NoAccessibleMessageType(const std::string& message) : std::string(message) {} explicit NoAccessibleMessageType(const char* message) : std::string(message) {} explicit NoAccessibleMessageType(std::string&& message) : std::string(std::move(message)) {} }; static inline std::string get_well_formed_name( const char* name, const std::unordered_map<std::string, std::shared_ptr<IOSpec>>& io_list) { std::string well_formed_name; if (name == nullptr || name[0] == '\0') { if (io_list.size() == 1) { well_formed_name = io_list.begin()->first; } else { well_formed_name = ""; } } else { well_formed_name = name; } return well_formed_name; } class InputContext { public: InputContext(ExecutionContext* execution_context, Operator* op, std::unordered_map<std::string, std::shared_ptr<IOSpec>>& inputs) : execution_context_(execution_context), op_(op), inputs_(inputs) {} InputContext(ExecutionContext* execution_context, Operator* op) : execution_context_(execution_context), op_(op), inputs_(op->spec()->inputs()) {} virtual ~InputContext() = default; ExecutionContext* execution_context() const { return execution_context_; } Operator* op() const { return op_; } std::unordered_map<std::string, std::shared_ptr<IOSpec>>& inputs() const { return inputs_; } bool empty(const char* name = nullptr) { // First see if the name could be found in the inputs auto& inputs = op_->spec()->inputs(); auto it = inputs.find(std::string(name)); if (it != inputs.end()) { return empty_impl(name); } // Then see if it is in the parameters auto& params = op_->spec()->params(); auto it2 = params.find(std::string(name)); if (it2 != params.end()) { auto& param_wrapper = it2->second; auto& arg_type = param_wrapper.arg_type(); if ((arg_type.element_type() != ArgElementType::kIOSpec) || (arg_type.container_type() != ArgContainerType::kVector)) { HOLOSCAN_LOG_ERROR("Input parameter with name '{}' is not of type 'std::vector<IOSpec*>'", name); return true; } std::any& any_param = param_wrapper.value(); // Note that the type of any_param is Parameter<typeT>*, not Parameter<typeT>. auto& param = *std::any_cast<Parameter<std::vector<IOSpec*>>*>(any_param); int num_inputs = param.get().size(); for (int i = 0; i < num_inputs; ++i) { // if any of them is not empty return false if (!empty_impl(fmt::format("{}:{}", name, i).c_str())) { return false; } } return true; // all of them are empty, so return true. } HOLOSCAN_LOG_ERROR("Input port '{}' not found", name); return true; } template <typename DataT> holoscan::expected<DataT, holoscan::RuntimeError> receive(const char* name = nullptr) { auto& params = op_->spec()->params(); auto param_it = params.find(std::string(name)); if constexpr (holoscan::is_vector_v<DataT>) { DataT input_vector; std::string error_message; if (param_it != params.end()) { auto& param_wrapper = param_it->second; if (!is_valid_param_type(param_wrapper.arg_type())) { return make_unexpected<holoscan::RuntimeError>( create_receive_error(name, "Input parameter is not of type 'std::vector<IOSpec*>'")); } if (!fill_input_vector_from_params(param_wrapper, name, input_vector, error_message)) { return make_unexpected<holoscan::RuntimeError>( create_receive_error(name, error_message.c_str())); } } else { if (!fill_input_vector_from_inputs(name, input_vector, error_message)) { return make_unexpected<holoscan::RuntimeError>( create_receive_error(name, error_message.c_str())); } } return input_vector; } else { return receive_single_value<DataT>(name); } } std::shared_ptr<gxf::CudaObjectHandler> cuda_object_handler() { return cuda_object_handler_; } void cuda_object_handler(std::shared_ptr<gxf::CudaObjectHandler> handler) { cuda_object_handler_ = handler; } virtual cudaStream_t receive_cuda_stream(const char* input_port_name = nullptr, bool allocate = true, bool sync_to_default = false) = 0; virtual std::vector<std::optional<cudaStream_t>> receive_cuda_streams( const char* input_port_name = nullptr) = 0; protected: virtual bool empty_impl(const char* name = nullptr) { (void)name; return true; } virtual std::any receive_impl(const char* name = nullptr, bool no_error_message = false) { (void)name; (void)no_error_message; return nullptr; } // --------------- Start of helper functions for the receive method --------------- inline bool is_valid_param_type(const ArgType& arg_type) { return (arg_type.element_type() == ArgElementType::kIOSpec) && (arg_type.container_type() == ArgContainerType::kVector); } template <typename DataT> inline bool fill_input_vector_from_params(ParameterWrapper& param_wrapper, const char* name, DataT& input_vector, std::string& error_message) { auto& param = *std::any_cast<Parameter<std::vector<IOSpec*>>*>(param_wrapper.value()); int num_inputs = param.get().size(); input_vector.reserve(num_inputs); for (int index = 0; index < num_inputs; ++index) { std::string port_name = fmt::format("{}:{}", name, index); auto value = receive_impl(port_name.c_str(), true); const std::type_info& value_type = value.type(); if (value_type == typeid(kNoReceivedMessage)) { error_message = fmt::format("No data is received from the input port with name '{}'", port_name); return false; } if (!process_received_value(value, value_type, name, index, input_vector, error_message)) { return false; } } return true; } template <typename DataT> inline bool fill_input_vector_from_inputs(const char* name, DataT& input_vector, std::string& error_message) { const auto& inputs = op_->spec()->inputs(); const auto input_it = inputs.find(std::string(name)); if (input_it == inputs.end()) { return false; } int index = 0; while (true) { auto value = receive_impl(name); const std::type_info& value_type = value.type(); if (value_type == typeid(kNoReceivedMessage)) { if (index == 0) { error_message = fmt::format("No data is received from the input port with name '{}'", name); return false; } break; } if (index == 0 && value_type == typeid(DataT)) { // If the first input is of type DataT (such as `std::vector<bool>`), then return the value // directly input_vector = std::move(std::any_cast<DataT>(value)); return true; } if (!process_received_value(value, value_type, name, index++, input_vector, error_message)) { return false; } } return true; } inline bool populate_tensor_map(const holoscan::gxf::Entity& gxf_entity, holoscan::TensorMap& tensor_map) { auto tensor_components_expected = gxf_entity.findAllHeap<nvidia::gxf::Tensor>(); for (const auto& gxf_tensor : tensor_components_expected.value()) { // Do zero-copy conversion to holoscan::Tensor (as in gxf_entity.get<holoscan::Tensor>()) auto maybe_dl_ctx = (*gxf_tensor->get()).toDLManagedTensorContext(); if (!maybe_dl_ctx) { HOLOSCAN_LOG_ERROR( "Failed to get std::shared_ptr<DLManagedTensorContext> from nvidia::gxf::Tensor"); return false; } auto holoscan_tensor = std::make_shared<Tensor>(maybe_dl_ctx.value()); tensor_map.insert({gxf_tensor->name(), holoscan_tensor}); } return true; } template <typename DataT> inline bool process_received_value(std::any& value, const std::type_info& value_type, const char* name, int index, DataT& input_vector, std::string& error_message) { bool is_bad_any_cast = false; // Assume that the received data is not of type NoMessageType // (this case should be handled by the caller) if (value_type == typeid(NoAccessibleMessageType)) { auto casted_value = std::any_cast<NoAccessibleMessageType>(value); HOLOSCAN_LOG_ERROR(static_cast<std::string>(casted_value)); error_message = std::move(static_cast<std::string>(casted_value)); return false; } if constexpr (std::is_same_v<typename DataT::value_type, std::any>) { input_vector.push_back(std::move(value)); } else if (value_type == typeid(std::nullptr_t)) { handle_null_value<DataT>(input_vector); } else { try { auto casted_value = std::any_cast<typename DataT::value_type>(value); input_vector.push_back(casted_value); } catch (const std::bad_any_cast& e) { is_bad_any_cast = true; } catch (const std::exception& e) { error_message = fmt::format( "Unable to cast the received data to the specified type for input '{}:{}' of " "type {}: {}", name, index, value_type.name(), e.what()); return false; } } if (is_bad_any_cast) { return handle_bad_any_cast<DataT>(value, name, index, input_vector, error_message); } return true; } template <typename DataT> inline void handle_null_value(DataT& input_vector) { if constexpr (holoscan::is_shared_ptr_v<typename DataT::value_type> || std::is_pointer_v<typename DataT::value_type>) { input_vector.push_back(typename DataT::value_type{nullptr}); } } template <typename DataT> inline bool handle_bad_any_cast(std::any& value, const char* name, int index, DataT& input_vector, std::string& error_message) { if constexpr (is_one_of_derived_v<typename DataT::value_type, nvidia::gxf::Entity>) { error_message = fmt::format( "Unable to cast the received data to the specified type (holoscan::gxf::Entity) for " "input " "'{}:{}'", name, index); HOLOSCAN_LOG_DEBUG(error_message); return false; } else if constexpr (is_one_of_derived_v<typename DataT::value_type, holoscan::TensorMap>) { TensorMap tensor_map; try { auto gxf_entity = std::any_cast<holoscan::gxf::Entity>(value); bool is_tensor_map_populated = populate_tensor_map(gxf_entity, tensor_map); if (!is_tensor_map_populated) { error_message = fmt::format( "Unable to populate the TensorMap from the received GXF Entity for input '{}:{}'", name, index); HOLOSCAN_LOG_DEBUG(error_message); return false; } } catch (const std::bad_any_cast& e) { error_message = fmt::format( "Unable to cast the received data to the specified type (holoscan::TensorMap) for " "input " "'{}:{}'", name, index); HOLOSCAN_LOG_DEBUG(error_message); return false; } input_vector.push_back(std::move(tensor_map)); } else { error_message = fmt::format( "Unable to cast the received data to the specified type for input '{}:{}' of type {}: {}", name, index, value.type().name(), error_message); HOLOSCAN_LOG_DEBUG(error_message); return false; } return true; } template <typename DataT> inline holoscan::expected<DataT, holoscan::RuntimeError> receive_single_value(const char* name) { auto value = receive_impl(name); const std::type_info& value_type = value.type(); if (value_type == typeid(NoMessageType)) { return make_unexpected<holoscan::RuntimeError>( create_receive_error(name, "No message received from the input port")); } else if (value_type == typeid(NoAccessibleMessageType)) { auto casted_value = std::any_cast<NoAccessibleMessageType>(value); HOLOSCAN_LOG_ERROR(static_cast<std::string>(casted_value)); auto error_message = std::move(static_cast<std::string>(casted_value)); return make_unexpected<holoscan::RuntimeError>( create_receive_error(name, error_message.c_str())); } try { if constexpr (std::is_same_v<DataT, std::any>) { return value; } else if (value_type == typeid(std::nullptr_t)) { return handle_null_value<DataT>(); } else if constexpr (is_one_of_derived_v<DataT, nvidia::gxf::Entity>) { // Handle nvidia::gxf::Entity return std::any_cast<DataT>(value); } else if constexpr (is_one_of_derived_v<DataT, holoscan::TensorMap>) { // Handle holoscan::TensorMap TensorMap tensor_map; bool is_tensor_map_populated = populate_tensor_map(std::any_cast<holoscan::gxf::Entity>(value), tensor_map); if (!is_tensor_map_populated) { auto error_message = fmt::format( "Unable to populate the TensorMap from the received GXF Entity for input '{}'", name); HOLOSCAN_LOG_DEBUG(error_message); return make_unexpected<holoscan::RuntimeError>( create_receive_error(name, error_message.c_str())); } return tensor_map; } else { return std::any_cast<DataT>(value); } } catch (const std::bad_any_cast& e) { auto error_message = fmt::format( "Unable to cast the received data to the specified type for input '{}' of type {}", name, value.type().name()); HOLOSCAN_LOG_DEBUG(error_message); return make_unexpected<holoscan::RuntimeError>( create_receive_error(name, error_message.c_str())); } } inline holoscan::RuntimeError create_receive_error(const char* name, const char* message) { auto error_message = fmt::format("Failure receiving message from input port '{}': {}", name, message); HOLOSCAN_LOG_TRACE(error_message); return holoscan::RuntimeError(holoscan::ErrorCode::kReceiveError, error_message.c_str()); } template <typename DataT> inline holoscan::expected<DataT, holoscan::RuntimeError> handle_null_value() { if constexpr (holoscan::is_shared_ptr_v<DataT> || std::is_pointer_v<DataT>) { return DataT{nullptr}; } else { auto error_message = "Received nullptr for a non-pointer type"; return make_unexpected<holoscan::RuntimeError>(create_receive_error("input", error_message)); } } // --------------- End of helper functions for the receive method --------------- ExecutionContext* execution_context_ = nullptr; Operator* op_ = nullptr; std::unordered_map<std::string, std::shared_ptr<IOSpec>>& inputs_; private: std::shared_ptr<gxf::CudaObjectHandler> cuda_object_handler_{}; }; class OutputContext { public: OutputContext(ExecutionContext* execution_context, Operator* op) : execution_context_(execution_context), op_(op), outputs_(op->spec()->outputs()) {} OutputContext(ExecutionContext* execution_context, Operator* op, std::unordered_map<std::string, std::shared_ptr<IOSpec>>& outputs) : execution_context_(execution_context), op_(op), outputs_(outputs) {} virtual ~OutputContext() = default; ExecutionContext* execution_context() const { return execution_context_; } Operator* op() const { return op_; } std::unordered_map<std::string, std::shared_ptr<IOSpec>>& outputs() const { return outputs_; } enum class OutputType { kSharedPointer, kGXFEntity, kAny, }; template <typename DataT, typename = std::enable_if_t<!holoscan::is_one_of_derived_v< DataT, nvidia::gxf::Entity, std::any>>> void emit(std::shared_ptr<DataT>& data, const char* name = nullptr, const int64_t acq_timestamp = -1) { emit_impl(data, name, OutputType::kSharedPointer, acq_timestamp); } template <typename DataT, typename = std::enable_if_t<holoscan::is_one_of_derived_v<DataT, nvidia::gxf::Entity>>> void emit(DataT& data, const char* name = nullptr, const int64_t acq_timestamp = -1) { // if it is the same as nvidia::gxf::Entity then just pass it to emit_impl if constexpr (holoscan::is_one_of_v<DataT, nvidia::gxf::Entity>) { emit_impl(data, name, OutputType::kGXFEntity, acq_timestamp); } else { // Convert it to nvidia::gxf::Entity and then pass it to emit_impl // Otherwise, we will lose the type information and cannot cast appropriately in emit_impl emit_impl(nvidia::gxf::Entity(data), name, OutputType::kGXFEntity, acq_timestamp); } } template <typename DataT, typename = std::enable_if_t<!holoscan::is_one_of_derived_v<DataT, nvidia::gxf::Entity>>> void emit(DataT data, const char* name = nullptr, const int64_t acq_timestamp = -1) { emit_impl(std::move(data), name, OutputType::kAny, acq_timestamp); } void emit(holoscan::TensorMap& data, const char* name = nullptr, const int64_t acq_timestamp = -1) { auto out_message = holoscan::gxf::Entity::New(execution_context_); for (auto& [key, tensor] : data) { out_message.add(tensor, key.c_str()); } emit(out_message, name, acq_timestamp); } virtual void set_cuda_stream(const cudaStream_t stream, const char* output_port_name = nullptr) = 0; std::shared_ptr<gxf::CudaObjectHandler> cuda_object_handler() { return cuda_object_handler_; } void cuda_object_handler(std::shared_ptr<gxf::CudaObjectHandler> handler) { cuda_object_handler_ = handler; } protected: virtual void emit_impl([[maybe_unused]] std::any data, [[maybe_unused]] const char* name = nullptr, [[maybe_unused]] OutputType out_type = OutputType::kSharedPointer, [[maybe_unused]] const int64_t acq_timestamp = -1) {} ExecutionContext* execution_context_ = nullptr; Operator* op_ = nullptr; std::unordered_map<std::string, std::shared_ptr<IOSpec>>& outputs_; std::shared_ptr<gxf::CudaObjectHandler> cuda_object_handler_{}; }; } // namespace holoscan #endif/* HOLOSCAN_CORE_IO_CONTEXT_HPP */

© Copyright 2022-2024, NVIDIA. Last updated on Jan 2, 2025.