Program Listing for File control.hpp
↰ Return to documentation for file (python/morpheus/morpheus/_lib/include/morpheus/messages/control.hpp)
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "morpheus/export.h" // for MORPHEUS_EXPORT
#include "morpheus/messages/meta.hpp" // for MessageMeta
#include "morpheus/types.hpp"
#include "morpheus/utilities/json_types.hpp" // for json_t
#include <pybind11/pytypes.h> // for object, dict, list
#include <pybind11/stl.h> // IWYU pragma: keep
// for system_clock, time_point
#include <chrono> // IWYU pragma: keep
#include <map> // for map
#include <memory> // for shared_ptr
#include <optional> // for optional
#include <string> // for string
#include <vector> // for vector
// IWYU pragma: no_include <bits/chrono.h>
namespace morpheus {
/**
 * @brief Type tag carried by a ControlMessage (see ControlMessage::task_type()).
 *
 * The enumerator values are written out explicitly; they are the same 0, 1, 2 that the
 * declaration order would assign implicitly.
 */
enum class MORPHEUS_EXPORT ControlMessageType
{
    NONE      = 0,
    INFERENCE = 1,
    TRAINING  = 2
};
// Forward declaration: this header only refers to TensorMemory through std::shared_ptr.
class MORPHEUS_EXPORT TensorMemory;
// Timestamps are expressed on the system clock for better compatibility with pybind11/chrono.
using time_point_t = std::chrono::system_clock::time_point;
class MORPHEUS_EXPORT ControlMessage
{
public:
ControlMessage();
explicit ControlMessage(const morpheus::utilities::json_t& config);
ControlMessage(const ControlMessage& other); // Copies config and metadata, but not payload
void config(const morpheus::utilities::json_t& config);
[[nodiscard]] const morpheus::utilities::json_t& config() const;
void add_task(const std::string& task_type, const morpheus::utilities::json_t& task);
[[nodiscard]] bool has_task(const std::string& task_type) const;
morpheus::utilities::json_t remove_task(const std::string& task_type);
[[nodiscard]] const morpheus::utilities::json_t& get_tasks() const;
void set_metadata(const std::string& key, const morpheus::utilities::json_t& value);
[[nodiscard]] bool has_metadata(const std::string& key) const;
[[nodiscard]] morpheus::utilities::json_t get_metadata() const;
[[nodiscard]] morpheus::utilities::json_t get_metadata(const std::string& key, bool fail_on_nonexist = false) const;
[[nodiscard]] std::vector<std::string> list_metadata() const;
std::shared_ptr<MessageMeta> payload();
void payload(const std::shared_ptr<MessageMeta>& payload);
std::shared_ptr<TensorMemory> tensors();
void tensors(const std::shared_ptr<TensorMemory>& tensor_memory);
TensorIndex tensor_count();
ControlMessageType task_type();
void task_type(ControlMessageType task_type);
void set_timestamp(const std::string& key, time_point_t timestamp_ns);
std::optional<time_point_t> get_timestamp(const std::string& key, bool fail_if_nonexist = false);
const std::map<std::string, time_point_t>& get_timestamps() const;
std::map<std::string, time_point_t> filter_timestamp(const std::string& regex_filter);
private:
static const std::string s_config_schema; // NOLINT
static std::map<std::string, ControlMessageType> s_task_type_map; // NOLINT
ControlMessageType to_task_type(const std::string& task_type, bool throw_on_error) const;
ControlMessageType m_cm_type{ControlMessageType::NONE};
std::shared_ptr<MessageMeta> m_payload{nullptr};
std::shared_ptr<TensorMemory> m_tensors{nullptr};
TensorIndex m_tensor_count{0};
morpheus::utilities::json_t m_tasks{};
morpheus::utilities::json_t m_config{};
std::map<std::string, time_point_t> m_timestamps{};
};
/**
 * @brief Collection of static helpers that operate on a ControlMessage using pybind11 types.
 *
 * The signatures suggest this is the Python-facing adapter for ControlMessage; the implementations
 * are not visible in this header, so notes marked "presumably" should be confirmed against them.
 */
struct MORPHEUS_EXPORT ControlMessageProxy
{
    // ---- Creation and copying -----------------------------------------------------------------------

    /// Build a ControlMessage from a Python object; the parameter name suggests it may be either a
    /// configuration or an existing message.
    static std::shared_ptr<ControlMessage> create(pybind11::object& config_or_message);

    /// Build a ControlMessage from another ControlMessage held by shared pointer.
    static std::shared_ptr<ControlMessage> create(std::shared_ptr<ControlMessage> other);

    /// @return A new shared ControlMessage produced from `self`.
    static std::shared_ptr<ControlMessage> copy(ControlMessage& self);

    // ---- Metadata -----------------------------------------------------------------------------------

    /// Fetch metadata from `self` as a Python object.
    /// @param key           Python object identifying the entry.
    /// @param default_value Presumably returned when the entry is absent — confirm in the implementation.
    static pybind11::object get_metadata(ControlMessage& self,
                                         const pybind11::object& key,
                                         pybind11::object default_value);

    /// @return A Python list built from the metadata of `self` (presumably its keys).
    static pybind11::list list_metadata(ControlMessage& self);

    // ---- Payload ------------------------------------------------------------------------------------

    /// Set the payload of `self` from a Python-side meta object.
    static void payload_from_python_meta(ControlMessage& self, const pybind11::object& meta);

    // ---- Timestamps ---------------------------------------------------------------------------------

    /// Record `timestamp`, given as a Python object, under `key` on `self`.
    static void set_timestamp(ControlMessage& self, const std::string& key, pybind11::object timestamp);

    /// Fetch the timestamp stored under `key` as a Python object.
    /// @param fail_if_nonexist Defaults to false. Presumably selects between raising an error and
    ///                         returning None when `key` is absent — confirm in the implementation.
    static pybind11::object get_timestamp(ControlMessage& self, const std::string& key, bool fail_if_nonexist = false);

    /// @return A Python dict built from the timestamps of `self`.
    static pybind11::dict get_timestamps(ControlMessage& self);

    /// @return A Python dict of timestamps from `self`; presumably those whose key matches `regex_filter`.
    static pybind11::dict filter_timestamp(ControlMessage& self, const std::string& regex_filter);
};
} // namespace morpheus