What can I help you with?
NVIDIA Morpheus (25.02.01)

Program Listing for File monitor.hpp

Return to documentation for file (python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp)

Copy
Copied!
            

/* * SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include "morpheus/controllers/monitor_controller.hpp" // for MonitorController #include "morpheus/export.h" // for MORPHEUS_EXPORT #include <indicators/progress_bar.hpp> #include <mrc/segment/builder.hpp> // for Builder #include <pymrc/node.hpp> // for PythonNode #include <rxcpp/rx.hpp> // for trace_activity, decay_t, from #include <optional> #include <string> namespace morpheus { /*************** Component public implementations ***************/ /******************** MonitorStage ********************/ template <typename MessageT> class MORPHEUS_EXPORT MonitorStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageT>, std::shared_ptr<MessageT>> { public: using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageT>, std::shared_ptr<MessageT>>; using typename base_t::sink_type_t; using typename base_t::source_type_t; using typename base_t::subscribe_fn_t; MonitorStage(const std::string& description, const std::string& unit = "messages", indicators::Color = indicators::Color::cyan, indicators::FontStyle font_style = indicators::FontStyle::bold, std::optional<std::function<size_t(sink_type_t)>> determine_count_fn = std::nullopt); private: subscribe_fn_t build_operator(); MonitorController<sink_type_t> m_monitor_controller; }; template <typename MessageT> MonitorStage<MessageT>::MonitorStage(const std::string& description, const std::string& unit, indicators::Color text_color, indicators::FontStyle font_style, std::optional<std::function<size_t(sink_type_t)>> determine_count_fn) : base_t(base_t::op_factory_from_sub_fn(build_operator())), m_monitor_controller(MonitorController<sink_type_t>(description, unit, text_color, font_style, determine_count_fn)) {} template <typename MessageT> MonitorStage<MessageT>::subscribe_fn_t MonitorStage<MessageT>::build_operator() { return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) { return input.subscribe(rxcpp::make_observer<sink_type_t>( [this, &output](sink_type_t msg) { m_monitor_controller.progress_sink(msg); output.on_next(std::move(msg)); }, [&](std::exception_ptr error_ptr) { output.on_error(error_ptr); }, [&]() { m_monitor_controller.sink_on_completed(); output.on_completed(); })); }; } /****** MonitorStageInterfaceProxy******************/ template <typename MessageT> struct MORPHEUS_EXPORT MonitorStageInterfaceProxy { static std::shared_ptr<mrc::segment::Object<MonitorStage<MessageT>>> init( mrc::segment::Builder& builder, const std::string& name, const std::string& description, const std::string& unit, indicators::Color color = indicators::Color::cyan, indicators::FontStyle font_style = indicators::FontStyle::bold, std::optional<std::function<size_t(typename MonitorStage<MessageT>::sink_type_t)>> determine_count_fn = std::nullopt); }; template <typename MessageT> std::shared_ptr<mrc::segment::Object<MonitorStage<MessageT>>> MonitorStageInterfaceProxy<MessageT>::init( mrc::segment::Builder& builder, const std::string& name, const std::string& description, const std::string& unit, indicators::Color text_color, indicators::FontStyle font_style, std::optional<std::function<size_t(typename MonitorStage<MessageT>::sink_type_t)>> determine_count_fn) { auto stage = builder.construct_object<MonitorStage<MessageT>>( name, description, unit, text_color, font_style, determine_count_fn); return stage; } // end of group } // namespace morpheus

© Copyright 2024, NVIDIA. Last updated on Mar 3, 2025.