Program Listing for File control.hpp

Return to documentation for file (morpheus/_lib/include/morpheus/messages/control.hpp)

Copy
Copied!
            

/* * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include "morpheus/messages/meta.hpp" // for MessageMeta #include <nlohmann/json.hpp> // for json, basic_json #include <pybind11/pytypes.h> // for object, dict, list, none #include <chrono> // for system_clock, time_point #include <map> // for map #include <memory> // for shared_ptr #include <optional> // for optional #include <string> // for string #include <vector> // for vector namespace morpheus { #pragma GCC visibility push(default) enum class ControlMessageType { NONE, INFERENCE, TRAINING }; // class PayloadManager // { // public: // /** // * @brief Get the tensor object identified by `name` // * // * @param name // * @return TensorObject& // * @throws std::runtime_error If no tensor matching `name` exists // */ // TensorObject& get_tensor(const std::string& name) // { // return m_tensors->get_tensor(name); // } // /** // * @brief Get the tensor object identified by `name` // * // * @param name // * @return const TensorObject& // * @throws std::runtime_error If no tensor matching `name` exists // */ // const TensorObject& get_tensor(const std::string& name) const // { // return m_tensors->get_tensor(name); // } // /** // * @brief Set the tensor object identified by `name` // * // * @param name // * @param tensor // * @throws std::length_error If the number of rows in `tensor` does not match `count`. // */ // void set_tensor(const std::string& name, TensorObject&& tensor) // { // m_tensors->set_tensor(name, std::move(tensor)); // } // /** // * @brief Get a reference to the internal tensors map // * // * @return const TensorMap& // */ // const TensorMap& get_tensors() const // { // return m_tensors->get_tensors(); // } // /** // * @brief Set the tensors object // * // * @param tensors // * @throws std::length_error If the number of rows in the `tensors` do not match `count`. // */ // void set_tensors(TensorMap&& tensors) // { // m_tensors->set_tensors(std::move(tensors)); // } // /** // * @brief Get the tensor object identified by `name` // * // * @param name // * @return TensorObject& // * @throws std::runtime_error If no tensor matching `name` exists // */ // TensorObject& get_column(const std::string& name) // { // return m_tensors->get_tensor(name); // } // /** // * @brief Get the tensor object identified by `name` // * // * @param name // * @return const TensorObject& // * @throws std::runtime_error If no tensor matching `name` exists // */ // const TensorObject& get_column(const std::string& name) const // { // return m_tensors->get_tensor(name); // } // /** // * @brief Set the tensor object identified by `name` // * // * @param name // * @param tensor // * @throws std::length_error If the number of rows in `tensor` does not match `count`. // */ // void set_column(const std::string& name, TensorObject&& tensor) // { // m_tensors->set_tensor(name, std::move(tensor)); // } // /** // * @brief Get a reference to the internal tensors map // * // * @return const TensorMap& // */ // TableInfo get_columns() const // { // return m_df->get_info(); // } // /** // * @brief Set the tensors object // * // * @param tensors // * @throws std::length_error If the number of rows in the `tensors` do not match `count`. // */ // void set_columns(TableInfo&& tensors) // { // m_tensors->set_tensors(std::move(tensors)); // } // private: // std::shared_ptr<MessageMeta> m_df; // std::shared_ptr<TensorMemory> m_tensors; // }; class TensorMemory; // System-clock for better compatibility with pybind11/chrono using time_point_t = std::chrono::time_point<std::chrono::system_clock>; class ControlMessage { public: ControlMessage(); explicit ControlMessage(const nlohmann::json& config); ControlMessage(const ControlMessage& other); // Copies config and metadata, but not payload void config(const nlohmann::json& config); [[nodiscard]] const nlohmann::json& config() const; void add_task(const std::string& task_type, const nlohmann::json& task); [[nodiscard]] bool has_task(const std::string& task_type) const; nlohmann::json remove_task(const std::string& task_type); [[nodiscard]] const nlohmann::json& get_tasks() const; void set_metadata(const std::string& key, const nlohmann::json& value); [[nodiscard]] bool has_metadata(const std::string& key) const; [[nodiscard]] nlohmann::json get_metadata() const; [[nodiscard]] nlohmann::json get_metadata(const std::string& key, bool fail_on_nonexist = false) const; [[nodiscard]] std::vector<std::string> list_metadata() const; std::shared_ptr<MessageMeta> payload(); void payload(const std::shared_ptr<MessageMeta>& payload); std::shared_ptr<TensorMemory> tensors(); void tensors(const std::shared_ptr<TensorMemory>& tensor_memory); ControlMessageType task_type(); void task_type(ControlMessageType task_type); void set_timestamp(const std::string& key, time_point_t timestamp_ns); std::optional<time_point_t> get_timestamp(const std::string& key, bool fail_if_nonexist = false); std::map<std::string, time_point_t> filter_timestamp(const std::string& regex_filter); private: static const std::string s_config_schema; // NOLINT static std::map<std::string, ControlMessageType> s_task_type_map; // NOLINT ControlMessageType m_cm_type{ControlMessageType::NONE}; std::shared_ptr<MessageMeta> m_payload{nullptr}; std::shared_ptr<TensorMemory> m_tensors{nullptr}; nlohmann::json m_tasks{}; nlohmann::json m_config{}; std::map<std::string, time_point_t> m_timestamps{}; }; struct ControlMessageProxy { static std::shared_ptr<ControlMessage> create(pybind11::dict& config); static std::shared_ptr<ControlMessage> create(std::shared_ptr<ControlMessage> other); static std::shared_ptr<ControlMessage> copy(ControlMessage& self); static pybind11::dict config(ControlMessage& self); static void config(ControlMessage& self, pybind11::dict& config); static void add_task(ControlMessage& self, const std::string& type, pybind11::dict& task); static pybind11::dict remove_task(ControlMessage& self, const std::string& type); static pybind11::dict get_tasks(ControlMessage& self); static void set_metadata(ControlMessage& self, const std::string& key, pybind11::object& value); static pybind11::object get_metadata(ControlMessage& self, const pybind11::object& key, pybind11::object default_value); static pybind11::list list_metadata(ControlMessage& self); static void payload_from_python_meta(ControlMessage& self, const pybind11::object& meta); static void set_timestamp(ControlMessage& self, const std::string& key, pybind11::object timestamp); static pybind11::object get_timestamp(ControlMessage& self, const std::string& key, bool fail_if_nonexist = false); static pybind11::dict filter_timestamp(ControlMessage& self, const std::string& regex_filter); }; #pragma GCC visibility pop } // namespace morpheus

© Copyright 2024, NVIDIA. Last updated on Apr 11, 2024.