NVIDIA DeepStream SDK API Reference

6.4 Release
nvds3d_pipeline_context.h
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3  * SPDX-License-Identifier: LicenseRef-NvidiaProprietary
4  *
5  * NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
6  * property and proprietary rights in and to this material, related
7  * documentation and any modifications thereto. Any use, reproduction,
8  * disclosure or distribution of this material and related documentation
9  * without an express license agreement from NVIDIA CORPORATION or
10  * its affiliates is strictly prohibited.
11  */
12 
13 
14 #ifndef NVDS3D_GST_NVDS3D_PIPELINE_CONTEXT_H
15 #define NVDS3D_GST_NVDS3D_PIPELINE_CONTEXT_H
16 
17 #include "gstnvdsmeta.h"
18 
19 // include all ds3d hpp header files
24 
25 // include nvds3d Gst header files
28 #include <ds3d/gst/nvds3d_meta.h>
29 #include <gst/gst.h>
30 
31 namespace ds3d { namespace gst {
32 
// Name strings for the DS3D filter, bridge and mixer plugins.
// NOTE(review): presumably the GStreamer element factory names — confirm against the plugin sources.
33 constexpr const char* kDs3dFilterPluginName = "nvds3dfilter";
34 constexpr const char* kDs3dBridgePluginName = "nvds3dbridge";
35 constexpr const char* kDs3dMixerPluginName = "nvds3dmixer";
36 
37 extern "C" {
38 
39 static gboolean
40 SendEosOnSrc(GstElement* element, GstPad* pad, gpointer user_data)
41 {
42  GstPad* peer = gst_pad_get_peer(pad);
43  if (!peer) {
45  "send EOS downstream [elem:%s] skipped; not linked\n",
46  GST_ELEMENT_NAME(GST_ELEMENT(gst_pad_get_parent(pad))));
47  return TRUE;
48  }
49  LOG_DEBUG(
50  "sending EOS downstream [elem:%s->%s]\n", GST_ELEMENT_NAME(GST_ELEMENT(gst_pad_get_parent(pad))),
51  GST_ELEMENT_NAME(GST_ELEMENT(gst_pad_get_parent(peer))));
52  if (gst_pad_send_event(peer, gst_event_new_eos()) == FALSE) {
54  "send EOS downstream [elem:%s->%s] failed\n", GST_ELEMENT_NAME(GST_ELEMENT(gst_pad_get_parent(pad))),
55  GST_ELEMENT_NAME(GST_ELEMENT(gst_pad_get_parent(peer))));
56  }
57  gst_object_unref(peer);
58  return TRUE;
59 }
60 }
61 
63 public:
// Tears the context down through deinit(). Because the call is made from the
// destructor, it resolves to PipelineContext::deinit() even when a subclass
// overrides deinit().
65  virtual ~PipelineContext() { deinit(); }
66 
// Installs the GMainLoop used by this context. The pointer is stored in _mainLoop,
// a UniqPtr whose deleter is g_main_loop_unref, so the context takes over the
// caller's reference and releases it when the loop is replaced or reset.
67  void setMainloop(GMainLoop* loop) { _mainLoop.reset(loop); }
// Enables/disables quitting the main loop automatically when EOS arrives on the bus.
// When enabled, busCall() calls quitMainLoop() on EOS and isRunning() reports false
// once EOS has been received.
68  void setEosAutoQuit(bool enable) { _eosAutoQuit = enable; }
69 
70  virtual ErrCode init(const std::string& name)
71  {
74  _pipeline.reset(gst_pipeline_new(name.c_str()));
75  DS3D_FAILED_RETURN(pipeline(), ErrCode::kGst, "create pipeline: %s failed", name.c_str());
76  _pipeline.setName(name);
77  _bus.reset(gst_pipeline_get_bus(pipeline()));
78  DS3D_FAILED_RETURN(bus(), ErrCode::kGst, "get bus from pipeline: %s failed", name.c_str());
79  _busWatchId = gst_bus_add_watch(bus(), sBusCall, this);
80  return ErrCode::kGood;
81  }
82 
84  {
87  gst_bin_add(GST_BIN(pipeline()), ele.copy()), ErrCode::kGst, "add element failed");
88  _elementList.emplace_back(ele);
89  return *this;
90  }
91 
92  virtual ErrCode start(std::function<void()> loopQuitCb)
93  {
94  LOG_DEBUG("starting");
95  DS3D_ERROR_RETURN(playPipeline(), "failed to start playing the pipeline");
97  runMainLoop(std::move(loopQuitCb)), "failed to run main loop on the pipeline");
98  return ErrCode::kGood;
99  }
100 
// Stops the pipeline as gracefully as possible: sends EOS from the source elements,
// waits a bounded time for it to arrive, then quits the main loop, joins the loop
// thread (waitLoopQuit) and sets the pipeline to NULL (stopPipeline).
// Returns the ErrCode produced by stopPipeline().
101  virtual ErrCode stop()
102  {
103  LOG_DEBUG("stopping");
     // Only attempt a graceful EOS when a main loop is set and the pipeline still
     // reports running; the state query is given a 1000 ms budget.
104  if (mainLoop() && isRunning(1000)) {
105  LOG_DEBUG("start sending EOS");
106  sendEOS();
107  std::unique_lock<std::mutex> lock(mutex());
     // Wait up to 3 seconds for either the EOS bus message or the loop thread ending.
108  if (!_StatusCond.wait_for(lock, std::chrono::seconds(3), [this]() {
109  return _mainStopped || _eosReceived;
110  })) {
111  LOG_DEBUG("waiting for EOS timed out, force to stop");
112  }
113  }
114 
     // Runs unconditionally, so a timed-out EOS wait still tears everything down.
115  quitMainLoop();
116  waitLoopQuit();
117  return stopPipeline();
118  }
119 
// Releases what the context holds: removes the bus watch when a bus exists, then
// resets the bus, the pipeline, the element list and the main loop, in that order.
// Also called from the destructor.
// NOTE(review): this does not call stop() or join the loop thread, and _busWatchId
// is left unchanged; presumably callers are expected to stop() first — confirm.
120  virtual void deinit()
121  {
122  LOG_DEBUG("deinit");
123  if (bus()) {
124  gst_bus_remove_watch(bus());
125  }
126  _bus.reset();
127  _pipeline.reset();
128  _elementList.clear();
129  _mainLoop.reset();
130  }
131 
// Reports whether the pipeline is currently PLAYING, or has PLAYING as its pending
// state, while the main loop is alive.
// Returns false when no main loop or pipeline is set, when the loop thread has
// stopped, when EOS was received with auto-quit enabled, when the GMainLoop is not
// running, or when the state query keeps failing.
132  /* timeout: milliseconds, 0 means never timeout */
133  bool isRunning(size_t timeout = 0)
134  {
135  std::unique_lock<std::mutex> locker(mutex());
136  if (!mainLoop() || !pipeline() || _mainStopped || (_eosAutoQuit && _eosReceived)) {
137  return false;
138  }
139  if (!g_main_loop_is_running(mainLoop())) {
140  return false;
141  }
     // Release the lock before the state query below, which may block for up to `timeout`.
142  locker.unlock();
143 
144  GstState state = GST_STATE_NULL;
145  GstState pending = GST_STATE_NULL;
     // Milliseconds are scaled by 1000000 to the nanosecond GstClockTime unit;
     // a timeout of 0 is passed as GST_CLOCK_TIME_NONE (no time limit).
146  GstStateChangeReturn ret = gst_element_get_state(
147  GST_ELEMENT(pipeline()), &state, &pending,
148  (timeout ? timeout * 1000000 : GST_CLOCK_TIME_NONE));
149 
150  // retry get_state a few times in case GStreamer is not maintaining states well.
     // At most two extra, non-blocking (timeout 0) attempts are made after a failure.
151  uint32_t times = 1;
152  while (ret == GST_STATE_CHANGE_FAILURE && times++ < 3) {
153  ret = gst_element_get_state(GST_ELEMENT(pipeline()), &state, &pending, 0);
154  }
155  if (ret == GST_STATE_CHANGE_FAILURE) {
156  return false;
157  }
158  if (state == GST_STATE_PLAYING || pending == GST_STATE_PLAYING) {
159  return true;
160  }
161  return false;
162  }
163 
165  {
166  std::unique_lock<std::mutex> locker(mutex());
167  if (mainLoop()) {
168  g_main_loop_quit(mainLoop());
169  }
170  _StatusCond.notify_all();
171  }
172 
174  {
175  std::unique_lock<std::mutex> locker(mutex());
176  if (mainLoop() && !_mainStopped && _mainLoopThread) {
177  if (_StatusCond.wait_for(locker, std::chrono::milliseconds(3000)) ==
178  std::cv_status::timeout) {
179  LOG_DEBUG("waiting loop timed out, force loop to stop");
180  }
181  }
182  _mainStopped = true;
183  if (_mainLoopThread) {
184  auto swapThread = std::move(_mainLoopThread);
185  _mainLoopThread.reset();
186  locker.unlock();
187  swapThread->join();
188  }
189  }
190 
192  {
194  {
195  std::unique_lock<std::mutex> locker(mutex());
196  _eosReceived = false;
197  }
198  auto c = setPipelineState(GST_STATE_PLAYING);
199  return c;
200  }
201 
203  {
204  if (!_pipeline) {
205  return ErrCode::kGood;
206  }
207  ErrCode c = setPipelineState(GST_STATE_NULL);
208  if (!isGood(c)) {
209  LOG_WARNING("set pipeline state to GST_STATE_NULL failed");
210  }
211  GstState end = GST_STATE_NULL;
212  c = getState(_pipeline.get(), &end, nullptr, 3000);
213  if (!isGood(c) || end != GST_STATE_NULL) {
214  LOG_WARNING("waiting for pipeline state to null failed, force to quit");
215  }
216  for (auto& each : _elementList) {
217  if (each) {
218  c = setState(each.get(), GST_STATE_NULL);
219  }
220  }
221  return c;
222  }
223 
// Trampoline for GLib source callbacks (it is the function handed to g_idle_add in
// runMainLoop). user_data must point to a std::function<bool()>; the function is
// invoked and its result is returned to GLib. The std::function is not freed here,
// so it has to stay alive until the callback has run.
224  static gboolean GSourceCb(gpointer user_data)
225  {
226  std::function<bool()>* f = (std::function<bool()>*)(user_data);
227  DS_ASSERT(f);
228  return (*f)();
229  }
230 
231  ErrCode runMainLoop(std::function<void()> loopQuitCb)
232  {
233  std::unique_lock<std::mutex> locker(mutex());
236  "failed to run main loop due to loop might not set or thread already running.");
237 
238  _mainStopped = false;
239  auto loopThread = std::make_unique<std::thread>([this, quitCb = std::move(loopQuitCb)]() {
240  g_main_loop_run(mainLoop());
241  quitCb();
242  std::unique_lock<std::mutex> locker(mutex());
243  _mainStopped = true;
244  _StatusCond.notify_all();
245  });
246  DS_ASSERT(loopThread);
247 
248  // check g_main_loop_run started
249  std::atomic_bool loopStarted{false};
250  std::function<bool()> loopCheck = [&loopStarted, this]() -> bool {
251  std::unique_lock<std::mutex> locker(mutex());
252  loopStarted = true;
253  _StatusCond.notify_all();
254  return false;
255  };
256  g_idle_add(GSourceCb, &loopCheck);
257 
258  // TODO, find better timeout
259  // set a larger timeout value since some model load taking long time
260  if (!_StatusCond.wait_for(locker, std::chrono::milliseconds(20000), [this, &loopStarted]() {
261  return loopStarted || _mainStopped;
262  })) {
263  locker.unlock();
264  LOG_WARNING("Starting main loop timed out");
265  quitMainLoop();
266  loopThread->join();
267  return ErrCode::kTimeOut;
268  }
269 
270  // run main loop stopped too fast
271  if (_mainStopped) {
272  LOG_ERROR("Run main loop stopped too fast, please check.");
273  locker.unlock();
274  loopThread->join();
275  return ErrCode::kUnknown;
276  }
277  _mainLoopThread = std::move(loopThread);
278 
279  return ErrCode::kGood;
280  }
281 
283  {
284  LOG_INFO("pipeline sending EOS");
285  GstIterator* itr = nullptr;
286  GValue data = {
287  0,
288  };
289  for (itr = gst_bin_iterate_sources(GST_BIN(pipeline())); gst_iterator_next(itr, &data) == GST_ITERATOR_OK;) {
290  GstElement* elem = GST_ELEMENT_CAST(g_value_get_object(&data));
291  LOG_DEBUG("sending EOS downstream from src element %s\n", GST_ELEMENT_NAME(elem));
292  // operating pads directly might lose element's state lock, each element's function must
293  // be thread-safe
294  gst_element_foreach_src_pad(elem, SendEosOnSrc, NULL);
295  g_value_reset(&data);
296  }
297  g_value_unset(&data);
298  gst_iterator_free(itr);
299 
300  return ErrCode::kGood;
301  }
302 
// Raw-pointer accessors for the pipeline, its bus and the main loop; each returns
// nullptr-equivalent when the corresponding member has not been set.
// NOTE(review): pipeline() and bus() use GstPtr::get(), presumably a borrowed pointer
// without an extra reference — confirm in nvds3d_gst_ptr.h.
303  GstPipeline* pipeline() const { return GST_PIPELINE_CAST(_pipeline.get()); }
304  GstBus* bus() const { return _bus.get(); }
305  GMainLoop* mainLoop() const { return _mainLoop.get(); }
306 
307 private:
308  // default bus callback
     // Handles three message types and ignores the rest:
     //  - EOS: records _eosReceived, wakes waiters, and quits the loop if auto-quit is on.
     //  - ERROR: logs the error and always quits the main loop.
     //  - STATE_CHANGED: debug-logs the transition only.
     // Always returns TRUE, which keeps the bus watch installed.
309  virtual bool busCall(GstMessage* msg)
310  {
311  DS_ASSERT(mainLoop());
312  switch (GST_MESSAGE_TYPE(msg)) {
313  case GST_MESSAGE_EOS:
314  LOG_INFO("End of stream received");
315  {
     // Scoped so the mutex is released before quitMainLoop(), which locks it again.
316  std::unique_lock<std::mutex> locker(mutex());
317  _eosReceived = true;
318  _StatusCond.notify_all();
319  }
320  if (_eosAutoQuit) {
321  quitMainLoop();
322  }
323  break;
324  case GST_MESSAGE_ERROR: {
325  gchar* debug = nullptr;
326  GError* error = nullptr;
327  gst_message_parse_error(msg, &error, &debug);
     // NOTE(review): error is dereferenced without a null check — confirm that
     // gst_message_parse_error always fills it for ERROR messages.
328  LOG_ERROR(
329  "ERROR from element %s: %s, details: %s", GST_OBJECT_NAME(msg->src), error->message,
330  (debug ? debug : ""));
331  g_free(debug);
332  g_error_free(error);
333 
334  quitMainLoop();
335  break;
336  }
337  case GST_MESSAGE_STATE_CHANGED: {
338  GstState oldState, newState, pendingState;
339 
340  gst_message_parse_state_changed(msg, &oldState, &newState, &pendingState);
341  LOG_DEBUG(
342  "Element %s changed state from %s to %s, pending: %s.", GST_OBJECT_NAME(msg->src),
343  gst_element_state_get_name(oldState), gst_element_state_get_name(newState),
344  gst_element_state_get_name(pendingState));
345  break;
346  }
347  default:
348  break;
349  }
350  return TRUE;
351  }
352 
353 protected:
354  ErrCode setPipelineState(GstState state)
355  {
357  return setState(_pipeline.get(), state);
358  }
359 
360  ErrCode setState(GstElement* ele, GstState state)
361  {
362  DS_ASSERT(ele);
363  GstStateChangeReturn ret = gst_element_set_state(ele, state);
365  ret != GST_STATE_CHANGE_FAILURE, ErrCode::kGst, "element set state: %d failed", state);
366  return ErrCode::kGood;
367  }
368 
369  /* get element states. timeout in milliseconds.
370  */
372  GstElement* ele, GstState* state, GstState* pending = nullptr, size_t timeout = 0)
373  {
374  DS_ASSERT(ele);
375  GstStateChangeReturn ret = gst_element_get_state(
376  ele, state, pending, (timeout ? timeout * 1000000 : GST_CLOCK_TIME_NONE));
377  switch (ret) {
378  case GST_STATE_CHANGE_FAILURE:
379  return ErrCode::kGst;
380  case GST_STATE_CHANGE_SUCCESS:
381  case GST_STATE_CHANGE_NO_PREROLL:
382  return ErrCode::kGood;
383  default:
384  return ErrCode::kUnknown;
385  }
386  return ErrCode::kGood;
387  }
388 
// Static bus-watch trampoline registered with gst_bus_add_watch in init(), where
// `this` is passed as the user data. Casts data back to the PipelineContext,
// asserts the message comes from that context's bus, and forwards to busCall().
389  static gboolean sBusCall(GstBus* bus, GstMessage* msg, gpointer data)
390  {
391  PipelineContext* ctx = static_cast<PipelineContext*>(data);
392  DS_ASSERT(ctx->bus() == bus);
393  return ctx->busCall(msg);
394  }
395 
// Returns the mutex guarding the context's status flags and main-loop handling.
// _pipelineMutex is declared mutable, so it can be locked from const methods.
396  std::mutex& mutex() const { return _pipelineMutex; }
397 
398  // members
401  uint32_t _busWatchId = 0;
402  std::vector<gst::ElePtr> _elementList;
403  ds3d::UniqPtr<GMainLoop> _mainLoop{nullptr, g_main_loop_unref};
404  bool _eosAutoQuit = false;
405  std::unique_ptr<std::thread> _mainLoopThread;
406  std::atomic_bool _mainStopped{false};
407  std::atomic_bool _eosReceived{false};
408  mutable std::mutex _pipelineMutex;
409  std::condition_variable _StatusCond;
411 };
412 
413 }} // namespace ds3d::gst
414 
415 #endif // NVDS3D_GST_NVDS3D_PIPELINE_CONTEXT_H
ds3d::isGood
bool isGood(ErrCode c)
Definition: func_utils.h:28
yaml_config.hpp
ds3d::gst::PipelineContext::stopPipeline
ErrCode stopPipeline()
Definition: nvds3d_pipeline_context.h:202
ds3d::gst::PipelineContext::setMainloop
void setMainloop(GMainLoop *loop)
Definition: nvds3d_pipeline_context.h:67
ds3d::gst::PipelineContext::setEosAutoQuit
void setEosAutoQuit(bool enable)
Definition: nvds3d_pipeline_context.h:68
ds3d::gst::PipelineContext::playPipeline
ErrCode playPipeline()
Definition: nvds3d_pipeline_context.h:191
ds3d::gst::PipelineContext::waitLoopQuit
void waitLoopQuit()
Definition: nvds3d_pipeline_context.h:173
nvds3d_gst_plugin.h
ds3d::gst::PipelineContext::setState
ErrCode setState(GstElement *ele, GstState state)
Definition: nvds3d_pipeline_context.h:360
ds3d::UniqPtr
std::unique_ptr< T, std::function< void(T *)> > UniqPtr
Definition: obj.hpp:31
DS_ASSERT
#define DS_ASSERT(...)
Definition: defines.h:31
ds3d::gst::PipelineContext::_StatusCond
std::condition_variable _StatusCond
Definition: nvds3d_pipeline_context.h:409
LOG_DEBUG
#define LOG_DEBUG
Definition: logging.h:20
ds3d::gst::PipelineContext::stop
virtual ErrCode stop()
Definition: nvds3d_pipeline_context.h:101
LOG_WARNING
#define LOG_WARNING
Definition: logging.h:18
ds3d::ErrCode::kGst
@ kGst
ds3d::gst::PipelineContext::_mainLoop
ds3d::UniqPtr< GMainLoop > _mainLoop
Definition: nvds3d_pipeline_context.h:403
ds3d::gst::PipelineContext::mainLoop
GMainLoop * mainLoop() const
Definition: nvds3d_pipeline_context.h:305
ds3d::gst::GstPtr
Definition: nvds3d_gst_ptr.h:54
ds3d::gst::SendEosOnSrc
static gboolean SendEosOnSrc(GstElement *element, GstPad *pad, gpointer user_data)
Definition: nvds3d_pipeline_context.h:40
ds3d::gst::kDs3dMixerPluginName
constexpr const char * kDs3dMixerPluginName
Definition: nvds3d_pipeline_context.h:35
ds3d::gst::PipelineContext::_pipeline
gst::ElePtr _pipeline
Definition: nvds3d_pipeline_context.h:399
ds3d::gst::PipelineContext::DS3D_DISABLE_CLASS_COPY
DS3D_DISABLE_CLASS_COPY(PipelineContext)
ds3d::ErrCode::kGood
@ kGood
ds3d::gst::PipelineContext::sBusCall
static gboolean sBusCall(GstBus *bus, GstMessage *msg, gpointer data)
Definition: nvds3d_pipeline_context.h:389
ds3d::gst::PipelineContext::mutex
std::mutex & mutex() const
Definition: nvds3d_pipeline_context.h:396
dataloader.hpp
ds3d::gst::ElePtr
Definition: nvds3d_gst_ptr.h:150
ds3d::gst::PipelineContext::~PipelineContext
virtual ~PipelineContext()
Definition: nvds3d_pipeline_context.h:65
ds3d::gst::kDs3dBridgePluginName
constexpr const char * kDs3dBridgePluginName
Definition: nvds3d_pipeline_context.h:34
ds3d::gst::PipelineContext::isRunning
bool isRunning(size_t timeout=0)
Definition: nvds3d_pipeline_context.h:133
ds3d::gst::PipelineContext::runMainLoop
ErrCode runMainLoop(std::function< void()> loopQuitCb)
Definition: nvds3d_pipeline_context.h:231
ds3d::gst::PipelineContext::start
virtual ErrCode start(std::function< void()> loopQuitCb)
Definition: nvds3d_pipeline_context.h:92
ds3d::gst::PipelineContext::_mainStopped
std::atomic_bool _mainStopped
Definition: nvds3d_pipeline_context.h:406
ds3d::ErrCode
ErrCode
Definition: common.h:43
nvds3d_gst_ptr.h
ds3d::ErrCode::kTimeOut
@ kTimeOut
datamap.hpp
ds3d::gst::GstPtr::setName
void setName(const std::string &name)
Definition: nvds3d_gst_ptr.h:68
ds3d::gst::PipelineContext::init
virtual ErrCode init(const std::string &name)
Definition: nvds3d_pipeline_context.h:70
ds3d::gst::kDs3dFilterPluginName
constexpr const char * kDs3dFilterPluginName
Definition: nvds3d_pipeline_context.h:33
frame.hpp
ds3d::gst::PipelineContext::_busWatchId
uint32_t _busWatchId
Definition: nvds3d_pipeline_context.h:401
ds3d::gst::PipelineContext::bus
GstBus * bus() const
Definition: nvds3d_pipeline_context.h:304
ds3d::gst::PipelineContext::add
PipelineContext & add(const gst::ElePtr &ele)
Definition: nvds3d_pipeline_context.h:83
gstnvdsmeta.h
ds3d::gst::GstPtr::copy
GstObjT * copy() const
Definition: nvds3d_gst_ptr.h:98
ds3d::gst::PipelineContext::setPipelineState
ErrCode setPipelineState(GstState state)
Definition: nvds3d_pipeline_context.h:354
ds3d::gst::GstPtr::get
GstObjT * get() const
Definition: nvds3d_gst_ptr.h:110
ds3d::gst::PipelineContext::pipeline
GstPipeline * pipeline() const
Definition: nvds3d_pipeline_context.h:303
ds3d::gst::PipelineContext::getState
ErrCode getState(GstElement *ele, GstState *state, GstState *pending=nullptr, size_t timeout=0)
Definition: nvds3d_pipeline_context.h:371
LOG_ERROR
#define LOG_ERROR
Copyright (c) 2019, NVIDIA CORPORATION.
Definition: logging.h:17
DS3D_ERROR_RETURN
#define DS3D_ERROR_RETURN(code, fmt,...)
Definition: defines.h:72
ds3d::gst::PipelineContext
Definition: nvds3d_pipeline_context.h:62
nvds3d_meta.h
LOG_INFO
#define LOG_INFO
Definition: logging.h:19
DS3D_THROW_ERROR
#define DS3D_THROW_ERROR(statement, code, msg)
Definition: defines.h:78
ds3d::gst::PipelineContext::_eosReceived
std::atomic_bool _eosReceived
Definition: nvds3d_pipeline_context.h:407
ds3d::gst::PipelineContext::_mainLoopThread
std::unique_ptr< std::thread > _mainLoopThread
Definition: nvds3d_pipeline_context.h:405
ds3d::gst::PipelineContext::GSourceCb
static gboolean GSourceCb(gpointer user_data)
Definition: nvds3d_pipeline_context.h:224
ds3d::ErrCode::kUnknown
@ kUnknown
ds3d::gst::PipelineContext::PipelineContext
PipelineContext()
Definition: nvds3d_pipeline_context.h:64
ds3d::gst::PipelineContext::sendEOS
ErrCode sendEOS()
Definition: nvds3d_pipeline_context.h:282
DS3D_FAILED_RETURN
#define DS3D_FAILED_RETURN(condition, ret, fmt,...)
Definition: defines.h:64
ds3d::gst::PipelineContext::_bus
gst::BusPtr _bus
Definition: nvds3d_pipeline_context.h:400
ds3d::gst::PipelineContext::_pipelineMutex
std::mutex _pipelineMutex
Definition: nvds3d_pipeline_context.h:408
ds3d::gst::PipelineContext::deinit
virtual void deinit()
Definition: nvds3d_pipeline_context.h:120
ds3d
Definition: lidar_3d_datatype.h:33
ds3d::gst::GstPtr::reset
void reset(GstObjT *obj=nullptr, bool takeOwner=true)
Definition: nvds3d_gst_ptr.h:89
ds3d::gst::PipelineContext::_eosAutoQuit
bool _eosAutoQuit
Definition: nvds3d_pipeline_context.h:404
ds3d::gst::PipelineContext::_elementList
std::vector< gst::ElePtr > _elementList
Definition: nvds3d_pipeline_context.h:402
ds3d::gst::PipelineContext::quitMainLoop
void quitMainLoop()
Definition: nvds3d_pipeline_context.h:164