Program Listing for File preprocess_fil.cpp

Return to documentation for file (morpheus/_lib/src/stages/preprocess_fil.cpp)

Copy
Copied!
            

#include "morpheus/stages/preprocess_fil.hpp"

#include "morpheus/messages/memory/inference_memory_fil.hpp"
#include "morpheus/messages/meta.hpp"          // for MessageMeta
#include "morpheus/objects/dev_mem_info.hpp"   // for DevMemInfo
#include "morpheus/objects/table_info.hpp"     // for TableInfo
#include "morpheus/objects/tensor.hpp"
#include "morpheus/objects/tensor_object.hpp"  // for TensorIndex
#include "morpheus/utilities/matx_util.hpp"
#include "morpheus/utilities/type_util.hpp"
#include "morpheus/utilities/type_util_detail.hpp"

#include <cuda_runtime.h>               // for cudaMemcpy, cudaMemcpyDeviceToDevice
#include <cudf/column/column.hpp>       // for column, column::contents
#include <cudf/column/column_view.hpp>  // for column_view
#include <cudf/table/table_view.hpp>    // for table_view
#include <cudf/types.hpp>
#include <cudf/unary.hpp>
#include <mrc/cuda/common.hpp>  // for MRC_CHECK_CUDA
#include <mrc/segment/builder.hpp>
#include <pybind11/cast.h>  // for object_api::operator(), operator""_a
#include <pybind11/gil.h>
#include <pybind11/pybind11.h>  // for str_attr_accessor, arg
#include <pybind11/pytypes.h>
#include <pymrc/node.hpp>
#include <rmm/cuda_stream_view.hpp>  // for cuda_stream_per_thread
#include <rmm/device_buffer.hpp>     // for device_buffer

#include <array>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <memory>
#include <string>
#include <type_traits>  // for declval
#include <utility>
#include <vector>

namespace morpheus {
// Component public implementations
// ************ PreprocessFILStage ************************* //

/**
 * @brief Construct the FIL preprocessing stage.
 *
 * @param features Names of the feature columns to pull out of each incoming message. A copy is stored
 *                 in `m_fea_cols` and used for every message processed by the operator.
 */
PreprocessFILStage::PreprocessFILStage(const std::vector<std::string>& features) :
  PythonNode(base_t::op_factory_from_sub_fn(build_operator())),
  // `features` is a const reference and cannot be moved from; copy it explicitly instead of wrapping it
  // in a std::move that silently degrades to a copy anyway.
  m_fea_cols(features)
{}

/**
 * @brief Build the subscribe function that converts each incoming message into a MultiInferenceMessage.
 *
 * For every message `x`:
 *  1. The columns named in `m_fea_cols` are fetched via `x->get_meta`.
 *  2. Any of those columns typed as cudf STRING are rewritten in Python (under the GIL) using
 *     `str.extract` with the pattern `(\d+)` followed by `astype("float32")`, then the meta is re-fetched.
 *  3. Every feature column is cast to FLOAT32 if necessary and copied into a device buffer laid out one
 *     column after another (column i starts at offset i * num_rows).
 *  4. That buffer is transposed with `MatxUtil::transpose` and wrapped in a float tensor of shape
 *     {mess_count, num_features}; a uint32 seg-ids tensor of shape {mess_count, 3} is also created.
 *  5. Both tensors are packed into an InferenceMemoryFIL and emitted as a MultiInferenceMessage that
 *     shares `x->meta`, `x->mess_offset` and `x->mess_count`.
 *
 * Errors and completion from upstream are forwarded unchanged to the downstream subscriber.
 */
PreprocessFILStage::subscribe_fn_t PreprocessFILStage::build_operator()
{
    return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
        return input.subscribe(rxcpp::make_observer<sink_type_t>(
            [&output, this](sink_type_t x) {
                // TODO(MDD): Add some sort of lock here to prevent fixing columns after they have been accessed
                auto df_meta           = x->get_meta(m_fea_cols);
                auto df_meta_col_names = df_meta.get_column_names();

                // Staging buffer holding num_features runs of mess_count floats, one run per feature column.
                auto packed_data = std::make_shared<rmm::device_buffer>(
                    m_fea_cols.size() * x->mess_count * sizeof(float), rmm::cuda_stream_per_thread);

                std::vector<std::string> bad_cols;

                auto df_just_features = df_meta.get_view();

                for (size_t i = 0; i < df_meta.num_columns(); ++i)
                {
                    // The view's first num_indices() columns are skipped; feature i follows them.
                    if (df_just_features.column(df_meta.num_indices() + i).type().id() == cudf::type_id::STRING)
                    {
                        bad_cols.push_back(df_meta_col_names[i]);
                    }
                }

                // Need to ensure all string columns have been converted to numbers. This requires running a
                // regex which is too difficult to do from C++ at this time. So grab the GIL, make the
                // conversions, and release. This is horribly inefficient, but so is the JSON lines format for
                // this workflow
                if (!bad_cols.empty())
                {
                    using namespace pybind11::literals;
                    pybind11::gil_scoped_acquire gil;

                    // Declared after `gil` so it is destroyed (and its refcount dropped) while the GIL is held.
                    pybind11::object df = x->meta->get_py_table();

                    std::string regex = R"((\d+))";

                    for (const auto& c : bad_cols)
                    {
                        df[pybind11::str(c)] = df[pybind11::str(c)]
                                                   .attr("str")
                                                   .attr("extract")(pybind11::str(regex), "expand"_a = true)
                                                   .attr("astype")(pybind11::str("float32"));
                    }

                    // Now re-get the meta so the C++ view reflects the converted columns
                    df_meta          = x->get_meta(m_fea_cols);
                    df_just_features = df_meta.get_view();
                }

                for (size_t i = 0; i < df_meta.num_columns(); ++i)
                {
                    auto curr_col = df_just_features.column(df_meta.num_indices() + i);

                    // NOTE(review): the buffer was sized with x->mess_count but is filled using num_rows();
                    // this assumes the two are equal for the message slice -- confirm against get_meta.
                    auto curr_ptr = static_cast<float*>(packed_data->data()) + i * df_just_features.num_rows();

                    // Check if we are something other than float
                    if (curr_col.type().id() != cudf::type_id::FLOAT32)
                    {
                        auto float_data = cudf::cast(curr_col, cudf::data_type(cudf::type_id::FLOAT32))->release();

                        // Do the copy here before `float_data` goes out of scope and frees its device buffer
                        MRC_CHECK_CUDA(cudaMemcpy(curr_ptr,
                                                  float_data.data->data(),
                                                  df_just_features.num_rows() * sizeof(float),
                                                  cudaMemcpyDeviceToDevice));
                    }
                    else
                    {
                        MRC_CHECK_CUDA(cudaMemcpy(curr_ptr,
                                                  curr_col.data<float>(),
                                                  df_just_features.num_rows() * sizeof(float),
                                                  cudaMemcpyDeviceToDevice));
                    }
                }

                // Need to do a transpose here: the staging buffer is described as a {mess_count, num_features}
                // matrix with strides {1, mess_count}.
                auto transposed_data = MatxUtil::transpose(
                    DevMemInfo{packed_data, TypeId::FLOAT32, {x->mess_count, m_fea_cols.size()}, {1, x->mess_count}});

                // Use TensorIndex consistently for every dimension (the feature count was previously routed
                // through `int`, which could truncate).
                auto input__0 = Tensor::create(transposed_data,
                                               DType::create<float>(),
                                               std::vector<TensorIndex>{static_cast<TensorIndex>(x->mess_count),
                                                                        static_cast<TensorIndex>(m_fea_cols.size())},
                                               std::vector<TensorIndex>{},
                                               0);

                auto seg_ids =
                    Tensor::create(MatxUtil::create_seg_ids(x->mess_count, m_fea_cols.size(), TypeId::UINT32),
                                   DType::create<uint32_t>(),
                                   std::vector<TensorIndex>{static_cast<TensorIndex>(x->mess_count),
                                                            static_cast<TensorIndex>(3)},
                                   std::vector<TensorIndex>{},
                                   0);

                // Build the results
                auto memory = std::make_shared<InferenceMemoryFIL>(x->mess_count, input__0, seg_ids);

                // Read the count before `memory` is moved into the message. Reading it in the same argument
                // list as std::move(memory) only worked because make_shared forwards references; it is flagged
                // by bugprone-use-after-move and breaks under small refactors.
                const auto memory_count = memory->count;

                auto next = std::make_shared<MultiInferenceMessage>(
                    x->meta, x->mess_offset, x->mess_count, std::move(memory), 0, memory_count);

                output.on_next(std::move(next));
            },
            [&](std::exception_ptr error_ptr) { output.on_error(error_ptr); },
            [&]() { output.on_completed(); }));
    };
}

// ************ PreprocessFILStageInterfaceProxy *********** //

/**
 * @brief Create a PreprocessFILStage inside a segment.
 *
 * @param builder  Segment builder used to construct the stage object.
 * @param name     Name to register the stage under.
 * @param features Feature column names forwarded to the PreprocessFILStage constructor.
 * @return The segment object wrapping the newly constructed stage.
 */
std::shared_ptr<mrc::segment::Object<PreprocessFILStage>> PreprocessFILStageInterfaceProxy::init(
    mrc::segment::Builder& builder, const std::string& name, const std::vector<std::string>& features)
{
    auto stage = builder.construct_object<PreprocessFILStage>(name, features);

    return stage;
}
}  // namespace morpheus

© Copyright 2023, NVIDIA. Last updated on Feb 3, 2023.