14 #ifndef DS3D_COMMON_HELPER_SAFE_QUEUE_H
15 #define DS3D_COMMON_HELPER_SAFE_QUEUE_H
// Template parameters: T is the value type handed back by pop(); Container
// defaults to std::deque<T>.
// NOTE(review): the class declaration line itself is not part of this excerpt;
// Container is presumably the type of the _queue member -- confirm.
25 template <
typename T,
typename Container = std::deque<T>>
// Fragment of an enqueue method (its signature is not visible here): take
// _mutex, then move `data` onto the back of _queue.
31 std::unique_lock<std::mutex> lock(_mutex);
32 _queue.emplace_back(std::move(data));
// Waits on _cond until an element is available or a wakeup was requested, then
// moves the first element of _queue into `ret` and erases it from the queue.
// timeoutMs is the limit, in milliseconds, passed to the timed wait below; it
// defaults to 0.
// NOTE(review): the branch that chooses between the untimed and the timed wait,
// the handling after a timeout/wakeup, and the return statement are outside
// this excerpt -- presumably timeoutMs == 0 selects the untimed wait; confirm.
35 T
pop(uint64_t timeoutMs = 0)
// The lock is held for the waits and for all access to _queue; the
// condition-variable waits below release and re-acquire it through `lock`.
37 std::unique_lock<std::mutex> lock(_mutex);
// Wait predicate: stop waiting once a wakeup was requested or _queue has data.
38 auto stopWait = [
this]() {
return _wakeupOnce || !_queue.empty(); };
// Untimed wait: blocks until stopWait() returns true.
40 _cond.wait(lock, stopWait);
42 using namespace std::chrono_literals;
// Timed wait: `timeoutMs * 1ms` builds a millisecond duration; wait_for returns
// false when that duration elapses while stopWait() is still false.
43 if (!_cond.wait_for(lock, timeoutMs * 1ms, stopWait)) {
// Logged when pop() ends because of the wakeup signal rather than new data.
49 LOG_DEBUG(
"SafeQueue pop end on wakeup signal");
// From here on the queue is expected to hold at least one element.
52 assert(!_queue.empty());
// Move the first element out, then erase its (now moved-from) slot.
53 T ret = std::move(*_queue.begin());
54 _queue.erase(_queue.begin());
// Fragment of the wakeup method (its signature is not visible here): it logs
// and then takes _mutex.
// NOTE(review): presumably it then sets _wakeupOnce and notifies _cond so that
// a blocked pop() stops waiting -- those lines are outside this excerpt.
59 LOG_DEBUG(
"SafeQueue trigger wakeup once");
60 std::unique_lock<std::mutex> lock(_mutex);
// NOTE(review): the next two lock acquisitions presumably belong to two further
// methods whose remaining lines are outside this excerpt.
67 std::unique_lock<std::mutex> lock(_mutex);
73 std::unique_lock<std::mutex> lock(_mutex);
// Condition variable that pop() waits on.
79 std::condition_variable _cond;
// Read by pop()'s wait predicate: when true, the wait ends even if _queue is
// empty. Initialized to false.
81 bool _wakeupOnce =
false;
// BufferPool is parameterized on a smart-pointer type UniPtr and inherits
// std::enable_shared_from_this so that it can call shared_from_this() on itself.
84 template <
class UniPtr>
85 class BufferPool :
public std::enable_shared_from_this<BufferPool<UniPtr>> {
// ItemType is the pointee type of UniPtr.
87 using ItemType =
typename UniPtr::element_type;
// Trailing arguments of a log call whose opening line is outside this excerpt:
// the message reports m_Name and the free-buffer count at deletion.
93 "BufferPool: %s deleted with free buffer size:%d", m_Name.c_str(),
94 (
int)m_FreeBuffers.size());
// Move `buf` onto the free queue.
// NOTE(review): presumably the body of setBuffer(); the signature is outside
// this excerpt.
99 m_FreeBuffers.push(std::move(buf));
// Trailing arguments of a log call whose opening line is outside this excerpt:
// reports m_Name and the free-queue size after the push above.
101 "BufferPool: %s set buf to free, available size:%d", m_Name.c_str(),
102 (
int)m_FreeBuffers.size());
// Returns m_FreeBuffers.size(), converted to uint32_t by the return type.
// NOTE(review): presumably the number of currently free buffers -- the log
// messages in this class print the same value as the "free buffer size";
// SafeQueue::size() itself is not visible in this excerpt.
105 uint32_t
size() {
return m_FreeBuffers.size(); }
// Fragment of the buffer-acquire path (its signature is outside this excerpt).
// Take one buffer from the free queue; pop() is called without arguments, so
// its default timeoutMs of 0 applies.
110 UniPtr p = m_FreeBuffers.pop();
// Copy the deleter stored in the popped smart pointer.
// NOTE(review): how `deleter` is used afterwards is outside this excerpt.
111 auto deleter = p.get_deleter();
// Keep only a weak reference to this pool, obtained via shared_from_this().
112 std::weak_ptr<BufferPool<UniPtr>> poolPtr = this->shared_from_this();
// lock() yields a non-null shared_ptr only while the pool object still exists.
116 auto pool = poolPtr.lock();
// NOTE(review): presumably the pool-still-alive branch of a release callback:
// the released buffer `data` is handed back to the pool through setBuffer().
118 LOG_DEBUG(
"BufferPool: %s release a buffer", pool->m_Name.c_str());
119 pool->setBuffer(std::move(data));
// NOTE(review): presumably the branch taken when lock() returned null.
121 LOG_DEBUG(
"BufferPool was deleted before buffer release, maybe application is closing.");
// Trailing arguments of a log call whose opening line is outside this excerpt:
// success message with m_Name and the number of free buffers left.
126 "BufferPool: %s acquired buffer, available free buffer left:%d", m_Name.c_str(),
127 (
int)m_FreeBuffers.size());
// Trailing arguments of a log call whose opening line is outside this excerpt:
// message for the case where no buffer was acquired.
132 "BufferPool: %s acquired buffer failed, queue may be waked up", m_Name.c_str());
// Queue holding the currently free buffers.
139 SafeQueue<UniPtr> m_FreeBuffers;
// Pool name; within this excerpt it is only used in log messages.
140 const std::string m_Name;