Program Listing for File control.hpp#

Return to documentation for file (python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp)

/*
 * SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include "morpheus/export.h"                  // for MORPHEUS_EXPORT
#include "morpheus/messages/meta.hpp"         // for MessageMeta
#include "morpheus/types.hpp"
#include "morpheus/utilities/json_types.hpp"  // for json_t

#include <pybind11/pytypes.h>  // for object, dict, list
#include <pybind11/stl.h>      // IWYU pragma: keep

// for system_clock, time_point
#include <chrono>    // IWYU pragma: keep
#include <map>       // for map
#include <memory>    // for shared_ptr
#include <optional>  // for optional
#include <string>    // for string
#include <vector>    // for vector

// IWYU pragma: no_include <bits/chrono.h>

namespace morpheus {

enum class MORPHEUS_EXPORT ControlMessageType
{
    NONE,
    INFERENCE,
    TRAINING
};

class MORPHEUS_EXPORT TensorMemory;

// System-clock for better compatibility with pybind11/chrono
using time_point_t = std::chrono::time_point<std::chrono::system_clock>;

class MORPHEUS_EXPORT ControlMessage
{
  public:
    ControlMessage();
    explicit ControlMessage(const morpheus::utilities::json_t& config);

    ControlMessage(const ControlMessage& other);  // Copies config and metadata, but not payload

    void config(const morpheus::utilities::json_t& config);

    [[nodiscard]] const morpheus::utilities::json_t& config() const;

    void add_task(const std::string& task_type, const morpheus::utilities::json_t& task);

    [[nodiscard]] bool has_task(const std::string& task_type) const;

    morpheus::utilities::json_t remove_task(const std::string& task_type);

    [[nodiscard]] const morpheus::utilities::json_t& get_tasks() const;

    void set_metadata(const std::string& key, const morpheus::utilities::json_t& value);

    [[nodiscard]] bool has_metadata(const std::string& key) const;

    [[nodiscard]] morpheus::utilities::json_t get_metadata() const;

    [[nodiscard]] morpheus::utilities::json_t get_metadata(const std::string& key, bool fail_on_nonexist = false) const;

    [[nodiscard]] std::vector<std::string> list_metadata() const;

    std::shared_ptr<MessageMeta> payload();

    void payload(const std::shared_ptr<MessageMeta>& payload);

    std::shared_ptr<TensorMemory> tensors();

    void tensors(const std::shared_ptr<TensorMemory>& tensor_memory);

    TensorIndex tensor_count();

    ControlMessageType task_type();

    void task_type(ControlMessageType task_type);

    void set_timestamp(const std::string& key, time_point_t timestamp_ns);

    std::optional<time_point_t> get_timestamp(const std::string& key, bool fail_if_nonexist = false);

    const std::map<std::string, time_point_t>& get_timestamps() const;

    std::map<std::string, time_point_t> filter_timestamp(const std::string& regex_filter);

  private:
    static const std::string s_config_schema;                          // NOLINT
    static std::map<std::string, ControlMessageType> s_task_type_map;  // NOLINT

    ControlMessageType to_task_type(const std::string& task_type, bool throw_on_error) const;

    ControlMessageType m_cm_type{ControlMessageType::NONE};
    std::shared_ptr<MessageMeta> m_payload{nullptr};
    std::shared_ptr<TensorMemory> m_tensors{nullptr};
    TensorIndex m_tensor_count{0};

    morpheus::utilities::json_t m_tasks{};
    morpheus::utilities::json_t m_config{};

    std::map<std::string, time_point_t> m_timestamps{};
};

struct MORPHEUS_EXPORT ControlMessageProxy
{
    static std::shared_ptr<ControlMessage> create(pybind11::object& config_or_message);

    static std::shared_ptr<ControlMessage> create(std::shared_ptr<ControlMessage> other);

    static std::shared_ptr<ControlMessage> copy(ControlMessage& self);

    static pybind11::object get_metadata(ControlMessage& self,
                                         const pybind11::object& key,
                                         pybind11::object default_value);

    static pybind11::list list_metadata(ControlMessage& self);

    static void payload_from_python_meta(ControlMessage& self, const pybind11::object& meta);

    static void set_timestamp(ControlMessage& self, const std::string& key, pybind11::object timestamp);

    static pybind11::object get_timestamp(ControlMessage& self, const std::string& key, bool fail_if_nonexist = false);

    static pybind11::dict get_timestamps(ControlMessage& self);

    static pybind11::dict filter_timestamp(ControlMessage& self, const std::string& regex_filter);
};

}  // namespace morpheus