What can I help you with?
NVIDIA Morpheus (25.02.01)

Program Listing for File http_server_source_stage.hpp

Return to documentation for file (python/morpheus/morpheus/_lib/include/morpheus/stages/http_server_source_stage.hpp)

Copy
Copied!
            

/* * SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include "morpheus/export.h" // for exporting symbols #include "morpheus/messages/control.hpp" // for ControlMessage #include "morpheus/messages/meta.hpp" // for MessageMeta #include "morpheus/utilities/http_server.hpp" // for HttpServer #include "morpheus/utilities/json_types.hpp" // for control_message_task_t & json_t #include <boost/beast/http/status.hpp> // for int_to_status, status #include <boost/fiber/buffered_channel.hpp> // for buffered_channel #include <boost/fiber/channel_op_status.hpp> // for channel_op_status #include <boost/fiber/operations.hpp> // for sleep_for #include <cudf/io/json.hpp> // for json_reader_options & read_json #include <cudf/io/types.hpp> // for table_with_metadata #include <glog/logging.h> // for CHECK & LOG #include <mrc/segment/builder.hpp> // for segment::Builder #include <mrc/segment/object.hpp> // for segment::Object #include <nlohmann/json.hpp> // for json #include <pybind11/pytypes.h> // for pybind11::object #include <pymrc/node.hpp> // for PythonSource #include <rxcpp/rx.hpp> // for subscriber #include <atomic> // for atomic #include <chrono> // for duration #include <cstddef> // for size_t #include <cstdint> // for int64_t #include <exception> // for std::exception #include <memory> // for shared_ptr & unique_ptr #include <sstream> // needed by GLOG #include <stdexcept> // for std::runtime_error #include <string> // for string & to_string #include <tuple> // for make_tuple #include <utility> // for std::move & pair #include <vector> // for vector // IWYU thinks we're using thread::operator<< // IWYU pragma: no_include <thread> // IWYU thinks we need the http.hpp header for int_to_status, but it's defined in status.hpp // IWYU pragma: no_include <boost/beast/http.hpp> namespace morpheus { using table_with_http_fields_t = std::pair<cudf::io::table_with_metadata, morpheus::utilities::json_t>; using table_t = std::unique_ptr<table_with_http_fields_t>; using request_queue_t = boost::fibers::buffered_channel<table_t>; class SourceStageStopAfter : public std::exception {}; // Per type overloads for producing the output message void make_output_message(std::shared_ptr<MessageMeta>& incoming_message, control_message_task_t* task, morpheus::utilities::json_t&& http_fields, std::shared_ptr<MessageMeta>& out_message); void make_output_message(std::shared_ptr<MessageMeta>& incoming_message, control_message_task_t* task, morpheus::utilities::json_t&& http_fields, std::shared_ptr<ControlMessage>& out_message); /****** HttpServerSourceStage *************************************/ template <typename OutputT> class MORPHEUS_EXPORT HttpServerSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<OutputT>> { public: using base_t = mrc::pymrc::PythonSource<std::shared_ptr<OutputT>>; using typename base_t::source_type_t; using typename base_t::subscriber_fn_t; HttpServerSourceStage(std::string bind_address = "127.0.0.1", unsigned short port = 8080, std::string endpoint = "/message", std::string live_endpoint = "/live", std::string ready_endpoint = "/ready", std::string method = "POST", std::string live_method = "GET", std::string ready_method = "GET", unsigned accept_status = 201, float sleep_time = 0.1f, long queue_timeout = 5, std::size_t max_queue_size = 1024, unsigned short num_server_threads = 1, std::size_t max_payload_size = DefaultMaxPayloadSize, std::chrono::seconds request_timeout = std::chrono::seconds(30), bool lines = false, std::size_t stop_after = 0, std::unique_ptr<control_message_task_t> task = nullptr); ~HttpServerSourceStage() override { close(); } void close(); private: subscriber_fn_t build(); void source_generator(rxcpp::subscriber<source_type_t> subscriber); std::atomic<int> m_queue_cnt = 0; std::chrono::steady_clock::duration m_sleep_time; std::chrono::duration<long> m_queue_timeout; std::unique_ptr<HttpServer> m_server; request_queue_t m_queue; std::size_t m_max_queue_size; std::size_t m_stop_after; std::size_t m_records_emitted{0}; std::unique_ptr<control_message_task_t> m_task{nullptr}; }; /****** HttpServerSourceStageInterfaceProxy***********************/ struct MORPHEUS_EXPORT HttpServerSourceStageInterfaceProxy { static std::shared_ptr<mrc::segment::Object<HttpServerSourceStage<MessageMeta>>> init_meta( mrc::segment::Builder& builder, const std::string& name, std::string bind_address, unsigned short port, std::string endpoint, std::string live_endpoint, std::string ready_endpoint, std::string method, std::string live_method, std::string ready_method, unsigned accept_status, float sleep_time, long queue_timeout, std::size_t max_queue_size, unsigned short num_server_threads, std::size_t max_payload_size, int64_t request_timeout, bool lines, std::size_t stop_after); static std::shared_ptr<mrc::segment::Object<HttpServerSourceStage<ControlMessage>>> init_cm( mrc::segment::Builder& builder, const std::string& name, std::string bind_address, unsigned short port, std::string endpoint, std::string live_endpoint, std::string ready_endpoint, std::string method, std::string live_method, std::string ready_method, unsigned accept_status, float sleep_time, long queue_timeout, std::size_t max_queue_size, unsigned short num_server_threads, std::size_t max_payload_size, int64_t request_timeout, bool lines, std::size_t stop_after, const pybind11::object& task_type, const pybind11::object& task_payload); }; template <typename OutputT> HttpServerSourceStage<OutputT>::HttpServerSourceStage(std::string bind_address, unsigned short port, std::string endpoint, std::string live_endpoint, std::string ready_endpoint, std::string method, std::string live_method, std::string ready_method, unsigned accept_status, float sleep_time, long queue_timeout, std::size_t max_queue_size, unsigned short num_server_threads, std::size_t max_payload_size, std::chrono::seconds request_timeout, bool lines, std::size_t stop_after, std::unique_ptr<control_message_task_t> task) : base_t(build()), m_max_queue_size{max_queue_size}, m_sleep_time{std::chrono::milliseconds(static_cast<long int>(sleep_time))}, m_queue_timeout{queue_timeout}, m_queue{max_queue_size}, m_stop_after{stop_after}, m_task{std::move(task)} { CHECK(boost::beast::http::int_to_status(accept_status) != boost::beast::http::status::unknown) << "Invalid HTTP status code: " << accept_status; request_handler_fn_t parser = [this, accept_status, lines](const tcp_endpoint_t& tcp_endpoint, const request_t& request) { // This function is called from one of the HTTPServer's worker threads, avoid performing any additional work // here beyond what is strictly nessary to return a valid response to the client. We parse the payload here, // that way we can return an appropriate error message if the payload is invalid however we stop avoid // constructing a MessageMeta object since that would require grabbing the Python GIL, instead we push the // libcudf table to the queue and let the subscriber handle the conversion to MessageMeta. table_t table{nullptr}; try { std::string body{request.body()}; cudf::io::source_info source{body.c_str(), body.size()}; auto options = cudf::io::json_reader_options::builder(source).lines(lines); auto cudf_table = cudf::io::read_json(options.build()); auto http_fields = request_headers_to_json(tcp_endpoint, request); table = std::make_unique<table_with_http_fields_t>(std::move(cudf_table), std::move(http_fields)); } catch (const std::exception& e) { // We want to log the exception locally, but we don't want to include the exception message in the response // since that may leak sensitive information std::string error_msg = "Error occurred converting HTTP payload to Dataframe"; LOG(ERROR) << error_msg << ": " << e.what(); return std::make_tuple(400u, "text/plain", error_msg, nullptr); } try { // NOLINTNEXTLINE(clang-diagnostic-unused-value) DCHECK_NOTNULL(table); auto queue_status = m_queue.push_wait_for(std::move(table), m_queue_timeout); if (queue_status == boost::fibers::channel_op_status::success) { ++m_queue_cnt; return std::make_tuple(accept_status, "text/plain", std::string(), nullptr); } std::string error_msg = "HTTP payload queue is "; switch (queue_status) { case boost::fibers::channel_op_status::full: case boost::fibers::channel_op_status::timeout: { error_msg += "full"; break; } case boost::fibers::channel_op_status::closed: { error_msg += "closed"; break; } default: { error_msg += "in an unknown state"; break; } } return std::make_tuple(503u, "text/plain", std::move(error_msg), nullptr); } catch (const std::exception& e) { // Refer above comment about not including exception messages in the response std::string error_msg = "Error occurred while pushing payload to queue"; LOG(ERROR) << error_msg << ": " << e.what(); return std::make_tuple(500u, "text/plain", error_msg, nullptr); } }; request_handler_fn_t live_parser = [this](const tcp_endpoint_t& tcp_endpoint, const request_t& request) { if (!m_server->is_running()) { std::string error_msg = "Source server is not running"; return std::make_tuple(500u, "text/plain", error_msg, nullptr); } return std::make_tuple(200u, "text/plain", std::string(), nullptr); }; request_handler_fn_t ready_parser = [this](const tcp_endpoint_t& tcp_endpoint, const request_t& request) { if (!m_server->is_running()) { std::string error_msg = "Source server is not running"; return std::make_tuple(500u, "text/plain", error_msg, nullptr); } if (m_queue_cnt < m_max_queue_size) { return std::make_tuple(200u, "text/plain", std::string(), nullptr); } std::string error_msg = "HTTP payload queue is full or unavailable to accept new values"; return std::make_tuple(503u, "text/plain", std::move(error_msg), nullptr); }; std::vector<HttpEndpoint> endpoints; endpoints.emplace_back(parser, std::move(endpoint), std::move(method)); endpoints.emplace_back(live_parser, std::move(live_endpoint), std::move(live_method)); endpoints.emplace_back(ready_parser, std::move(ready_endpoint), std::move(ready_method)); m_server = std::make_unique<HttpServer>( std::move(endpoints), std::move(bind_address), port, num_server_threads, max_payload_size, request_timeout); } template <typename OutputT> HttpServerSourceStage<OutputT>::subscriber_fn_t HttpServerSourceStage<OutputT>::build() { return [this](rxcpp::subscriber<source_type_t> subscriber) -> void { try { m_server->start(); this->source_generator(subscriber); } catch (const SourceStageStopAfter& e) { DLOG(INFO) << "Completed after emitting " << m_records_emitted << " records"; } catch (const std::exception& e) { LOG(ERROR) << "Encountered error while listening for incoming HTTP requests: " << e.what() << std::endl; subscriber.on_error(std::make_exception_ptr(e)); return; } subscriber.on_completed(); this->close(); }; } template <typename OutputT> void HttpServerSourceStage<OutputT>::source_generator( rxcpp::subscriber<HttpServerSourceStage::source_type_t> subscriber) { // only check if the server is running when the queue is empty, allowing all queued messages to be processed prior // to shutting down bool server_running = true; bool queue_closed = false; while (subscriber.is_subscribed() && server_running && !queue_closed) { table_t table_ptr{nullptr}; auto queue_status = m_queue.try_pop(table_ptr); if (queue_status == boost::fibers::channel_op_status::success) { --m_queue_cnt; // NOLINTNEXTLINE(clang-diagnostic-unused-value) DCHECK_NOTNULL(table_ptr); try { auto message = MessageMeta::create_from_cpp(std::move(table_ptr->first), 0); auto num_records = message->count(); // When OutputT is MessageMeta, we just swap the pointers std::shared_ptr<OutputT> out_message{nullptr}; make_output_message(message, m_task.get(), std::move(table_ptr->second), out_message); subscriber.on_next(std::move(out_message)); m_records_emitted += num_records; } catch (const std::exception& e) { LOG(ERROR) << "Error occurred converting HTTP payload to Dataframe: " << e.what(); } if (m_stop_after > 0 && m_records_emitted >= m_stop_after) { throw SourceStageStopAfter(); } } else if (queue_status == boost::fibers::channel_op_status::empty) { // if the queue is empty, maybe it's because our server is not running server_running = m_server->is_running(); if (server_running) { // Sleep when there are no messages boost::this_fiber::sleep_for(m_sleep_time); } } else if (queue_status == boost::fibers::channel_op_status::closed) { queue_closed = true; } else { std::string error_msg{"Unknown queue status: " + std::to_string(static_cast<int>(queue_status))}; LOG(ERROR) << error_msg; throw std::runtime_error(error_msg); } } } template <typename OutputT> void HttpServerSourceStage<OutputT>::close() { if (m_server) { m_server->stop(); // this is a no-op if the server is not running m_server.reset(); } m_queue.close(); } // end of group } // namespace morpheus

© Copyright 2024, NVIDIA. Last updated on Mar 3, 2025.