Program Listing for File http_server.hpp

Return to documentation for file (python/morpheus/morpheus/_lib/include/morpheus/utilities/http_server.hpp)

/*
 * SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include "morpheus/export.h"                  // for exporting symbols
#include "morpheus/utilities/json_types.hpp"  // for json_t

#include <boost/asio/io_context.hpp>         // for io_context
#include <boost/asio/ip/tcp.hpp>             // for tcp, tcp::acceptor, tcp::endpoint, tcp::socket
#include <boost/beast/core/error.hpp>        // for error_code
#include <boost/beast/http/message.hpp>      // for request
#include <boost/beast/http/string_body.hpp>  // for string_body
#include <boost/beast/http/verb.hpp>         // for verb
#include <boost/system/detail/error_code.hpp>
#include <pybind11/pytypes.h>  // for pybind11::function

#include <atomic>      // for atomic
#include <chrono>      // for seconds
#include <cstddef>     // for size_t
#include <cstdint>     // for int64_t
#include <functional>  // for function
#include <memory>      // for shared_ptr & unique_ptr
#include <semaphore>   // for semaphore
#include <string>      // for string
#include <thread>      // for thread
#include <tuple>       // for make_tuple, tuple
#include <vector>      // for vector

namespace morpheus {
/**
 * @brief Completion callback that receives a `boost::system::error_code`.
 *
 * It is carried as the last element of `parse_status_t`, where it is optional.
 * NOTE(review): presumably invoked after the response has been sent, with the result of that operation — confirm
 * against the implementation, which is not part of this header.
 */
using on_complete_cb_fn_t = std::function<void(const boost::system::error_code& /* error message */)>;

/**
 * @brief Value returned by request handlers and payload parsers.
 *
 * Tuple layout, in order:
 *  0. HTTP status code (`unsigned`)
 *  1. Content-Type of the response
 *  2. response body
 *  3. optional completion callback; ignored when it is null (empty)
 */
using parse_status_t = std::tuple<unsigned /*http status code*/,
                                  std::string /* Content-Type of response */,
                                  std::string /* response body */,
                                  on_complete_cb_fn_t /* optional callback function, ignored if null */>;

// Note: `tcp_endpoint_t` is not the same thing as `HttpEndpoint`; it represents the TCP connection, whereas
// `HttpEndpoint` describes a URL/method pair together with its handler.
using tcp_endpoint_t = boost::asio::ip::tcp::endpoint;
// An HTTP request whose body is held as a string (`boost::beast::http::string_body`).
using request_t      = boost::beast::http::request<boost::beast::http::string_body>;

/**
 * @brief Handler signature that receives both the TCP endpoint and the complete HTTP request, and returns the
 * `parse_status_t` describing the response.
 */
using request_handler_fn_t =
    std::function<parse_status_t(const tcp_endpoint_t& tcp_endpoint, const request_t& request)>;

/**
 * @brief Simpler handler signature that receives only a `body` string and returns the `parse_status_t` describing
 * the response.
 */
using payload_parse_fn_t = std::function<parse_status_t(const std::string& body)>;

/**
 * @brief Default maximum payload size: 1024 * 1024 * 10 (10MB).
 *
 * Serves as the default value of the `max_payload_size` argument of the `HttpServer` constructor.
 *
 * Declared `inline` so that every translation unit including this header refers to one and the same variable; a
 * plain namespace-scope `constexpr` variable has internal linkage and would give each translation unit its own copy.
 */
inline constexpr std::size_t DefaultMaxPayloadSize{1024 * 1024 * 10};  // 10MB

/**
 * @brief Build a `utilities::json_t` value from a TCP endpoint and an HTTP request.
 *
 * Only the declaration lives in this header; the definition is elsewhere.
 * NOTE(review): judging by the name, the result presumably contains the request's header fields (and possibly
 * details of `tcp_endpoint`) — confirm the exact JSON layout against the implementation.
 *
 * @param tcp_endpoint TCP endpoint associated with the request
 * @param request The HTTP request to read from
 * @return A `utilities::json_t` value
 */
utilities::json_t request_headers_to_json(const tcp_endpoint_t& tcp_endpoint, const request_t& request);

/**
 * @brief Describes a single HTTP endpoint: a URL, an HTTP method and the callable that services it.
 *
 * An endpoint can be created from either of the two handler flavours (`request_handler_fn_t` or
 * `payload_parse_fn_t`); both public constructors have a matching slot in the private constructor.
 * NOTE(review): presumably exactly one of `m_request_handler` / `m_parser` is non-null after construction,
 * depending on which public constructor was used — confirm in the implementation.
 */
struct MORPHEUS_EXPORT HttpEndpoint
{
    /**
     * @brief Create an endpoint serviced by a handler that sees the TCP endpoint and the full request.
     * @param request_handler_fn Handler to call for matching requests
     * @param url URL of the endpoint (taken by rvalue reference)
     * @param method HTTP method name as a string; stored as a `boost::beast::http::verb` in `m_method`
     */
    HttpEndpoint(request_handler_fn_t request_handler_fn, std::string&& url, const std::string& method);

    /**
     * @brief Create an endpoint serviced by a handler that only sees a body string.
     * @param payload_parse_fn Handler to call for matching requests
     * @param url URL of the endpoint (taken by rvalue reference)
     * @param method HTTP method name as a string; stored as a `boost::beast::http::verb` in `m_method`
     */
    HttpEndpoint(payload_parse_fn_t payload_parse_fn, std::string&& url, const std::string& method);

    // Handlers are held through shared_ptr, so copies of an HttpEndpoint share the same callable objects.
    std::shared_ptr<request_handler_fn_t> m_request_handler;
    std::shared_ptr<payload_parse_fn_t> m_parser;
    std::string m_url;
    boost::beast::http::verb m_method;

  private:
    // Common constructor accepting both handler slots; not callable from outside the struct.
    // NOTE(review): presumably the two public constructors delegate here — confirm in the implementation.
    HttpEndpoint(std::shared_ptr<request_handler_fn_t>&& request_handler_fn,
                 std::shared_ptr<payload_parse_fn_t>&& payload_parse_fn,
                 std::string&& url,
                 const std::string& method);
};

/**
 * @brief Accepts incoming TCP connections for a set of `HttpEndpoint`s.
 *
 * The class derives from `std::enable_shared_from_this`, so instances are expected to be owned by a
 * `std::shared_ptr` (`HttpServer` stores one as `m_listener`).
 *
 * The `io_context` is held by reference and therefore must outlive the `Listener`.
 */
class MORPHEUS_EXPORT Listener : public std::enable_shared_from_this<Listener>
{
  public:
    /**
     * @param io_context I/O context used by this listener; stored by reference, not owned
     * @param bind_address Address to bind to
     * @param port Port to bind to
     * @param endpoints Endpoints served by this listener
     * @param max_payload_size Maximum payload size
     * @param request_timeout Request timeout, expressed in seconds
     */
    Listener(boost::asio::io_context& io_context,
             const std::string& bind_address,
             unsigned short port,
             std::vector<HttpEndpoint> endpoints,
             std::size_t max_payload_size,
             std::chrono::seconds request_timeout);

    ~Listener() = default;

    // Lifecycle controls; `is_running()` is the const query counterpart (see `m_is_running`).
    void run();
    void stop();
    bool is_running() const;

  private:
    // Accept handling is split into an initiating call and a callback receiving the error code and the new socket.
    // NOTE(review): presumably `do_accept` starts an asynchronous accept whose completion handler is `on_accept`
    // — confirm in the implementation.
    void do_accept();
    void on_accept(boost::beast::error_code ec, boost::asio::ip::tcp::socket socket);

    boost::asio::io_context& m_io_context;  // non-owning reference
    boost::asio::ip::tcp::endpoint m_tcp_endpoint;
    std::unique_ptr<boost::asio::ip::tcp::acceptor> m_acceptor;

    std::vector<HttpEndpoint> m_endpoints;
    std::size_t m_max_payload_size;
    std::chrono::seconds m_request_timeout;
    std::atomic<bool> m_is_running;  // atomic so the flag can be read and written from different threads
};

/**
 * @brief HTTP server that owns an `io_context`, a `Listener` and a collection of listener threads.
 *
 * The server is not copyable: it has `std::thread`, `std::atomic` and `boost::asio::io_context` members.
 */
class MORPHEUS_EXPORT HttpServer
{
  public:
    /**
     * @param endpoints Endpoints to serve
     * @param bind_address Address to bind to; defaults to "127.0.0.1"
     * @param port Port to bind to; defaults to 8080
     * @param num_threads Number of threads; defaults to 1
     * @param max_payload_size Maximum payload size; defaults to `DefaultMaxPayloadSize`
     * @param request_timeout Request timeout; defaults to 30 seconds
     *
     * NOTE(review): the constructor is callable with a single argument and is not `explicit`, so a
     * `std::vector<HttpEndpoint>` converts implicitly to an `HttpServer`.
     */
    HttpServer(std::vector<HttpEndpoint> endpoints,
               std::string bind_address             = "127.0.0.1",
               unsigned short port                  = 8080,
               unsigned short num_threads           = 1,
               std::size_t max_payload_size         = DefaultMaxPayloadSize,
               std::chrono::seconds request_timeout = std::chrono::seconds(30));
    // User-provided destructor (defined elsewhere).
    // NOTE(review): presumably stops the server and joins the listener threads — confirm in the implementation.
    ~HttpServer();
    void start();
    void stop();
    bool is_running() const;

  private:
    // NOTE(review): the two semaphores presumably coordinate start-up between the calling thread and the listener
    // thread(s) — their exact roles are not visible from this header.
    void start_listener(std::binary_semaphore& listener_semaphore, std::binary_semaphore& started_semaphore);

    // Members are initialized in declaration order. Note that `m_request_timeout` is declared before
    // `m_max_payload_size`, which is the reverse of the constructor's parameter order.
    std::string m_bind_address;
    unsigned short m_port;
    std::vector<HttpEndpoint> m_endpoints;
    unsigned short m_num_threads;
    std::chrono::seconds m_request_timeout;
    std::size_t m_max_payload_size;
    std::vector<std::thread> m_listener_threads;
    boost::asio::io_context m_io_context;  // owned by the server
    std::shared_ptr<Listener> m_listener;
    std::atomic<bool> m_is_running;  // atomic so the flag can be read and written from different threads
};

/****** HttpEndpointInterfaceProxy ************************/
/**
 * @brief Static helper that builds an `HttpEndpoint` from a Python callable (`pybind11::function`).
 */
struct MORPHEUS_EXPORT HttpEndpointInterfaceProxy
{
    /**
     * @brief Create a shared `HttpEndpoint` backed by a Python function.
     *
     * NOTE(review): `m_url` and `m_method` are ordinary parameters despite carrying the `m_` prefix that this file
     * otherwise uses for data members.
     *
     * @param py_parse_fn Python callable that services the endpoint
     * @param m_url URL of the endpoint
     * @param m_method HTTP method name
     * @param include_headers Defaults to false. NOTE(review): presumably selects whether `py_parse_fn` also
     * receives the request headers — confirm in the implementation.
     * @return Shared pointer to the new `HttpEndpoint`
     */
    static std::shared_ptr<HttpEndpoint> init(pybind11::function py_parse_fn,
                                              std::string m_url,
                                              std::string m_method,
                                              bool include_headers = false);
};

/****** HttpServerInterfaceProxy *************************/
/**
 * @brief Static helpers exposing `HttpServer` through free-function-style entry points that take the server as
 * `self`, including the pair of methods needed for a context manager.
 */
struct MORPHEUS_EXPORT HttpServerInterfaceProxy
{
    /**
     * @brief Create a shared `HttpServer`.
     *
     * Unlike the `HttpServer` constructor, no argument has a default here and the timeout is a plain integer.
     *
     * @param endpoints Endpoints to serve
     * @param bind_address Address to bind to
     * @param port Port to bind to
     * @param num_threads Number of threads
     * @param max_payload_size Maximum payload size
     * @param request_timeout Request timeout as an integer. NOTE(review): presumably in seconds, matching the
     * `std::chrono::seconds` parameter of `HttpServer` — confirm in the implementation.
     * @return Shared pointer to the new `HttpServer`
     */
    static std::shared_ptr<HttpServer> init(std::vector<HttpEndpoint> endpoints,
                                            std::string bind_address,
                                            unsigned short port,
                                            unsigned short num_threads,
                                            std::size_t max_payload_size,
                                            int64_t request_timeout);
    // Counterparts of the `HttpServer` member functions of the same names, taking the server as `self`.
    static void start(HttpServer& self);
    static void stop(HttpServer& self);
    static bool is_running(const HttpServer& self);

    // Context manager methods
    // `enter` returns a reference to an `HttpServer`; `exit` takes the exception type, value and traceback objects.
    // NOTE(review): presumably `enter` starts the server and `exit` stops it — confirm in the implementation.
    static HttpServer& enter(HttpServer& self);
    static void exit(HttpServer& self,
                     const pybind11::object& type,
                     const pybind11::object& value,
                     const pybind11::object& traceback);
};
}  // namespace morpheus