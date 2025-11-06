/* * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef HOLOSCAN_CORE_RESOURCES_ASYNC_DATA_LOGGER_HPP #define HOLOSCAN_CORE_RESOURCES_ASYNC_DATA_LOGGER_HPP #include <algorithm> #include <any> #include <atomic> #include <cctype> #include <cstdint> #include <memory> #include <string> #include <thread> #include <variant> #include <vector> #include "yaml-cpp/yaml.h" #include "concurrentqueue.h" #include "holoscan/core/arg.hpp" #include "holoscan/core/component_spec.hpp" #include "holoscan/core/domain/tensor.hpp" #include "holoscan/core/domain/tensor_map.hpp" #include "holoscan/core/io_spec.hpp" #include "holoscan/core/resources/data_logger.hpp" #include "holoscan/logger/logger.hpp" namespace holoscan { class MetadataDictionary; // forward declaration enum class AsyncQueuePolicy { kReject = 0, kRaise = 1 }; } // namespace holoscan // YAML conversion support for AsyncQueuePolicy template <> struct YAML::convert<holoscan::AsyncQueuePolicy> { static Node encode(const holoscan::AsyncQueuePolicy& rhs) { Node node; switch (rhs) { case holoscan::AsyncQueuePolicy::kReject: node = "reject"; break; case holoscan::AsyncQueuePolicy::kRaise: node = "raise"; break; default: node = static_cast<int>(rhs); // fallback to numeric value break; } return node; } static bool decode(const Node& node, holoscan::AsyncQueuePolicy& rhs) { if (!node.IsScalar()) return false; const std::string value = node.Scalar(); // Support string values (case-insensitive) std::string lower_value = value; std::transform( lower_value.begin(), lower_value.end(), lower_value.begin(), [](unsigned char c) { return std::tolower(c); }); if (lower_value == "reject") { rhs = holoscan::AsyncQueuePolicy::kReject; return true; } else if (lower_value == "raise") { rhs = holoscan::AsyncQueuePolicy::kRaise; return true; } // Support using the numeric enum values as well try { int numeric_value = std::stoi(value); if (numeric_value == static_cast<int>(holoscan::AsyncQueuePolicy::kReject)) { rhs = holoscan::AsyncQueuePolicy::kReject; return true; } else if (numeric_value == static_cast<int>(holoscan::AsyncQueuePolicy::kRaise)) { rhs = holoscan::AsyncQueuePolicy::kRaise; return true; } } catch (...) { // Not a valid number, continue to return false } return false; // Invalid value } }; namespace holoscan { struct DataEntry { enum Type { Generic, TensorData, TensorMapData }; Type type; std::string unique_id; int64_t acquisition_timestamp; int64_t emit_timestamp; IOSpec::IOType io_type; std::shared_ptr<MetadataDictionary> metadata{}; std::variant<std::any, std::shared_ptr<holoscan::Tensor>, holoscan::TensorMap> data; // Default constructor DataEntry() : type(Generic), acquisition_timestamp(-1), emit_timestamp(-1), io_type(IOSpec::IOType::kOutput), data(std::any{}) {} // Constructors for different types DataEntry(std::any data_arg, const std::string& id, int64_t acq_time, int64_t emit_time, IOSpec::IOType io_type, std::shared_ptr<MetadataDictionary> meta = nullptr); DataEntry(std::shared_ptr<holoscan::Tensor> tensor, const std::string& id, int64_t acq_time, int64_t emit_time, IOSpec::IOType io_type, std::shared_ptr<MetadataDictionary> meta = nullptr); DataEntry(holoscan::TensorMap tensor_map, const std::string& id, int64_t acq_time, int64_t emit_time, IOSpec::IOType io_type, std::shared_ptr<MetadataDictionary> meta = nullptr); }; class AsyncDataLoggerBackend { public: AsyncDataLoggerBackend() = default; virtual ~AsyncDataLoggerBackend() = default; virtual bool initialize() = 0; virtual void shutdown() = 0; // Separate processing methods for different data types virtual bool process_data_entry(const DataEntry& entry) = 0; virtual bool process_large_data_entry(const DataEntry& entry) = 0; virtual std::string get_statistics() const { return ""; } }; class AsyncDataLoggerResource : public DataLoggerResource { public: HOLOSCAN_RESOURCE_FORWARD_ARGS_SUPER(AsyncDataLoggerResource, DataLoggerResource) AsyncDataLoggerResource() = default; ~AsyncDataLoggerResource() override; // holds non-joinable std::threads, so prevent copying or moving the resource AsyncDataLoggerResource(const AsyncDataLoggerResource&) = delete; AsyncDataLoggerResource& operator=(const AsyncDataLoggerResource&) = delete; AsyncDataLoggerResource(AsyncDataLoggerResource&&) = delete; AsyncDataLoggerResource& operator=(AsyncDataLoggerResource&&) = delete; void setup(ComponentSpec& spec) override; void initialize() override; // DataLogger interface implementation bool log_data(const std::any& data, const std::string& unique_id, int64_t acquisition_timestamp = -1, const std::shared_ptr<MetadataDictionary>& metadata = nullptr, IOSpec::IOType io_type = IOSpec::IOType::kOutput) override; bool log_tensor_data(const std::shared_ptr<Tensor>& tensor, const std::string& unique_id, int64_t acquisition_timestamp = -1, const std::shared_ptr<MetadataDictionary>& metadata = nullptr, IOSpec::IOType io_type = IOSpec::IOType::kOutput) override; bool log_tensormap_data(const TensorMap& tensor_map, const std::string& unique_id, int64_t acquisition_timestamp = -1, const std::shared_ptr<MetadataDictionary>& metadata = nullptr, IOSpec::IOType io_type = IOSpec::IOType::kOutput) override; bool log_backend_specific( [[maybe_unused]] const std::any& data, [[maybe_unused]] const std::string& unique_id, [[maybe_unused]] int64_t acquisition_timestamp = -1, [[maybe_unused]] const std::shared_ptr<MetadataDictionary>& metadata = nullptr, [[maybe_unused]] IOSpec::IOType io_type = IOSpec::IOType::kOutput) override { // Default implementation: backend-specific logging is not supported return false; } void shutdown() override; void set_backend(std::shared_ptr<AsyncDataLoggerBackend> backend); // Statistics methods std::string get_statistics() const; size_t get_data_dropped_count() const { return data_dropped_.load(); } size_t get_large_data_dropped_count() const { return large_data_dropped_.load(); } size_t get_data_queue_size() const; size_t get_large_data_queue_size() const; protected: bool start_worker_threads(); void stop_worker_threads(); void data_worker_function(); void large_data_worker_function(); bool enqueue_data_entry(DataEntry&& entry); bool enqueue_large_data_entry(DataEntry&& entry); template <typename ArgT> [[nodiscard]] ArgT copy_value_from_args(const std::string& arg_name, ArgT default_value) { auto arg_it = std::find_if(args().begin(), args().end(), [&arg_name](const auto& arg) { return (arg.name() == arg_name); }); if (arg_it == args().end()) { return default_value; // Argument not found } if (!arg_it->has_value()) { return default_value; // Argument has no value } std::any& any_arg = arg_it->value(); ArgT result = default_value; if (arg_it->arg_type().element_type() == ArgElementType::kYAMLNode) { // Handle YAML node try { auto& arg_value = std::any_cast<YAML::Node&>(any_arg); bool parse_ok = YAML::convert<ArgT>::decode(arg_value, result); if (!parse_ok) { HOLOSCAN_LOG_ERROR("Could not parse YAML parameter '{}' as requested type", arg_name); return default_value; } } catch (const std::exception& e) { HOLOSCAN_LOG_ERROR("Exception parsing YAML parameter '{}': {}", arg_name, e.what()); return default_value; } } else { // Handle direct value try { result = std::any_cast<ArgT>(any_arg); } catch (const std::bad_any_cast& e) { HOLOSCAN_LOG_ERROR( "Could not cast parameter '{}' to requested type: {}", arg_name, e.what()); return default_value; } } return result; } private: // Queue configuration parameters Parameter<size_t> max_queue_size_; // Default: 50,000 Parameter<int64_t> worker_sleep_time_; // Default: 50000ns (50μs) Parameter<AsyncQueuePolicy> queue_policy_; // Default: kReject Parameter<size_t> large_data_max_queue_size_; // Default: 1,000 Parameter<int64_t> large_data_worker_sleep_time_; // Default: 50000ns (50μs) Parameter<AsyncQueuePolicy> large_data_queue_policy_; // Default: kReject Parameter<bool> enable_large_data_queue_; // Default: true (enable separate queue for large data processing) // Lock-free queues std::unique_ptr<moodycamel::ConcurrentQueue<DataEntry>> data_queue_; std::unique_ptr<moodycamel::ConcurrentQueue<DataEntry>> large_data_queue_; // Worker threads std::thread data_worker_; std::thread large_data_worker_; std::atomic<bool> shutdown_requested_{false}; std::atomic<bool> workers_running_{false}; // Track backend shutdown to prevent multiple shutdown calls std::atomic<bool> backend_shutdown_called_{false}; // Statistics (separate for each queue) std::atomic<size_t> data_dropped_{0}; std::atomic<size_t> data_processed_{0}; std::atomic<size_t> data_enqueued_{0}; std::atomic<size_t> large_data_dropped_{0}; std::atomic<size_t> large_data_processed_{0}; std::atomic<size_t> large_data_enqueued_{0}; // Backend std::shared_ptr<AsyncDataLoggerBackend> backend_; std::atomic<bool> backend_initialized_{false}; // Atomic flag for backend initialization status }; } // namespace holoscan #endif/* HOLOSCAN_CORE_RESOURCES_ASYNC_DATA_LOGGER_HPP */