↰ Return to documentation for file (morpheus/_lib/src/objects/fiber_queue.cpp)
#include "morpheus/objects/fiber_queue.hpp"
#include <boost/fiber/channel_op_status.hpp>
#include <pybind11/gil.h>// for gil_scoped_release
#include <pybind11/pybind11.h>
#include <chrono>
#include <functional>// for ref, reference_wrapper
#include <memory>
#include <ratio>// for ratio needed for std::chrono::duration
#include <stdexcept>// for invalid_argument, runtime_error
#include <utility>
namespace morpheus {
/****** Component public implementations *******************/
/****** FiberQueue****************************************/
/**
 * @brief Construct a FiberQueue, forwarding `max_size` unchanged to the `m_queue` member's constructor.
 *
 * No validation is performed here. `FiberQueueInterfaceProxy::init` checks that the value is a power of two
 * greater than 1 before it constructs a queue.
 *
 * @param max_size Value handed to the `m_queue` constructor (presumably the channel capacity -- confirm
 * against the declaration of `m_queue` in the header).
 */
FiberQueue::FiberQueue(size_t max_size) : m_queue(max_size) {}
/**
 * @brief Push an item onto the queue.
 *
 * @param item Object to enqueue; forwarded as an rvalue to the underlying channel.
 * @param block When false, a single non-blocking `try_push` is attempted and `timeout` is ignored.
 * @param timeout Maximum time to wait, in (possibly fractional) seconds, when `block` is true. A value that is
 * not greater than zero means "wait without a time limit".
 * @return The status reported by the underlying channel operation.
 */
boost::fibers::channel_op_status FiberQueue::put(pybind11::object &&item, bool block, float timeout)
{
    if (!block)
    {
        return m_queue.try_push(std::move(item));
    }

    if (timeout > 0.0)
    {
        // Convert to an integral duration with sub-second resolution. Casting straight to std::chrono::seconds
        // truncates toward zero, which turned e.g. a 0.5s timeout into a 0s wait and 1.9s into 1s.
        const auto wait_time =
            std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::duration<float>(timeout));

        return m_queue.push_wait_for(std::move(item), wait_time);
    }

    // Blocking with no timeout
    return m_queue.push(std::move(item));
}
/**
 * @brief Pop an item from the queue into `item`.
 *
 * @param item Output parameter handed (by reference) to the underlying channel's pop operation.
 * @param block When false, a single non-blocking `try_pop` is attempted and `timeout` is ignored.
 * @param timeout Maximum time to wait, in (possibly fractional) seconds, when `block` is true. A value that is
 * not greater than zero means "wait without a time limit".
 * @return The status reported by the underlying channel operation.
 */
boost::fibers::channel_op_status FiberQueue::get(pybind11::object &item, bool block, float timeout)
{
    if (!block)
    {
        return m_queue.try_pop(std::ref(item));
    }

    if (timeout > 0.0)
    {
        // Convert to an integral duration with sub-second resolution. Casting straight to std::chrono::seconds
        // truncates toward zero, which turned e.g. a 0.5s timeout into a 0s wait and 1.9s into 1s.
        const auto wait_time =
            std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::duration<float>(timeout));

        return m_queue.pop_wait_for(std::ref(item), wait_time);
    }

    // Blocking with no timeout
    return m_queue.pop(std::ref(item));
}
/**
 * @brief Close the queue by calling `close()` on the underlying `m_queue`.
 *
 * NOTE(review): presumably subsequent put/get calls then report `channel_op_status::closed`, which the
 * Python proxy turns into a `Closed` exception -- confirm against the channel documentation.
 */
void FiberQueue::close()
{
m_queue.close();
}
/**
 * @brief Report whether the queue has been closed.
 *
 * @return The value of `is_closed()` on the underlying `m_queue`.
 */
bool FiberQueue::is_closed()
{
return m_queue.is_closed();
}
/**
 * @brief Placeholder for a "join" operation on the queue.
 *
 * Currently a no-op: the body is empty, so the call returns immediately without waiting on anything.
 */
void FiberQueue::join()
{
// TODO(MDD): Not sure how to join a buffered channel
}
/****** FiberQueueInterfaceProxy *************************/
/**
 * @brief Validate the requested size and build a new, shared FiberQueue.
 *
 * @param max_size Requested queue size; must be a power of two and at least 2.
 * @return A `std::shared_ptr` owning the newly created queue.
 * @throws std::invalid_argument If `max_size` is less than 2 or is not a power of two.
 */
std::shared_ptr<morpheus::FiberQueue> FiberQueueInterfaceProxy::init(std::size_t max_size)
{
    // Clearing the lowest set bit of a power of two leaves zero. Zero also passes this bit test, so the
    // lower bound is checked separately.
    const bool at_most_one_bit_set = (max_size & (max_size - 1)) == 0;
    const bool size_is_valid       = max_size >= 2 && at_most_one_bit_set;

    if (!size_is_valid)
    {
        throw std::invalid_argument("max_size must be greater than 1 and a power of 2.");
    }

    return std::make_shared<morpheus::FiberQueue>(max_size);
}
/**
 * @brief Python-facing `put`: enqueue `item` and translate the channel status into a Python exception.
 *
 * The GIL is released for the duration of the (possibly blocking) call to `FiberQueue::put` and is held
 * again by the time any Python exception is raised.
 *
 * @param self Queue to push onto.
 * @param item Python object to enqueue.
 * @param block Forwarded to `FiberQueue::put`.
 * @param timeout Forwarded to `FiberQueue::put`.
 * @throws pybind11::error_already_set With `queue.Full` set when the queue is full or the wait timed out,
 * `queue.Empty` for an `empty` status, or `morpheus.utils.producer_consumer_queue.Closed` when the queue
 * is closed.
 * @throws std::runtime_error If the channel reports a status not handled above.
 */
void FiberQueueInterfaceProxy::put(morpheus::FiberQueue &self, pybind11::object item, bool block, float timeout)
{
    boost::fibers::channel_op_status status;

    // Release the GIL only around the queue operation; the scope ends before any Python API is touched.
    {
        pybind11::gil_scoped_release nogil;

        status = self.put(std::move(item), block, timeout);
    }

    switch (status)
    {
    case boost::fibers::channel_op_status::success:
        return;
    case boost::fibers::channel_op_status::empty: {
        // Raise queue.Empty
        pybind11::object exc_class = pybind11::module_::import("queue").attr("Empty");

        PyErr_SetNone(exc_class.ptr());

        throw pybind11::error_already_set();
    }
    case boost::fibers::channel_op_status::full:
    case boost::fibers::channel_op_status::timeout: {
        // Raise queue.Full. This previously raised queue.Empty, contradicting both the original comment and
        // the convention of Python's `queue` module for a failed put.
        pybind11::object exc_class = pybind11::module_::import("queue").attr("Full");

        PyErr_SetNone(exc_class.ptr());

        throw pybind11::error_already_set();
    }
    case boost::fibers::channel_op_status::closed: {
        // Raise morpheus.utils.producer_consumer_queue.Closed
        pybind11::object exc_class = pybind11::module_::import("morpheus.utils.producer_consumer_queue").attr("Closed");

        PyErr_SetNone(exc_class.ptr());

        throw pybind11::error_already_set();
    }
    default:
        // Mirror FiberQueueInterfaceProxy::get: never report an unrecognized status as success.
        throw std::runtime_error("Unknown channel status");
    }
}
/**
 * @brief Python-facing `get`: dequeue one item, translating the channel status into a Python exception.
 *
 * The GIL is released for the duration of the (possibly blocking) call to `FiberQueue::get` and is held
 * again by the time the result is returned or a Python exception is raised.
 *
 * @param self Queue to pop from.
 * @param block Forwarded to `FiberQueue::get`.
 * @param timeout Forwarded to `FiberQueue::get`.
 * @return The dequeued Python object when the channel reports success.
 * @throws pybind11::error_already_set With `queue.Empty` set for an `empty`, `full` or `timeout` status, or
 * `morpheus.utils.producer_consumer_queue.Closed` set when the queue is closed.
 * @throws std::runtime_error If the channel reports a status not handled above.
 */
pybind11::object FiberQueueInterfaceProxy::get(morpheus::FiberQueue &self, bool block, float timeout)
{
    using op_status = boost::fibers::channel_op_status;

    pybind11::object result;
    op_status outcome;

    // Only the queue operation itself runs without the GIL.
    {
        pybind11::gil_scoped_release nogil;

        outcome = self.get(result, block, timeout);
    }

    // Looks up `exc_name` in `module_name` and sets it as the pending Python error (with no value).
    auto set_python_error = [](const char *module_name, const char *exc_name) {
        pybind11::object exc_class = pybind11::module_::import(module_name).attr(exc_name);

        PyErr_SetNone(exc_class.ptr());
    };

    switch (outcome)
    {
    case op_status::success:
        return result;

    case op_status::empty:
    case op_status::full:
    case op_status::timeout:
        // Nothing could be retrieved: raise queue.Empty
        set_python_error("queue", "Empty");
        throw pybind11::error_already_set();

    case op_status::closed:
        // Raise morpheus.utils.producer_consumer_queue.Closed
        set_python_error("morpheus.utils.producer_consumer_queue", "Closed");
        throw pybind11::error_already_set();

    default:
        throw std::runtime_error("Unknown channel status");
    }
}
/**
 * @brief Python-facing `close`: forwards directly to `FiberQueue::close` on `self`.
 *
 * @param self Queue to close.
 */
void FiberQueueInterfaceProxy::close(morpheus::FiberQueue &self)
{
self.close();
}
} // namespace morpheus