NVIDIA DeepStream SDK API Reference

7.0 Release
safe_queue.h
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: Copyright (c) 2021-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3  * SPDX-License-Identifier: LicenseRef-NvidiaProprietary
4  *
5  * NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
6  * property and proprietary rights in and to this material, related
7  * documentation and any modifications thereto. Any use, reproduction,
8  * disclosure or distribution of this material and related documentation
9  * without an express license agreement from NVIDIA CORPORATION or
10  * its affiliates is strictly prohibited.
11  */
12 
13 
14 #ifndef DS3D_COMMON_HELPER_SAFE_QUEUE_H
15 #define DS3D_COMMON_HELPER_SAFE_QUEUE_H
16 
17 #include <ds3d/common/common.h>
18 #include <ds3d/common/func_utils.h>
19 
20 #include <chrono>
21 #include <deque>
22 
23 namespace ds3d {
24 
25 template <typename T, typename Container = std::deque<T>>
26 class SafeQueue {
27  // using namespace std::chrono_literals;
28 public:
29  void push(T data)
30  {
31  std::unique_lock<std::mutex> lock(_mutex);
32  _queue.emplace_back(std::move(data));
33  _cond.notify_one();
34  }
35  T pop(uint64_t timeoutMs = 0)
36  {
37  std::unique_lock<std::mutex> lock(_mutex);
38  auto stopWait = [this]() { return _wakeupOnce || !_queue.empty(); };
39  if (!timeoutMs) {
40  _cond.wait(lock, stopWait);
41  } else {
42  using namespace std::chrono_literals;
43  if (!_cond.wait_for(lock, timeoutMs * 1ms, stopWait)) {
44  throw Exception(ErrCode::kTimeOut, "queue pop timeout");
45  }
46  }
47  if (_wakeupOnce) {
48  _wakeupOnce = false;
49  LOG_DEBUG("SafeQueue pop end on wakeup signal");
50  throw Exception(ErrCode::kLockWakeup, "queue wakedup");
51  }
52  assert(!_queue.empty());
53  T ret = std::move(*_queue.begin());
54  _queue.erase(_queue.begin());
55  return ret;
56  }
57  void wakeupOnce()
58  {
59  LOG_DEBUG("SafeQueue trigger wakeup once");
60  std::unique_lock<std::mutex> lock(_mutex);
61  _wakeupOnce = true;
62  _cond.notify_all();
63  }
64  void clear()
65  {
66  LOG_DEBUG("SafeQueue clear");
67  std::unique_lock<std::mutex> lock(_mutex);
68  _queue.clear();
69  _wakeupOnce = false;
70  }
71  size_t size()
72  {
73  std::unique_lock<std::mutex> lock(_mutex);
74  return _queue.size();
75  }
76 
77 private:
78  std::mutex _mutex;
79  std::condition_variable _cond;
80  Container _queue;
81  bool _wakeupOnce = false;
82 };
83 
84 template <class UniPtr>
85 class BufferPool : public std::enable_shared_from_this<BufferPool<UniPtr>> {
86 public:
87  using ItemType = typename UniPtr::element_type;
88  using RecylePtr = std::unique_ptr<ItemType, std::function<void(ItemType*)>>;
89  BufferPool(const std::string& name) : m_Name(name) {}
90  virtual ~BufferPool()
91  {
92  LOG_DEBUG(
93  "BufferPool: %s deleted with free buffer size:%d", m_Name.c_str(),
94  (int)m_FreeBuffers.size());
95  }
96  bool setBuffer(UniPtr buf)
97  {
98  assert(buf);
99  m_FreeBuffers.push(std::move(buf));
100  LOG_DEBUG(
101  "BufferPool: %s set buf to free, available size:%d", m_Name.c_str(),
102  (int)m_FreeBuffers.size());
103  return true;
104  }
105  uint32_t size() { return m_FreeBuffers.size(); }
106 
108  {
109  try {
110  UniPtr p = m_FreeBuffers.pop();
111  auto deleter = p.get_deleter();
112  std::weak_ptr<BufferPool<UniPtr>> poolPtr = this->shared_from_this();
113  RecylePtr recBuf(p.release(), [poolPtr, d = deleter](ItemType* buf) {
114  assert(buf);
115  UniPtr data(buf, d);
116  auto pool = poolPtr.lock();
117  if (pool) {
118  LOG_DEBUG("BufferPool: %s release a buffer", pool->m_Name.c_str());
119  pool->setBuffer(std::move(data));
120  } else {
121  LOG_DEBUG("BufferPool was deleted before buffer release, maybe application is closing.");
122  //assert(false);
123  }
124  });
125  LOG_DEBUG(
126  "BufferPool: %s acquired buffer, available free buffer left:%d", m_Name.c_str(),
127  (int)m_FreeBuffers.size());
128  return recBuf;
129  }
130  catch (...) {
131  LOG_DEBUG(
132  "BufferPool: %s acquired buffer failed, queue may be waked up", m_Name.c_str());
133  assert(false);
134  return nullptr;
135  }
136  }
137 
138 private:
139  SafeQueue<UniPtr> m_FreeBuffers;
140  const std::string m_Name;
141 };
142 
143 } // namespace ds3d
144 
145 #endif //
ds3d::BufferPool
Definition: safe_queue.h:85
LOG_DEBUG
#define LOG_DEBUG
Definition: logging.h:19
ds3d::BufferPool::acquireBuffer
RecylePtr acquireBuffer()
Definition: safe_queue.h:107
ds3d::SafeQueue::clear
void clear()
Definition: safe_queue.h:64
ds3d::SafeQueue
Definition: safe_queue.h:26
ds3d::Exception
Definition: common.h:144
ds3d::BufferPool::size
uint32_t size()
Definition: safe_queue.h:105
ds3d::BufferPool::setBuffer
bool setBuffer(UniPtr buf)
Definition: safe_queue.h:96
ds3d::ErrCode::kTimeOut
@ kTimeOut
ds3d::ErrCode::kLockWakeup
@ kLockWakeup
ds3d::SafeQueue::push
void push(T data)
Definition: safe_queue.h:29
ds3d::BufferPool::BufferPool
BufferPool(const std::string &name)
Definition: safe_queue.h:89
ds3d::SafeQueue::size
size_t size()
Definition: safe_queue.h:71
ds3d::BufferPool::RecylePtr
std::unique_ptr< ItemType, std::function< void(ItemType *)> > RecylePtr
Definition: safe_queue.h:88
common.h
ds3d::SafeQueue::wakeupOnce
void wakeupOnce()
Definition: safe_queue.h:57
ds3d::BufferPool::~BufferPool
virtual ~BufferPool()
Definition: safe_queue.h:90
ds3d::BufferPool::ItemType
typename UniPtr::element_type ItemType
Definition: safe_queue.h:87
func_utils.h
ds3d
Definition: lidar_3d_datatype.h:35
ds3d::SafeQueue::pop
T pop(uint64_t timeoutMs=0)
Definition: safe_queue.h:35