Program Listing for File control.hpp
↰ Return to documentation for file (morpheus/_lib/include/morpheus/messages/control.hpp)
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "morpheus/export.h" // for MORPHEUS_EXPORT
#include "morpheus/messages/meta.hpp" // for MessageMeta
#include "morpheus/utilities/json_types.hpp" // for json_t
#include <pybind11/pytypes.h> // for object, dict, list
#include <chrono> // for system_clock, time_point
#include <map> // for map
#include <memory> // for shared_ptr
#include <optional> // for optional
#include <string> // for string
#include <vector> // for vector
namespace morpheus {
/**
 * @brief Kinds of control message distinguished in this header.
 *
 * The enumerator values are spelled out explicitly; they are exactly the values the compiler would assign
 * implicitly in declaration order (0, 1, 2). `NONE` is the value `ControlMessage::m_cm_type` is initialized with.
 */
enum class MORPHEUS_EXPORT ControlMessageType
{
    NONE      = 0,
    INFERENCE = 1,
    TRAINING  = 2
};
// class PayloadManager
// {
// public:
// /**
// * @brief Get the tensor object identified by `name`
// *
// * @param name
// * @return TensorObject&
// * @throws std::runtime_error If no tensor matching `name` exists
// */
// TensorObject& get_tensor(const std::string& name)
// {
// return m_tensors->get_tensor(name);
// }
// /**
// * @brief Get the tensor object identified by `name`
// *
// * @param name
// * @return const TensorObject&
// * @throws std::runtime_error If no tensor matching `name` exists
// */
// const TensorObject& get_tensor(const std::string& name) const
// {
// return m_tensors->get_tensor(name);
// }
// /**
// * @brief Set the tensor object identified by `name`
// *
// * @param name
// * @param tensor
// * @throws std::length_error If the number of rows in `tensor` does not match `count`.
// */
// void set_tensor(const std::string& name, TensorObject&& tensor)
// {
// m_tensors->set_tensor(name, std::move(tensor));
// }
// /**
// * @brief Get a reference to the internal tensors map
// *
// * @return const TensorMap&
// */
// const TensorMap& get_tensors() const
// {
// return m_tensors->get_tensors();
// }
// /**
// * @brief Set the tensors object
// *
// * @param tensors
// * @throws std::length_error If the number of rows in the `tensors` do not match `count`.
// */
// void set_tensors(TensorMap&& tensors)
// {
// m_tensors->set_tensors(std::move(tensors));
// }
// /**
// * @brief Get the tensor object identified by `name`
// *
// * @param name
// * @return TensorObject&
// * @throws std::runtime_error If no tensor matching `name` exists
// */
// TensorObject& get_column(const std::string& name)
// {
// return m_tensors->get_tensor(name);
// }
// /**
// * @brief Get the tensor object identified by `name`
// *
// * @param name
// * @return const TensorObject&
// * @throws std::runtime_error If no tensor matching `name` exists
// */
// const TensorObject& get_column(const std::string& name) const
// {
// return m_tensors->get_tensor(name);
// }
// /**
// * @brief Set the tensor object identified by `name`
// *
// * @param name
// * @param tensor
// * @throws std::length_error If the number of rows in `tensor` does not match `count`.
// */
// void set_column(const std::string& name, TensorObject&& tensor)
// {
// m_tensors->set_tensor(name, std::move(tensor));
// }
// /**
// * @brief Get the table info from the MessageMeta held in `m_df`
// *
// * @return TableInfo
// */
// TableInfo get_columns() const
// {
// return m_df->get_info();
// }
// /**
// * @brief Set the tensors object
// *
// * @param tensors
// * @throws std::length_error If the number of rows in the `tensors` do not match `count`.
// */
// void set_columns(TableInfo&& tensors)
// {
// m_tensors->set_tensors(std::move(tensors));
// }
// private:
// std::shared_ptr<MessageMeta> m_df;
// std::shared_ptr<TensorMemory> m_tensors;
// };
// Forward declaration only: the declarations in this header refer to TensorMemory solely through
// std::shared_ptr<TensorMemory>, so the complete type is not required here.
class MORPHEUS_EXPORT TensorMemory;
// Time point type used for the timestamps in this header. It is the system clock's own time point; the system
// clock is chosen for better compatibility with pybind11/chrono.
using time_point_t = std::chrono::system_clock::time_point;
/**
 * @brief Message object bundling JSON configuration and tasks with an optional payload and tensors.
 *
 * Going by the private members, an instance holds a ControlMessageType, a shared MessageMeta payload, a shared
 * TensorMemory, two JSON values (tasks and config) and a map from string keys to time points.
 *
 * Every member function is only declared here; the definitions are out of line. The notes on the individual members
 * are derived from the signatures and names alone.
 * NOTE(review): confirm the described semantics against the implementation before relying on them.
 * NOTE(review): there is no dedicated metadata member; metadata is presumably kept inside one of the JSON members
 * -- confirm.
 */
class MORPHEUS_EXPORT ControlMessage
{
public:
/**
 * @brief Default constructor.
 *
 * The data members carry in-class initializers (see the private section), which apply unless the out-of-line
 * definition overrides them.
 */
ControlMessage();
/**
 * @brief Construct a message from a JSON configuration.
 *
 * @param config JSON configuration for the new message (presumably checked against `s_config_schema` -- TODO confirm)
 */
explicit ControlMessage(const morpheus::utilities::json_t& config);
/**
 * @brief Copy constructor.
 *
 * NOTE(review): no copy-assignment operator is declared, so assignment falls back to the implicit member-wise copy,
 * which also copies the payload pointer -- confirm that this difference from the copy constructor is intended.
 *
 * @param other Message to copy from
 */
ControlMessage(const ControlMessage& other); // Copies config and metadata, but not payload
/**
 * @brief Setter overload of `config`.
 *
 * @param config JSON configuration to apply to this message
 */
void config(const morpheus::utilities::json_t& config);
/**
 * @brief Getter overload of `config`.
 *
 * @return Const reference to a JSON value (presumably `m_config` -- TODO confirm)
 */
[[nodiscard]] const morpheus::utilities::json_t& config() const;
/**
 * @brief Add a task to the message.
 *
 * @param task_type Name of the task type the task is filed under
 * @param task JSON description of the task
 */
void add_task(const std::string& task_type, const morpheus::utilities::json_t& task);
/**
 * @brief Query whether a task of the given type is present.
 *
 * @param task_type Name of the task type to look for
 * @return bool (presumably true when at least one such task exists -- TODO confirm)
 */
[[nodiscard]] bool has_task(const std::string& task_type) const;
/**
 * @brief Remove a task of the given type and hand it back by value.
 *
 * Unlike the getters around it, this is not marked `[[nodiscard]]`, so callers may ignore the returned task.
 * NOTE(review): the behaviour when no task of `task_type` exists is not visible from this header.
 *
 * @param task_type Name of the task type to remove from
 * @return The removed task as a JSON value
 */
morpheus::utilities::json_t remove_task(const std::string& task_type);
/**
 * @brief Access the tasks held by the message.
 *
 * @return Const reference to a JSON value (presumably `m_tasks` -- TODO confirm)
 */
[[nodiscard]] const morpheus::utilities::json_t& get_tasks() const;
/**
 * @brief Store a metadata value under a key.
 *
 * @param key Metadata key
 * @param value JSON value to associate with `key`
 */
void set_metadata(const std::string& key, const morpheus::utilities::json_t& value);
/**
 * @brief Query whether a metadata key is present.
 *
 * @param key Metadata key to look for
 * @return bool (presumably true when `key` exists -- TODO confirm)
 */
[[nodiscard]] bool has_metadata(const std::string& key) const;
/**
 * @brief Key-less overload of `get_metadata`.
 *
 * @return A JSON value returned by value, i.e. a copy (presumably all metadata -- TODO confirm)
 */
[[nodiscard]] morpheus::utilities::json_t get_metadata() const;
/**
 * @brief Look up a single metadata entry.
 *
 * NOTE(review): the flag is spelled `fail_on_nonexist` here but `fail_if_nonexist` in `get_timestamp`.
 *
 * @param key Metadata key to look up
 * @param fail_on_nonexist Defaults to false. Presumably selects between failing and returning a placeholder value
 *                         when `key` is missing -- TODO confirm the exact behaviour of both settings
 * @return The entry as a JSON value, returned by value
 */
[[nodiscard]] morpheus::utilities::json_t get_metadata(const std::string& key, bool fail_on_nonexist = false) const;
/**
 * @brief List metadata entries as strings.
 *
 * @return Vector of strings (presumably the metadata keys -- TODO confirm)
 */
[[nodiscard]] std::vector<std::string> list_metadata() const;
/**
 * @brief Getter overload of `payload`.
 *
 * NOTE(review): neither `const` nor `[[nodiscard]]`, unlike the JSON getters above -- confirm whether the getter
 * modifies the message (e.g. hands the payload over) or whether the qualifiers were simply left off.
 *
 * @return Shared pointer to a MessageMeta (presumably `m_payload`, which is initialized to nullptr)
 */
std::shared_ptr<MessageMeta> payload();
/**
 * @brief Setter overload of `payload`.
 *
 * @param payload Shared pointer to the MessageMeta to attach
 */
void payload(const std::shared_ptr<MessageMeta>& payload);
/**
 * @brief Getter overload of `tensors`.
 *
 * NOTE(review): neither `const` nor `[[nodiscard]]` -- same question as for `payload()`.
 *
 * @return Shared pointer to a TensorMemory (presumably `m_tensors`, which is initialized to nullptr)
 */
std::shared_ptr<TensorMemory> tensors();
/**
 * @brief Setter overload of `tensors`.
 *
 * @param tensor_memory Shared pointer to the TensorMemory to attach
 */
void tensors(const std::shared_ptr<TensorMemory>& tensor_memory);
/**
 * @brief Getter overload of `task_type`.
 *
 * NOTE(review): not `const`-qualified -- confirm whether that is intentional.
 *
 * @return A ControlMessageType (presumably `m_cm_type` -- TODO confirm)
 */
ControlMessageType task_type();
/**
 * @brief Setter overload of `task_type`.
 *
 * @param task_type ControlMessageType to assign to this message
 */
void task_type(ControlMessageType task_type);
/**
 * @brief Store a time point under a key.
 *
 * @param key Name of the timestamp
 * @param timestamp_ns Time point to store. Despite the `_ns` suffix the parameter is a `time_point_t`, not an
 *                     integer count of nanoseconds.
 */
void set_timestamp(const std::string& key, time_point_t timestamp_ns);
/**
 * @brief Look up a stored time point.
 *
 * NOTE(review): not `const`-qualified -- confirm whether that is intentional.
 *
 * @param key Name of the timestamp to look up
 * @param fail_if_nonexist Defaults to false. Presumably an absent key yields an empty optional when false and an
 *                         error when true -- TODO confirm
 * @return Optional time point
 */
std::optional<time_point_t> get_timestamp(const std::string& key, bool fail_if_nonexist = false);
/**
 * @brief Select stored time points using a filter string.
 *
 * NOTE(review): not `const`-qualified -- confirm whether that is intentional.
 *
 * @param regex_filter Filter string; going by the name, a regular expression (presumably matched against the
 *                     timestamp keys -- TODO confirm, including the regex dialect)
 * @return Map from string key to time point, returned by value
 */
std::map<std::string, time_point_t> filter_timestamp(const std::string& regex_filter);
private:
// String constant shared by all instances; presumably a schema describing valid configurations -- TODO confirm.
static const std::string s_config_schema; // NOLINT
// Static map from string to ControlMessageType, presumably used to translate task type names.
// NOTE(review): not const, so it is mutable shared state -- confirm that is required.
static std::map<std::string, ControlMessageType> s_task_type_map; // NOLINT
// Type of this message; starts out as NONE.
ControlMessageType m_cm_type{ControlMessageType::NONE};
// Attached payload; null until one is set.
std::shared_ptr<MessageMeta> m_payload{nullptr};
// Attached tensor memory; null until one is set.
std::shared_ptr<TensorMemory> m_tensors{nullptr};
// JSON value holding the tasks.
morpheus::utilities::json_t m_tasks{};
// JSON value holding the configuration.
morpheus::utilities::json_t m_config{};
// Named time points, ordered by key because std::map is an ordered container.
std::map<std::string, time_point_t> m_timestamps{};
};
/**
 * @brief Static helper functions that operate on ControlMessage using pybind11 types.
 *
 * NOTE(review): presumably these are the entry points used by the Python bindings; the binding code itself is not
 * part of this header. Only declarations are visible here, so the notes below follow the signatures and names --
 * confirm semantics against the implementation.
 */
struct MORPHEUS_EXPORT ControlMessageProxy
{
/**
 * @brief Create a ControlMessage from a Python dict.
 *
 * @param config Python dict holding the configuration. Taken by non-const reference.
 *               NOTE(review): confirm whether the dict is modified.
 * @return Shared pointer to the new ControlMessage
 */
static std::shared_ptr<ControlMessage> create(pybind11::dict& config);
/**
 * @brief Create a ControlMessage from an existing one.
 *
 * @param other Shared pointer to the source message
 *              (presumably copied via the ControlMessage copy constructor -- TODO confirm)
 * @return Shared pointer to a ControlMessage
 */
static std::shared_ptr<ControlMessage> create(std::shared_ptr<ControlMessage> other);
/**
 * @brief Produce a copy of a ControlMessage.
 *
 * @param self Message to copy
 * @return Shared pointer to a ControlMessage
 *         (presumably a new instance; how deep the copy is cannot be told from here -- TODO confirm)
 */
static std::shared_ptr<ControlMessage> copy(ControlMessage& self);
/**
 * @brief Python-facing metadata lookup.
 *
 * @param self Message to query
 * @param key Python object identifying the entry
 *            (presumably a str, possibly None to fetch everything -- TODO confirm)
 * @param default_value Python object, presumably returned when `key` is not present -- TODO confirm
 * @return The result as a Python object
 */
static pybind11::object get_metadata(ControlMessage& self,
const pybind11::object& key,
pybind11::object default_value);
/**
 * @brief Python-facing counterpart of `ControlMessage::list_metadata`.
 *
 * @param self Message to query
 * @return Python list (presumably of metadata keys -- TODO confirm)
 */
static pybind11::list list_metadata(ControlMessage& self);
/**
 * @brief Set the payload of a message from a Python object.
 *
 * @param self Message whose payload is set
 * @param meta Python object to take the payload from (presumably a Python-side MessageMeta -- TODO confirm)
 */
static void payload_from_python_meta(ControlMessage& self, const pybind11::object& meta);
/**
 * @brief Store a timestamp supplied as a Python object.
 *
 * @param self Message to update
 * @param key Name of the timestamp
 * @param timestamp Python object carrying the time
 *                  (presumably a datetime converted via pybind11/chrono -- TODO confirm)
 */
static void set_timestamp(ControlMessage& self, const std::string& key, pybind11::object timestamp);
/**
 * @brief Look up a timestamp and return it as a Python object.
 *
 * @param self Message to query
 * @param key Name of the timestamp to look up
 * @param fail_if_nonexist Defaults to false. Presumably an absent key yields None when false and an error when
 *                         true -- TODO confirm
 * @return The result as a Python object
 */
static pybind11::object get_timestamp(ControlMessage& self, const std::string& key, bool fail_if_nonexist = false);
/**
 * @brief Python-facing counterpart of `ControlMessage::filter_timestamp`.
 *
 * @param self Message to query
 * @param regex_filter Filter string; going by the name, a regular expression -- TODO confirm what it is matched
 *                     against
 * @return Python dict holding the selected timestamps
 */
static pybind11::dict filter_timestamp(ControlMessage& self, const std::string& regex_filter);
};
} // namespace morpheus