Program Listing for File http_server.hpp
↰ Return to documentation for file (python/morpheus/morpheus/_lib/include/morpheus/utilities/http_server.hpp
)
/*
* SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "morpheus/export.h" // for exporting symbols
#include "morpheus/utilities/json_types.hpp" // for json_t
#include <boost/asio/io_context.hpp> // for io_context
#include <boost/asio/ip/tcp.hpp> // for tcp, tcp::acceptor, tcp::endpoint, tcp::socket
#include <boost/beast/core/error.hpp> // for error_code
#include <boost/beast/http/message.hpp> // for request
#include <boost/beast/http/string_body.hpp> // for string_body
#include <boost/beast/http/verb.hpp> // for verb
#include <boost/system/detail/error_code.hpp>
#include <pybind11/pytypes.h> // for pybind11::function
#include <atomic> // for atomic
#include <chrono> // for seconds
#include <cstddef> // for size_t
#include <cstdint> // for int64_t
#include <functional> // for function
#include <memory> // for shared_ptr & unique_ptr
#include <semaphore> // for semaphore
#include <string> // for string
#include <thread> // for thread
#include <tuple> // for make_tuple, tuple
#include <vector> // for vector
namespace morpheus {
using on_complete_cb_fn_t = std::function<void(const boost::system::error_code& /* error message */)>;
using parse_status_t = std::tuple<unsigned /*http status code*/,
std::string /* Content-Type of response */,
std::string /* response body */,
on_complete_cb_fn_t /* optional callback function, ignored if null */>;
// Note this is different than the http endpoint this represents the TCP connection
using tcp_endpoint_t = boost::asio::ip::tcp::endpoint;
using request_t = boost::beast::http::request<boost::beast::http::string_body>;
using request_handler_fn_t =
std::function<parse_status_t(const tcp_endpoint_t& tcp_endpoint, const request_t& request)>;
using payload_parse_fn_t = std::function<parse_status_t(const std::string& body)>;
constexpr std::size_t DefaultMaxPayloadSize{1024 * 1024 * 10}; // 10MB
utilities::json_t request_headers_to_json(const tcp_endpoint_t& tcp_endpoint, const request_t& request);
struct MORPHEUS_EXPORT HttpEndpoint
{
HttpEndpoint(request_handler_fn_t request_handler_fn, std::string&& url, const std::string& method);
HttpEndpoint(payload_parse_fn_t payload_parse_fn, std::string&& url, const std::string& method);
std::shared_ptr<request_handler_fn_t> m_request_handler;
std::shared_ptr<payload_parse_fn_t> m_parser;
std::string m_url;
boost::beast::http::verb m_method;
private:
HttpEndpoint(std::shared_ptr<request_handler_fn_t>&& request_handler_fn,
std::shared_ptr<payload_parse_fn_t>&& payload_parse_fn,
std::string&& url,
const std::string& method);
};
class MORPHEUS_EXPORT Listener : public std::enable_shared_from_this<Listener>
{
public:
Listener(boost::asio::io_context& io_context,
const std::string& bind_address,
unsigned short port,
std::vector<HttpEndpoint> endpoints,
std::size_t max_payload_size,
std::chrono::seconds request_timeout);
~Listener() = default;
void run();
void stop();
bool is_running() const;
private:
void do_accept();
void on_accept(boost::beast::error_code ec, boost::asio::ip::tcp::socket socket);
boost::asio::io_context& m_io_context;
boost::asio::ip::tcp::endpoint m_tcp_endpoint;
std::unique_ptr<boost::asio::ip::tcp::acceptor> m_acceptor;
std::vector<HttpEndpoint> m_endpoints;
std::size_t m_max_payload_size;
std::chrono::seconds m_request_timeout;
std::atomic<bool> m_is_running;
};
class MORPHEUS_EXPORT HttpServer
{
public:
HttpServer(std::vector<HttpEndpoint> endpoints,
std::string bind_address = "127.0.0.1",
unsigned short port = 8080,
unsigned short num_threads = 1,
std::size_t max_payload_size = DefaultMaxPayloadSize,
std::chrono::seconds request_timeout = std::chrono::seconds(30));
~HttpServer();
void start();
void stop();
bool is_running() const;
private:
void start_listener(std::binary_semaphore& listener_semaphore, std::binary_semaphore& started_semaphore);
std::string m_bind_address;
unsigned short m_port;
std::vector<HttpEndpoint> m_endpoints;
unsigned short m_num_threads;
std::chrono::seconds m_request_timeout;
std::size_t m_max_payload_size;
std::vector<std::thread> m_listener_threads;
boost::asio::io_context m_io_context;
std::shared_ptr<Listener> m_listener;
std::atomic<bool> m_is_running;
};
/****** HttpEndpointInterfaceProxy ************************/
struct MORPHEUS_EXPORT HttpEndpointInterfaceProxy
{
static std::shared_ptr<HttpEndpoint> init(pybind11::function py_parse_fn,
std::string m_url,
std::string m_method,
bool include_headers = false);
};
/****** HttpServerInterfaceProxy *************************/
struct MORPHEUS_EXPORT HttpServerInterfaceProxy
{
static std::shared_ptr<HttpServer> init(std::vector<HttpEndpoint> endpoints,
std::string bind_address,
unsigned short port,
unsigned short num_threads,
std::size_t max_payload_size,
int64_t request_timeout);
static void start(HttpServer& self);
static void stop(HttpServer& self);
static bool is_running(const HttpServer& self);
// Context manager methods
static HttpServer& enter(HttpServer& self);
static void exit(HttpServer& self,
const pybind11::object& type,
const pybind11::object& value,
const pybind11::object& traceback);
};
} // namespace morpheus