Program Listing for File preallocate.hpp

Return to documentation for file (morpheus/_lib/include/morpheus/stages/preallocate.hpp)


/* * SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include "morpheus/messages/meta.hpp" #include "morpheus/messages/multi.hpp" #include "morpheus/objects/dtype.hpp" // for TypeId #include <mrc/node/sink_properties.hpp> #include <mrc/node/source_properties.hpp> #include <mrc/segment/builder.hpp> #include <mrc/segment/object.hpp> #include <pymrc/node.hpp> #include <rxcpp/rx.hpp> #include <memory> #include <string> #include <tuple> #include <vector> namespace morpheus { #pragma GCC visibility push(default) namespace { void preallocate(std::shared_ptr<morpheus::MessageMeta> msg, const std::vector<std::tuple<std::string, morpheus::DType>>& columns) { auto table = msg->get_mutable_info(); table.insert_missing_columns(columns); } void preallocate(std::shared_ptr<morpheus::MultiMessage> msg, const std::vector<std::tuple<std::string, morpheus::DType>>& columns) { preallocate(msg->meta, columns); } } // namespace /****** Component public implementations *******************/ /****** PreallocateStage ********************************/ /* Preallocates new columns into the underlying dataframe. This stage supports both MessageMeta & subclasses of * MultiMessage. In the Python bindings the stage is bound as `PreallocateMessageMetaStage` and * `PreallocateMultiMessageStage` */ template <typename MessageT> class PreallocateStage : public mrc::pymrc::PythonNode<std::shared_ptr<MessageT>, std::shared_ptr<MessageT>> { public: using base_t = mrc::pymrc::PythonNode<std::shared_ptr<MessageT>, std::shared_ptr<MessageT>>; using typename base_t::sink_type_t; using typename base_t::source_type_t; using typename base_t::subscribe_fn_t; PreallocateStage(const std::vector<std::tuple<std::string, TypeId>>& needed_columns); private: subscribe_fn_t build_operator(); std::vector<std::tuple<std::string, DType>> m_needed_columns; }; /****** DeserializationStageInterfaceProxy******************/ template <typename MessageT> struct PreallocateStageInterfaceProxy { static std::shared_ptr<mrc::segment::Object<PreallocateStage<MessageT>>> init( mrc::segment::Builder& builder, const std::string& name, std::vector<std::tuple<std::string, TypeId>> needed_columns); }; template <typename MessageT> PreallocateStage<MessageT>::PreallocateStage(const std::vector<std::tuple<std::string, TypeId>>& needed_columns) : base_t(base_t::op_factory_from_sub_fn(build_operator())) { for (const auto& col : needed_columns) { m_needed_columns.emplace_back(std::make_tuple<>(std::get<0>(col), DType(std::get<1>(col)))); } } template <typename MessageT> typename PreallocateStage<MessageT>::subscribe_fn_t PreallocateStage<MessageT>::build_operator() { return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) { return input.subscribe(rxcpp::make_observer<sink_type_t>( [this, &output](sink_type_t x) { // Since the msg was just emitted from the source we shouldn't have any trouble acquiring the mutex. preallocate(x, m_needed_columns); output.on_next(std::move(x)); }, [&](std::exception_ptr error_ptr) { output.on_error(error_ptr); }, [&]() { output.on_completed(); })); }; } template <typename MessageT> std::shared_ptr<mrc::segment::Object<PreallocateStage<MessageT>>> PreallocateStageInterfaceProxy<MessageT>::init( mrc::segment::Builder& builder, const std::string& name, std::vector<std::tuple<std::string, TypeId>> needed_columns) { return builder.construct_object<PreallocateStage<MessageT>>(name, needed_columns); } #pragma GCC visibility pop } // namespace morpheus

© Copyright 2023, NVIDIA. Last updated on Feb 2, 2024.