Program Listing for File preprocess_fil.cpp#

Return to documentation for file (python/morpheus/morpheus/_lib/src/stages/preprocess_fil.cpp)

/*
 * SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION &
 * AFFILIATES. All rights reserved. SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "morpheus/stages/preprocess_fil.hpp"

#include "mrc/segment/object.hpp"  // for Object

#include "morpheus/messages/control.hpp"               // for ControlMessage
#include "morpheus/messages/memory/tensor_memory.hpp"  // for TensorMemory
#include "morpheus/messages/meta.hpp"                  // for MessageMeta
#include "morpheus/objects/dev_mem_info.hpp"           // for DevMemInfo
#include "morpheus/objects/dtype.hpp"                  // for DType, TypeId
#include "morpheus/objects/table_info.hpp"             // for TableInfo, MutableTableInfo
#include "morpheus/objects/tensor.hpp"                 // for Tensor
#include "morpheus/objects/tensor_object.hpp"          // for TensorObject
#include "morpheus/types.hpp"                          // for TensorIndex
#include "morpheus/utilities/matx_util.hpp"            // for MatxUtil

#include <cuda_runtime.h>               // for cudaMemcpy, cudaMemcpyKind
#include <cudf/column/column.hpp>       // for column
#include <cudf/column/column_view.hpp>  // for column_view
#include <cudf/types.hpp>               // for type_id, data_type
#include <cudf/unary.hpp>               // for cast
#include <mrc/cuda/common.hpp>          // for MRC_CHECK_CUDA
#include <mrc/segment/builder.hpp>      // for Builder
#include <pybind11/gil.h>               // for gil_scoped_acquire
#include <pybind11/pybind11.h>          // for object_api::operator(), operator""_a, arg
#include <pybind11/pytypes.h>           // for object, str, object_api, generic_item, literals
#include <rmm/cuda_stream_view.hpp>     // for cuda_stream_per_thread
#include <rmm/device_buffer.hpp>        // for device_buffer

#include <algorithm>  // for find
#include <cstddef>    // for size_t
#include <memory>     // for shared_ptr, allocator, __shared_ptr_access, make_shared
#include <utility>    // for move

namespace morpheus {
// Component public implementations
// ************ PreprocessFILStage ************************* //
PreprocessFILStage::PreprocessFILStage(const std::vector<std::string>& features) :
  base_t(rxcpp::operators::map([this](sink_type_t msg) {
      return this->on_data(std::move(msg));
  })),
  m_fea_cols(std::move(features))
{}

TableInfo PreprocessFILStage::fix_bad_columns(sink_type_t msg)
{
    {
        // Get the mutable info for the entire meta object so we only do this once
        // per dataframe
        auto mutable_info      = msg->payload()->get_mutable_info();
        auto df_meta_col_names = mutable_info.get_column_names();

        std::vector<std::string> bad_cols;

        // Only check the feature columns. Leave the rest unchanged
        for (auto& fea_col : m_fea_cols)
        {
            // Find the index of the column in the dataframe
            auto col_idx =
                std::find(df_meta_col_names.begin(), df_meta_col_names.end(), fea_col) - df_meta_col_names.begin();

            if (col_idx == df_meta_col_names.size())
            {
                // This feature was not found. Ignore it.
                continue;
            }

            if (mutable_info.get_column(col_idx).type().id() == cudf::type_id::STRING)
            {
                bad_cols.push_back(fea_col);
            }
        }

        // Exit early if there is nothing to do
        if (!bad_cols.empty())
        {
            // Need to ensure all string columns have been converted to numbers. This
            // requires running a regex which is too difficult to do from C++ at this
            // time. So grab the GIL, make the conversions, and release. This is
            // horribly inefficient, but so is the JSON lines format for this workflow
            using namespace pybind11::literals;
            pybind11::gil_scoped_acquire gil;

            // pybind11::object df = x->meta->get_py_table();
            auto pdf = mutable_info.checkout_obj();
            auto& df = *pdf;

            std::string regex = R"((\d+))";

            for (auto c : bad_cols)
            {
                df[pybind11::str(c)] = df[pybind11::str(c)]
                                           .attr("str")
                                           .attr("extract")(pybind11::str(regex), "expand"_a = true)
                                           .attr("astype")(pybind11::str("float32"));
            }

            mutable_info.return_obj(std::move(pdf));
        }
    }

    // Now re-get the meta
    return msg->payload()->get_info(m_fea_cols);
}

PreprocessFILStage::source_type_t PreprocessFILStage::on_data(sink_type_t msg)
{
    auto df_meta        = this->fix_bad_columns(msg);
    const auto num_rows = df_meta.num_rows();

    auto packed_data =
        std::make_shared<rmm::device_buffer>(m_fea_cols.size() * num_rows * sizeof(float), rmm::cuda_stream_per_thread);

    for (size_t i = 0; i < df_meta.num_columns(); ++i)
    {
        auto curr_col = df_meta.get_column(i);
        auto curr_ptr = static_cast<float*>(packed_data->data()) + i * num_rows;

        // Check if we are something other than float
        if (curr_col.type().id() != cudf::type_id::FLOAT32)
        {
            auto float_data = cudf::cast(curr_col, cudf::data_type(cudf::type_id::FLOAT32))->release();

            // Do the copy here before it goes out of scope
            MRC_CHECK_CUDA(
                cudaMemcpy(curr_ptr, float_data.data->data(), num_rows * sizeof(float), cudaMemcpyDeviceToDevice));
        }
        else
        {
            MRC_CHECK_CUDA(cudaMemcpy(
                curr_ptr, curr_col.template data<float>(), num_rows * sizeof(float), cudaMemcpyDeviceToDevice));
        }
    }

    // Need to convert from row major to column major
    // Easiest way to do this is to transpose the data from [fea_len, row_count]
    // to [row_count, fea_len]
    auto transposed_data = MatxUtil::transpose(DevMemInfo{
        packed_data, TypeId::FLOAT32, {static_cast<TensorIndex>(m_fea_cols.size()), num_rows}, {num_rows, 1}});

    // Create the tensor which will be row-major and size [row_count, fea_len]
    auto input__0 = Tensor::create(
        transposed_data, DType::create<float>(), {num_rows, static_cast<TensorIndex>(m_fea_cols.size())}, {}, 0);

    auto seq_id_dtype = DType::create<TensorIndex>();
    auto seq_ids      = Tensor::create(
        MatxUtil::create_seq_ids(num_rows, m_fea_cols.size(), seq_id_dtype.type_id(), input__0.get_memory(), 0),
        seq_id_dtype,
        {num_rows, 3},
        {},
        0);

    // Build the results
    auto memory = std::make_shared<TensorMemory>(num_rows);
    memory->set_tensor("input__0", std::move(input__0));
    memory->set_tensor("seq_ids", std::move(seq_ids));
    msg->tensors(memory);

    return msg;
}

// ************ PreprocessFILStageInterfaceProxy *********** //
std::shared_ptr<mrc::segment::Object<PreprocessFILStage>> PreprocessFILStageInterfaceProxy::init(
    mrc::segment::Builder& builder, const std::string& name, const std::vector<std::string>& features)
{
    auto stage = builder.construct_object<PreprocessFILStage>(name, features);

    return stage;
}
}  // namespace morpheus