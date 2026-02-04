/* * SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef HOLOSCAN_CORE_RESOURCES_DATA_LOGGER_QUEUE_HPP #define HOLOSCAN_CORE_RESOURCES_DATA_LOGGER_QUEUE_HPP #include <algorithm> #include <cctype> #include <memory> #include <mutex> #include <optional> #include <queue> #include <stdexcept> #include <string> #include <utility> #include "concurrentqueue.h" #include "yaml-cpp/yaml.h" namespace holoscan { enum class DataLoggerQueueType { LockFree, Ordered }; template <typename T> class DataLoggerQueue { public: virtual ~DataLoggerQueue() = default; virtual bool try_enqueue(T&& item) = 0; virtual bool try_dequeue(T& item) = 0; virtual size_t size_approx() const = 0; }; template <typename T> class LockFreeQueue : public DataLoggerQueue<T> { public: explicit LockFreeQueue(size_t capacity) : queue_(capacity) {} bool try_enqueue(T&& item) override { return queue_.try_enqueue(std::move(item)); } bool try_dequeue(T& item) override { return queue_.try_dequeue(item); } size_t size_approx() const override { return queue_.size_approx(); } private: moodycamel::ConcurrentQueue<T> queue_; }; template <typename T> class OrderedQueue : public DataLoggerQueue<T> { public: explicit OrderedQueue(size_t capacity) : capacity_(capacity) {} bool try_enqueue(T&& item) override { std::lock_guard<std::mutex> lock(mutex_); if (queue_.size() >= capacity_) { return false; // Queue full } queue_.push(std::move(item)); return true; } bool try_dequeue(T& item) override { // std::optional is used instead of a default-constructed temporary to avoid: // - Requiring T to have a default constructor // - Paying the cost of default construction for types with non-trivial constructors std::optional<T> deferred_item; { std::lock_guard<std::mutex> lock(mutex_); if (queue_.empty()) { return false; } // Extract item into temporary storage while holding mutex deferred_item = std::move(queue_.front()); queue_.pop(); } // Mutex released here // Assign to output parameter OUTSIDE critical section. // Destructor of old 'item' value (containing potential GILGuardedPyObject from // previous iteration) runs here without holding the mutex, preventing deadlock. item = std::move(*deferred_item); return true; } size_t size_approx() const override { std::lock_guard<std::mutex> lock(mutex_); return queue_.size(); } private: std::queue<T> queue_; mutable std::mutex mutex_; const size_t capacity_; }; template <typename T> inline std::unique_ptr<DataLoggerQueue<T>> create_data_logger_queue(DataLoggerQueueType type, size_t capacity) { switch (type) { case DataLoggerQueueType::LockFree: return std::make_unique<LockFreeQueue<T>>(capacity); case DataLoggerQueueType::Ordered: return std::make_unique<OrderedQueue<T>>(capacity); default: throw std::runtime_error("Unknown queue type"); } } } // namespace holoscan // YAML converter for DataLoggerQueueType enum namespace YAML { template <> struct convert<holoscan::DataLoggerQueueType> { static Node encode(const holoscan::DataLoggerQueueType& rhs) { Node node; switch (rhs) { case holoscan::DataLoggerQueueType::LockFree: node = "LockFree"; break; case holoscan::DataLoggerQueueType::Ordered: node = "Ordered"; break; default: throw std::runtime_error("Unknown DataLoggerQueueType enum value"); } return node; } static bool decode(const Node& node, holoscan::DataLoggerQueueType& rhs) { if (!node.IsScalar()) { return false; } std::string value = node.as<std::string>(); // Convert to lowercase for case-insensitive comparison (avoid UB on signed char) std::transform(value.begin(), value.end(), value.begin(), [](unsigned char c) { return static_cast<char>(std::tolower(c)); }); if (value == "lockfree" || value == "lock_free") { rhs = holoscan::DataLoggerQueueType::LockFree; return true; } else if (value == "ordered") { rhs = holoscan::DataLoggerQueueType::Ordered; return true; } return false; } }; } // namespace YAML #endif// HOLOSCAN_CORE_RESOURCES_DATA_LOGGER_QUEUE_HPP