NVIDIA DeepStream SDK API Reference

7.0 Release
nvds3d_pipeline_context.h
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: Copyright (c) 2023-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3  * SPDX-License-Identifier: LicenseRef-NvidiaProprietary
4  *
5  * NVIDIA CORPORATION, its affiliates and licensors retain all intellectual
6  * property and proprietary rights in and to this material, related
7  * documentation and any modifications thereto. Any use, reproduction,
8  * disclosure or distribution of this material and related documentation
9  * without an express license agreement from NVIDIA CORPORATION or
10  * its affiliates is strictly prohibited.
11  */
12 
13 
14 #ifndef NVDS3D_GST_NVDS3D_PIPELINE_CONTEXT_H
15 #define NVDS3D_GST_NVDS3D_PIPELINE_CONTEXT_H
16 
17 #include "gstnvdsmeta.h"
18 
19 // include all ds3d hpp header files
24 
25 // include nvds3d Gst header files
28 #include <ds3d/gst/nvds3d_meta.h>
29 #include <gst/gst.h>
30 
31 namespace ds3d { namespace gst {
32 
33 constexpr const char* kDs3dFilterPluginName = "nvds3dfilter";
34 constexpr const char* kDs3dBridgePluginName = "nvds3dbridge";
35 constexpr const char* kDs3dMixerPluginName = "nvds3dmixer";
36 
37 extern "C" {
38 
39 static gboolean
40 SendEosOnSrc(GstElement* element, GstPad* pad, gpointer user_data)
41 {
42  GstPad* peer = gst_pad_get_peer(pad);
43  if (!peer) {
45  "send EOS downstream [elem:%s] skipped; not linked\n",
46  GST_ELEMENT_NAME(GST_ELEMENT(gst_pad_get_parent(pad))));
47  return TRUE;
48  }
49  LOG_DEBUG(
50  "sending EOS downstream [elem:%s->%s]\n", GST_ELEMENT_NAME(GST_ELEMENT(gst_pad_get_parent(pad))),
51  GST_ELEMENT_NAME(GST_ELEMENT(gst_pad_get_parent(peer))));
52  if (gst_pad_send_event(peer, gst_event_new_eos()) == FALSE) {
54  "send EOS downstream [elem:%s->%s] failed\n", GST_ELEMENT_NAME(GST_ELEMENT(gst_pad_get_parent(pad))),
55  GST_ELEMENT_NAME(GST_ELEMENT(gst_pad_get_parent(peer))));
56  }
57  gst_object_unref(peer);
58  return TRUE;
59 }
60 }
61 
63 public:
65  virtual ~PipelineContext() { deinit(); }
66 
67  void setMainloop(GMainLoop* loop) { _mainLoop.reset(loop); }
68  void setEosAutoQuit(bool enable) { _eosAutoQuit = enable; }
69 
70  virtual ErrCode init(const std::string& name)
71  {
74  _pipeline.reset(gst_pipeline_new(name.c_str()));
75  DS3D_FAILED_RETURN(pipeline(), ErrCode::kGst, "create pipeline: %s failed", name.c_str());
76  _pipeline.setName(name);
77  _bus.reset(gst_pipeline_get_bus(pipeline()));
78  DS3D_FAILED_RETURN(bus(), ErrCode::kGst, "get bus from pipeline: %s failed", name.c_str());
79  _busWatchId = gst_bus_add_watch(bus(), sBusCall, this);
80  return ErrCode::kGood;
81  }
82 
84  {
87  gst_bin_add(GST_BIN(pipeline()), ele.copy()), ErrCode::kGst, "add element failed");
88  _elementList.emplace_back(ele);
89  return *this;
90  }
91 
92  virtual ErrCode start(std::function<void()> loopQuitCb)
93  {
94  LOG_DEBUG("starting");
95  DS3D_ERROR_RETURN(playPipeline(), "failed to start playing the pipeline");
97  runMainLoop(std::move(loopQuitCb)), "failed to run main loop on the pipeline");
98  return ErrCode::kGood;
99  }
100 
// Stops the pipeline and its main loop.
// If a main loop is set and isRunning(1000) reports the pipeline as running,
// EOS is sent from the source elements and this call waits up to 3 seconds for
// either the main loop to stop or EOS to be received. After that (or directly,
// if the pipeline was not running) it quits the main loop, waits for the loop
// to finish, and returns the result of stopPipeline().
101  virtual ErrCode stop()
102  {
103  LOG_DEBUG("stopping");
// isRunning() takes its timeout in milliseconds.
104  if (mainLoop() && isRunning(1000)) {
105  LOG_DEBUG("start sending EOS");
// The ErrCode returned by sendEOS() is not checked here.
106  sendEOS();
107  std::unique_lock<std::mutex> lock(mutex());
// A timeout is only logged; shutdown continues regardless.
108  if (!_StatusCond.wait_for(lock, std::chrono::seconds(3), [this]() {
109  return _mainStopped || _eosReceived;
110  })) {
111  LOG_DEBUG("waiting for EOS timed out, force to stop");
112  }
// `lock` goes out of scope with this brace, before quitMainLoop() is called.
113  }
114 
115  quitMainLoop();
116  waitLoopQuit();
117  return stopPipeline();
118  }
119 
// Releases what this context holds: removes the bus watch when a bus exists,
// resets the bus, pipeline and main-loop holders, then resets every entry of
// _elementList (walking the vector back to front) and clears the list.
// This virtual is also called from ~PipelineContext(), where the call resolves
// to this class's implementation rather than a derived override.
// NOTE(review): _busWatchId is left unchanged here — confirm nothing relies on
// it after deinit().
120  virtual void deinit()
121  {
122  LOG_DEBUG("deinit");
123  if (bus()) {
124  gst_bus_remove_watch(bus());
125  }
126  _bus.reset();
127  _pipeline.reset();
128  _mainLoop.reset();
// The pipeline holder is reset before the individual element holders.
129  std::for_each(_elementList.rbegin(), _elementList.rend(), [](auto& e) { e.reset(); });
130  _elementList.clear();
131  }
132 
// Reports whether the pipeline is currently (or about to be) playing.
// Returns false when no main loop or pipeline is set, when the main loop has
// been marked stopped, when EOS was received with auto-quit enabled, when the
// GLib main loop is not running, or when the state query keeps failing.
// Returns true only if the current or pending pipeline state is PLAYING.
133  /* timeout: milliseconds, 0 means never timeout */
134  bool isRunning(size_t timeout = 0)
135  {
// The flag checks are made under the context mutex.
136  std::unique_lock<std::mutex> locker(mutex());
137  if (!mainLoop() || !pipeline() || _mainStopped || (_eosAutoQuit && _eosReceived)) {
138  return false;
139  }
140  if (!g_main_loop_is_running(mainLoop())) {
141  return false;
142  }
// Release the mutex before the state query below, which can wait up to `timeout`.
143  locker.unlock();
144 
145  GstState state = GST_STATE_NULL;
146  GstState pending = GST_STATE_NULL;
// The millisecond timeout is scaled by 1000000 for gst_element_get_state
// (GstClockTime is in nanoseconds); 0 maps to GST_CLOCK_TIME_NONE.
147  GstStateChangeReturn ret = gst_element_get_state(
148  GST_ELEMENT(pipeline()), &state, &pending,
149  (timeout ? timeout * 1000000 : GST_CLOCK_TIME_NONE));
150 
151  // retry get_state a few times in case GStreamer is not maintaining states well.
// At most two extra queries are made, each with a zero timeout.
152  uint32_t times = 1;
153  while (ret == GST_STATE_CHANGE_FAILURE && times++ < 3) {
154  ret = gst_element_get_state(GST_ELEMENT(pipeline()), &state, &pending, 0);
155  }
156  if (ret == GST_STATE_CHANGE_FAILURE) {
157  return false;
158  }
159  if (state == GST_STATE_PLAYING || pending == GST_STATE_PLAYING) {
160  return true;
161  }
162  return false;
163  }
164 
// Body of quitMainLoop(). The declaration line (165) is absent from this
// listing; the symbol index at the end of the page gives it as
// `void quitMainLoop()`.
// Under the context mutex, asks the GLib main loop to quit (when one is set)
// and wakes every thread waiting on _StatusCond.
166  {
167  std::unique_lock<std::mutex> locker(mutex());
168  if (mainLoop()) {
169  g_main_loop_quit(mainLoop());
170  }
// Notifies even when no main loop is set.
171  _StatusCond.notify_all();
172  }
173 
// Body of waitLoopQuit(). The declaration line (174) is absent from this
// listing; the symbol index at the end of the page gives it as
// `void waitLoopQuit()`.
// Waits up to 3000 ms on _StatusCond if a loop thread is still active, marks
// the loop as stopped, then joins the loop thread outside the mutex.
175  {
176  std::unique_lock<std::mutex> locker(mutex());
177  if (mainLoop() && !_mainStopped && _mainLoopThread) {
// NOTE(review): wait_for is used without a predicate, so any notify_all on
// _StatusCond (or a spurious wakeup) ends the wait, not only the loop thread
// finishing. The join() below still blocks until the thread really exits.
178  if (_StatusCond.wait_for(locker, std::chrono::milliseconds(3000)) ==
179  std::cv_status::timeout) {
180  LOG_DEBUG("waiting loop timed out, force loop to stop");
181  }
182  }
// Set unconditionally, whether or not the wait above happened or timed out.
183  _mainStopped = true;
184  if (_mainLoopThread) {
// Take ownership of the thread object so the member is empty before unlocking.
185  auto swapThread = std::move(_mainLoopThread);
186  _mainLoopThread.reset();
// Unlock before joining: the loop thread locks the same mutex when it finishes.
187  locker.unlock();
188  swapThread->join();
189  }
190  }
191 
193  {
195  {
196  std::unique_lock<std::mutex> locker(mutex());
197  _eosReceived = false;
198  }
199  auto c = setPipelineState(GST_STATE_PLAYING);
200  return c;
201  }
202 
// Body of stopPipeline(). The declaration line (203) is absent from this
// listing; the symbol index at the end of the page gives it as
// `ErrCode stopPipeline()`.
// Sets the pipeline to GST_STATE_NULL, waits up to 3000 ms for that state, and
// then sets every non-empty entry of _elementList to GST_STATE_NULL as well.
// Returns kGood immediately when no pipeline exists.
// NOTE(review): `c` is overwritten by each step, so the returned code reflects
// only the last call made (the last element's setState, or getState when no
// element was processed); earlier failures are logged but not returned.
204  {
205  if (!_pipeline) {
206  return ErrCode::kGood;
207  }
208  ErrCode c = setPipelineState(GST_STATE_NULL);
209  if (!isGood(c)) {
210  LOG_WARNING("set pipeline state to GST_STATE_NULL failed");
211  }
212  GstState end = GST_STATE_NULL;
// getState() takes its timeout in milliseconds.
213  c = getState(_pipeline.get(), &end, nullptr, 3000);
214  if (!isGood(c) || end != GST_STATE_NULL) {
215  LOG_WARNING("waiting for pipeline state to null failed, force to quit");
216  }
217  for (auto& each : _elementList) {
218  if (each) {
219  c = setState(each.get(), GST_STATE_NULL);
220  }
221  }
222  return c;
223  }
224 
// GLib source callback trampoline: treats user_data as a pointer to a
// std::function<bool()>, invokes it, and returns its result to GLib.
// The function object is neither copied nor freed here, so it must still be
// alive whenever the source fires (runMainLoop passes the address of a local).
225  static gboolean GSourceCb(gpointer user_data)
226  {
227  std::function<bool()>* f = (std::function<bool()>*)(user_data);
228  DS_ASSERT(f);
229  return (*f)();
230  }
231 
232  ErrCode runMainLoop(std::function<void()> loopQuitCb)
233  {
234  std::unique_lock<std::mutex> locker(mutex());
237  "failed to run main loop due to loop might not set or thread already running.");
238 
239  _mainStopped = false;
240  auto loopThread = std::make_unique<std::thread>([this, quitCb = std::move(loopQuitCb)]() {
241  g_main_loop_run(mainLoop());
242  quitCb();
243  std::unique_lock<std::mutex> locker(mutex());
244  _mainStopped = true;
245  _StatusCond.notify_all();
246  });
247  DS_ASSERT(loopThread);
248 
249  // check g_main_loop_run started
250  std::atomic_bool loopStarted{false};
251  std::function<bool()> loopCheck = [&loopStarted, this]() -> bool {
252  std::unique_lock<std::mutex> locker(mutex());
253  loopStarted = true;
254  _StatusCond.notify_all();
255  return false;
256  };
257  g_idle_add(GSourceCb, &loopCheck);
258 
259  // TODO, find better timeout
260  // set a larger timeout value since some model load taking long time
261  if (!_StatusCond.wait_for(locker, std::chrono::milliseconds(20000), [this, &loopStarted]() {
262  return loopStarted || _mainStopped;
263  })) {
264  locker.unlock();
265  LOG_WARNING("Starting main loop timed out");
266  quitMainLoop();
267  loopThread->join();
268  return ErrCode::kTimeOut;
269  }
270 
271  // run main loop stopped too fast
272  if (_mainStopped) {
273  LOG_ERROR("Run main loop stopped too fast, please check.");
274  locker.unlock();
275  loopThread->join();
276  return ErrCode::kUnknown;
277  }
278  _mainLoopThread = std::move(loopThread);
279 
280  return ErrCode::kGood;
281  }
282 
// Body of sendEOS(). The declaration line (283) is absent from this listing;
// the symbol index at the end of the page gives it as `ErrCode sendEOS()`.
// Walks the source elements of the pipeline bin and calls SendEosOnSrc on
// every source pad of each one. Always returns kGood.
// NOTE(review): the loop ends on any iterator result other than
// GST_ITERATOR_OK, so a resync or error result is treated like "done".
284  {
285  LOG_INFO("pipeline sending EOS");
286  GstIterator* itr = nullptr;
287  GValue data = {
288  0,
289  };
290  for (itr = gst_bin_iterate_sources(GST_BIN(pipeline())); gst_iterator_next(itr, &data) == GST_ITERATOR_OK;) {
291  GstElement* elem = GST_ELEMENT_CAST(g_value_get_object(&data));
292  LOG_DEBUG("sending EOS downstream from src element %s\n", GST_ELEMENT_NAME(elem));
293  // operating pads directly might lose element's state lock, each element's function must
294  // be thread-safe
295  gst_element_foreach_src_pad(elem, SendEosOnSrc, NULL);
// Reset the GValue so it can be reused for the next element.
296  g_value_reset(&data);
297  }
298  g_value_unset(&data);
299  gst_iterator_free(itr);
300 
301  return ErrCode::kGood;
302  }
303 
// Raw-pointer accessors for the handles held by this context. Each returns
// whatever the holder's get() yields, which is null when nothing is set.
// NOTE(review): presumably these are borrowed pointers (no extra reference is
// taken) — confirm against GstPtr::get.
304  GstPipeline* pipeline() const { return GST_PIPELINE_CAST(_pipeline.get()); }
305  GstBus* bus() const { return _bus.get(); }
306  GMainLoop* mainLoop() const { return _mainLoop.get(); }
307 
308 private:
309  // default bus callback
// Handles one bus message, dispatched here by sBusCall:
//  - EOS: records _eosReceived under the mutex, wakes waiters, and quits the
//    main loop when _eosAutoQuit is enabled.
//  - ERROR: logs the error and its debug details, then quits the main loop.
//  - STATE_CHANGED: logs the old, new and pending states.
// Every other message type is ignored. Always returns TRUE (per the
// GstBusFunc contract that keeps the watch installed — see GStreamer docs).
310  virtual bool busCall(GstMessage* msg)
311  {
312  DS_ASSERT(mainLoop());
313  switch (GST_MESSAGE_TYPE(msg)) {
314  case GST_MESSAGE_EOS:
315  LOG_INFO("End of stream received");
316  {
// Scoped so the mutex is released before quitMainLoop(), which locks it again.
317  std::unique_lock<std::mutex> locker(mutex());
318  _eosReceived = true;
319  _StatusCond.notify_all();
320  }
321  if (_eosAutoQuit) {
322  quitMainLoop();
323  }
324  break;
325  case GST_MESSAGE_ERROR: {
326  gchar* debug = nullptr;
327  GError* error = nullptr;
328  gst_message_parse_error(msg, &error, &debug);
// NOTE(review): `error` is dereferenced without a null check — confirm
// gst_message_parse_error always fills it for ERROR messages.
329  LOG_ERROR(
330  "ERROR from element %s: %s, details: %s", GST_OBJECT_NAME(msg->src), error->message,
331  (debug ? debug : ""));
332  g_free(debug);
333  g_error_free(error);
334 
335  quitMainLoop();
336  break;
337  }
338  case GST_MESSAGE_STATE_CHANGED: {
339  GstState oldState, newState, pendingState;
340 
341  gst_message_parse_state_changed(msg, &oldState, &newState, &pendingState);
342  LOG_DEBUG(
343  "Element %s changed state from %s to %s, pending: %s.", GST_OBJECT_NAME(msg->src),
344  gst_element_state_get_name(oldState), gst_element_state_get_name(newState),
345  gst_element_state_get_name(pendingState));
346  break;
347  }
348  default:
349  break;
350  }
351  return TRUE;
352  }
353 
354 protected:
355  ErrCode setPipelineState(GstState state)
356  {
358  return setState(_pipeline.get(), state);
359  }
360 
361  ErrCode setState(GstElement* ele, GstState state)
362  {
363  DS_ASSERT(ele);
364  GstStateChangeReturn ret = gst_element_set_state(ele, state);
366  ret != GST_STATE_CHANGE_FAILURE, ErrCode::kGst, "element set state: %d failed", state);
367  return ErrCode::kGood;
368  }
369 
370  /* get element states. timeout in milliseconds.
371  */
373  GstElement* ele, GstState* state, GstState* pending = nullptr, size_t timeout = 0)
374  {
375  DS_ASSERT(ele);
376  GstStateChangeReturn ret = gst_element_get_state(
377  ele, state, pending, (timeout ? timeout * 1000000 : GST_CLOCK_TIME_NONE));
378  switch (ret) {
379  case GST_STATE_CHANGE_FAILURE:
380  return ErrCode::kGst;
381  case GST_STATE_CHANGE_SUCCESS:
382  case GST_STATE_CHANGE_NO_PREROLL:
383  return ErrCode::kGood;
384  default:
385  return ErrCode::kUnknown;
386  }
387  return ErrCode::kGood;
388  }
389 
// Static bus-watch trampoline registered in init() via gst_bus_add_watch with
// `this` as user data. Recovers the PipelineContext from `data`, asserts the
// message comes from this context's own bus, and forwards it to the virtual
// busCall(), whose result is returned to GStreamer.
390  static gboolean sBusCall(GstBus* bus, GstMessage* msg, gpointer data)
391  {
392  PipelineContext* ctx = static_cast<PipelineContext*>(data);
393  DS_ASSERT(ctx->bus() == bus);
394  return ctx->busCall(msg);
395  }
396 
// Returns the mutex used with _StatusCond to guard the loop/EOS status flags.
// Callable from const methods because _pipelineMutex is declared mutable.
397  std::mutex& mutex() const { return _pipelineMutex; }
398 
399  // members
402  uint32_t _busWatchId = 0;
403  std::vector<gst::ElePtr> _elementList;
404  ds3d::UniqPtr<GMainLoop> _mainLoop{nullptr, g_main_loop_unref};
405  bool _eosAutoQuit = false;
406  std::unique_ptr<std::thread> _mainLoopThread;
407  std::atomic_bool _mainStopped{false};
408  std::atomic_bool _eosReceived{false};
409  mutable std::mutex _pipelineMutex;
410  std::condition_variable _StatusCond;
412 };
413 
414 }} // namespace ds3d::gst
415 
416 #endif // NVDS3D_GST_NVDS3D_PIPELINE_CONTEXT_H
ds3d::isGood
bool isGood(ErrCode c)
Definition: func_utils.h:28
yaml_config.hpp
ds3d::gst::PipelineContext::stopPipeline
ErrCode stopPipeline()
Definition: nvds3d_pipeline_context.h:203
ds3d::gst::PipelineContext::setMainloop
void setMainloop(GMainLoop *loop)
Definition: nvds3d_pipeline_context.h:67
ds3d::gst::PipelineContext::setEosAutoQuit
void setEosAutoQuit(bool enable)
Definition: nvds3d_pipeline_context.h:68
ds3d::gst::PipelineContext::playPipeline
ErrCode playPipeline()
Definition: nvds3d_pipeline_context.h:192
ds3d::gst::PipelineContext::waitLoopQuit
void waitLoopQuit()
Definition: nvds3d_pipeline_context.h:174
nvds3d_gst_plugin.h
ds3d::gst::PipelineContext::setState
ErrCode setState(GstElement *ele, GstState state)
Definition: nvds3d_pipeline_context.h:361
ds3d::UniqPtr
std::unique_ptr< T, std::function< void(T *)> > UniqPtr
Definition: obj.hpp:31
DS_ASSERT
#define DS_ASSERT(...)
Definition: defines.h:31
ds3d::gst::PipelineContext::_StatusCond
std::condition_variable _StatusCond
Definition: nvds3d_pipeline_context.h:410
LOG_DEBUG
#define LOG_DEBUG
Definition: logging.h:19
ds3d::gst::PipelineContext::stop
virtual ErrCode stop()
Definition: nvds3d_pipeline_context.h:101
LOG_WARNING
#define LOG_WARNING
Definition: logging.h:17
ds3d::ErrCode::kGst
@ kGst
ds3d::gst::PipelineContext::_mainLoop
ds3d::UniqPtr< GMainLoop > _mainLoop
Definition: nvds3d_pipeline_context.h:404
ds3d::gst::PipelineContext::mainLoop
GMainLoop * mainLoop() const
Definition: nvds3d_pipeline_context.h:306
ds3d::gst::GstPtr
Definition: nvds3d_gst_ptr.h:54
ds3d::gst::SendEosOnSrc
static gboolean SendEosOnSrc(GstElement *element, GstPad *pad, gpointer user_data)
Definition: nvds3d_pipeline_context.h:40
ds3d::gst::kDs3dMixerPluginName
constexpr const char * kDs3dMixerPluginName
Definition: nvds3d_pipeline_context.h:35
ds3d::gst::PipelineContext::_pipeline
gst::ElePtr _pipeline
Definition: nvds3d_pipeline_context.h:400
ds3d::gst::PipelineContext::DS3D_DISABLE_CLASS_COPY
DS3D_DISABLE_CLASS_COPY(PipelineContext)
ds3d::ErrCode::kGood
@ kGood
ds3d::gst::PipelineContext::sBusCall
static gboolean sBusCall(GstBus *bus, GstMessage *msg, gpointer data)
Definition: nvds3d_pipeline_context.h:390
ds3d::gst::PipelineContext::mutex
std::mutex & mutex() const
Definition: nvds3d_pipeline_context.h:397
dataloader.hpp
ds3d::gst::ElePtr
Definition: nvds3d_gst_ptr.h:150
ds3d::gst::PipelineContext::~PipelineContext
virtual ~PipelineContext()
Definition: nvds3d_pipeline_context.h:65
ds3d::gst::kDs3dBridgePluginName
constexpr const char * kDs3dBridgePluginName
Definition: nvds3d_pipeline_context.h:34
ds3d::gst::PipelineContext::isRunning
bool isRunning(size_t timeout=0)
Definition: nvds3d_pipeline_context.h:134
ds3d::gst::PipelineContext::runMainLoop
ErrCode runMainLoop(std::function< void()> loopQuitCb)
Definition: nvds3d_pipeline_context.h:232
ds3d::gst::PipelineContext::start
virtual ErrCode start(std::function< void()> loopQuitCb)
Definition: nvds3d_pipeline_context.h:92
ds3d::gst::PipelineContext::_mainStopped
std::atomic_bool _mainStopped
Definition: nvds3d_pipeline_context.h:407
ds3d::ErrCode
ErrCode
Definition: common.h:43
nvds3d_gst_ptr.h
ds3d::ErrCode::kTimeOut
@ kTimeOut
datamap.hpp
ds3d::gst::GstPtr::setName
void setName(const std::string &name)
Definition: nvds3d_gst_ptr.h:68
ds3d::gst::PipelineContext::init
virtual ErrCode init(const std::string &name)
Definition: nvds3d_pipeline_context.h:70
ds3d::gst::kDs3dFilterPluginName
constexpr const char * kDs3dFilterPluginName
Definition: nvds3d_pipeline_context.h:33
frame.hpp
ds3d::gst::PipelineContext::_busWatchId
uint32_t _busWatchId
Definition: nvds3d_pipeline_context.h:402
ds3d::gst::PipelineContext::bus
GstBus * bus() const
Definition: nvds3d_pipeline_context.h:305
ds3d::gst::PipelineContext::add
PipelineContext & add(const gst::ElePtr &ele)
Definition: nvds3d_pipeline_context.h:83
gstnvdsmeta.h
ds3d::gst::GstPtr::copy
GstObjT * copy() const
Definition: nvds3d_gst_ptr.h:98
ds3d::gst::PipelineContext::setPipelineState
ErrCode setPipelineState(GstState state)
Definition: nvds3d_pipeline_context.h:355
ds3d::gst::GstPtr::get
GstObjT * get() const
Definition: nvds3d_gst_ptr.h:110
ds3d::gst::PipelineContext::pipeline
GstPipeline * pipeline() const
Definition: nvds3d_pipeline_context.h:304
ds3d::gst::PipelineContext::getState
ErrCode getState(GstElement *ele, GstState *state, GstState *pending=nullptr, size_t timeout=0)
Definition: nvds3d_pipeline_context.h:372
LOG_ERROR
#define LOG_ERROR
Definition: logging.h:16
DS3D_ERROR_RETURN
#define DS3D_ERROR_RETURN(code, fmt,...)
Definition: defines.h:72
ds3d::gst::PipelineContext
Definition: nvds3d_pipeline_context.h:62
nvds3d_meta.h
LOG_INFO
#define LOG_INFO
Definition: logging.h:18
DS3D_THROW_ERROR
#define DS3D_THROW_ERROR(statement, code, msg)
Definition: defines.h:78
ds3d::gst::PipelineContext::_eosReceived
std::atomic_bool _eosReceived
Definition: nvds3d_pipeline_context.h:408
ds3d::gst::PipelineContext::_mainLoopThread
std::unique_ptr< std::thread > _mainLoopThread
Definition: nvds3d_pipeline_context.h:406
ds3d::gst::PipelineContext::GSourceCb
static gboolean GSourceCb(gpointer user_data)
Definition: nvds3d_pipeline_context.h:225
ds3d::ErrCode::kUnknown
@ kUnknown
ds3d::gst::PipelineContext::PipelineContext
PipelineContext()
Definition: nvds3d_pipeline_context.h:64
ds3d::gst::PipelineContext::sendEOS
ErrCode sendEOS()
Definition: nvds3d_pipeline_context.h:283
DS3D_FAILED_RETURN
#define DS3D_FAILED_RETURN(condition, ret, fmt,...)
Definition: defines.h:64
ds3d::gst::PipelineContext::_bus
gst::BusPtr _bus
Definition: nvds3d_pipeline_context.h:401
ds3d::gst::PipelineContext::_pipelineMutex
std::mutex _pipelineMutex
Definition: nvds3d_pipeline_context.h:409
ds3d::gst::PipelineContext::deinit
virtual void deinit()
Definition: nvds3d_pipeline_context.h:120
ds3d
Definition: lidar_3d_datatype.h:35
ds3d::gst::GstPtr::reset
void reset(GstObjT *obj=nullptr, bool takeOwner=true)
Definition: nvds3d_gst_ptr.h:89
ds3d::gst::PipelineContext::_eosAutoQuit
bool _eosAutoQuit
Definition: nvds3d_pipeline_context.h:405
ds3d::gst::PipelineContext::_elementList
std::vector< gst::ElePtr > _elementList
Definition: nvds3d_pipeline_context.h:403
ds3d::gst::PipelineContext::quitMainLoop
void quitMainLoop()
Definition: nvds3d_pipeline_context.h:165