Program Listing for File monitor.hpp
↰ Return to documentation for file (python/morpheus/morpheus/_lib/include/morpheus/stages/monitor.hpp)
/*
* SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "morpheus/controllers/monitor_controller.hpp" // for MonitorController
#include "morpheus/export.h" // for MORPHEUS_EXPORT
#include <indicators/progress_bar.hpp>
#include <mrc/segment/builder.hpp> // for Builder
#include <pymrc/node.hpp> // for PythonNode
#include <rxcpp/rx.hpp> // for trace_activity, decay_t, from
#include <cstddef> // for size_t
#include <exception> // for exception_ptr
#include <functional> // for function
#include <memory> // for shared_ptr
#include <optional>
#include <string>
#include <utility> // for move
namespace morpheus {
/*************** Component public implementations ***************/
/******************** MonitorStage ********************/
/**
 * @brief Node that hands every incoming message to a `MonitorController` and then forwards the same
 * message downstream.
 *
 * The sink and source types are both `std::shared_ptr<MessageT>`, so the stage can be inserted
 * between two nodes that already exchange that type.
 *
 * @tparam MessageT Message type; the stage consumes and emits `std::shared_ptr<MessageT>`.
 */
template <typename MessageT>
class MORPHEUS_EXPORT MonitorStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageT>, std::shared_ptr<MessageT>>
{
  public:
    using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageT>, std::shared_ptr<MessageT>>;
    using typename base_t::sink_type_t;
    using typename base_t::source_type_t;
    using typename base_t::subscribe_fn_t;

    /**
     * @brief Construct the stage and its internal `MonitorController`.
     *
     * All arguments are forwarded unchanged to the `MonitorController` constructor; their exact
     * effect is defined there.
     *
     * @param description Text passed to the controller (presumably the progress label - confirm
     * against `MonitorController`).
     * @param unit Unit name passed to the controller; defaults to "messages".
     * @param text_color `indicators` colour passed to the controller; defaults to cyan.
     * @param font_style `indicators` font style passed to the controller; defaults to bold.
     * @param determine_count_fn Optional callable mapping a message to a `size_t` count. The
     * behaviour when it is `std::nullopt` is decided by `MonitorController`.
     */
    MonitorStage(const std::string& description,
                 const std::string& unit                                              = "messages",
                 indicators::Color text_color                                         = indicators::Color::cyan,
                 indicators::FontStyle font_style                                     = indicators::FontStyle::bold,
                 std::optional<std::function<size_t(sink_type_t)>> determine_count_fn = std::nullopt);

  private:
    /**
     * @brief Build the subscribe function that wires the upstream observable to the downstream
     * subscriber while reporting to the controller.
     */
    subscribe_fn_t build_operator();

    // Receives every message (and the completion signal) seen by this stage.
    MonitorController<sink_type_t> m_monitor_controller;
};
/**
 * @brief Construct the stage: install the operator built by `build_operator()` on the base node and
 * create the controller from the supplied display settings.
 *
 * @param description Forwarded to the `MonitorController` constructor.
 * @param unit Forwarded to the `MonitorController` constructor.
 * @param text_color Forwarded to the `MonitorController` constructor.
 * @param font_style Forwarded to the `MonitorController` constructor.
 * @param determine_count_fn Optional per-message count function; ownership is handed to the
 * controller.
 */
template <typename MessageT>
MonitorStage<MessageT>::MonitorStage(const std::string& description,
                                     const std::string& unit,
                                     indicators::Color text_color,
                                     indicators::FontStyle font_style,
                                     std::optional<std::function<size_t(sink_type_t)>> determine_count_fn) :
  // The base class is initialised before m_monitor_controller. This is fine because the closure
  // returned by build_operator() only stores `this`; it does not touch the controller until it is
  // invoked (assumes op_factory_from_sub_fn does not invoke it during construction - confirm).
  base_t(base_t::op_factory_from_sub_fn(build_operator())),
  // Direct-initialise the member and move the by-value sink argument instead of copying the
  // std::function into a temporary controller.
  m_monitor_controller(description, unit, text_color, font_style, std::move(determine_count_fn))
{}
template <typename MessageT>
MonitorStage<MessageT>::subscribe_fn_t MonitorStage<MessageT>::build_operator()
{
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
return input.subscribe(rxcpp::make_observer<sink_type_t>(
[this, &output](sink_type_t msg) {
m_monitor_controller.progress_sink(msg);
output.on_next(std::move(msg));
},
[&](std::exception_ptr error_ptr) {
output.on_error(error_ptr);
},
[&]() {
m_monitor_controller.sink_on_completed();
output.on_completed();
}));
};
}
/****** MonitorStageInterfaceProxy ******************/
/**
 * @brief Factory wrapper that constructs a `MonitorStage<MessageT>` through an MRC segment builder.
 *
 * @tparam MessageT Message type handled by the created stage.
 */
template <typename MessageT>
struct MORPHEUS_EXPORT MonitorStageInterfaceProxy
{
    /**
     * @brief Create a `MonitorStage<MessageT>` inside `builder`.
     *
     * @param builder Segment builder that constructs and owns the new object.
     * @param name Name passed to `Builder::construct_object` for the new stage.
     * @param description Forwarded to the `MonitorStage` constructor.
     * @param unit Forwarded to the `MonitorStage` constructor (no default here, unlike the stage
     * constructor).
     * @param text_color Forwarded to the `MonitorStage` constructor; defaults to cyan.
     * @param font_style Forwarded to the `MonitorStage` constructor; defaults to bold.
     * @param determine_count_fn Optional per-message count function forwarded to the stage.
     * @return The object returned by `Builder::construct_object`.
     */
    static std::shared_ptr<mrc::segment::Object<MonitorStage<MessageT>>> init(
        mrc::segment::Builder& builder,
        const std::string& name,
        const std::string& description,
        const std::string& unit,
        indicators::Color text_color     = indicators::Color::cyan,
        indicators::FontStyle font_style = indicators::FontStyle::bold,
        std::optional<std::function<size_t(typename MonitorStage<MessageT>::sink_type_t)>> determine_count_fn =
            std::nullopt);
};
/**
 * @brief Construct a `MonitorStage<MessageT>` via `builder` and return the resulting segment object.
 *
 * @param builder Segment builder used to construct the stage.
 * @param name Name passed to `Builder::construct_object`.
 * @param description Forwarded to the `MonitorStage` constructor.
 * @param unit Forwarded to the `MonitorStage` constructor.
 * @param text_color Forwarded to the `MonitorStage` constructor.
 * @param font_style Forwarded to the `MonitorStage` constructor.
 * @param determine_count_fn Optional per-message count function; moved into the stage.
 * @return The object returned by `Builder::construct_object`.
 */
template <typename MessageT>
std::shared_ptr<mrc::segment::Object<MonitorStage<MessageT>>> MonitorStageInterfaceProxy<MessageT>::init(
    mrc::segment::Builder& builder,
    const std::string& name,
    const std::string& description,
    const std::string& unit,
    indicators::Color text_color,
    indicators::FontStyle font_style,
    std::optional<std::function<size_t(typename MonitorStage<MessageT>::sink_type_t)>> determine_count_fn)
{
    // determine_count_fn is a by-value sink parameter, so move it instead of copying the std::function.
    return builder.construct_object<MonitorStage<MessageT>>(
        name, description, unit, text_color, font_style, std::move(determine_count_fn));
}  // end of group
} // namespace morpheus