Program Listing for File deserializers.cpp#

Return to documentation for file (python/morpheus/morpheus/_lib/src/io/deserializers.cpp)

/*
 * SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "morpheus/io/deserializers.hpp"

#include "morpheus/utilities/cudf_util.hpp"  // for CudfHelper
#include "morpheus/utilities/stage_util.hpp"
#include "morpheus/utilities/string_util.hpp"
#include "morpheus/utilities/table_util.hpp"  // for get_column_names

#include <cudf/column/column.hpp>
#include <cudf/io/csv.hpp>
#include <cudf/io/json.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>  // IWYU pragma: keep
#include <cudf/types.hpp>        // for cudf::type_id
#include <pybind11/pybind11.h>   // IWYU pragma: keep

#include <memory>
#include <regex>
#include <sstream>
#include <stdexcept>
#include <utility>
// We're already including pybind11.h, and including only gil.h as IWYU suggests yields an undefined symbol error
// IWYU pragma: no_include <pybind11/gil.h>

namespace {
const std::regex INDEX_REGEX(R"(^\s*(unnamed: 0|id)\s*$)",
                             std::regex_constants::ECMAScript | std::regex_constants::icase);

const std::regex UNNAMED_REGEX(R"(^\s*unnamed: 0\s*$)", std::regex_constants::ECMAScript | std::regex_constants::icase);
}  // namespace

namespace morpheus {

std::vector<std::string> get_column_names_from_table(const cudf::io::table_with_metadata& table)
{
    return foreach_map(table.metadata.schema_info, [](auto schema) {
        return schema.name;
    });
}

cudf::io::table_with_metadata load_table_from_file(const std::string& filename,
                                                   FileTypes file_type,
                                                   std::optional<bool> json_lines)
{
    if (file_type == FileTypes::Auto)
    {
        file_type = determine_file_type(filename);  // throws if it is unable to determine the type
    }

    cudf::io::table_with_metadata table;

    switch (file_type)
    {
    case FileTypes::JSON: {
        auto options =
            cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(json_lines.value_or(true));
        table = cudf::io::read_json(options.build());
        break;
    }
    case FileTypes::CSV: {
        auto options = cudf::io::csv_reader_options::builder(cudf::io::source_info{filename});
        table        = cudf::io::read_csv(options.build());
        break;
    }
    case FileTypes::PARQUET: {
        auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filename});
        table        = cudf::io::read_parquet(options.build());
        break;
    }
    case FileTypes::Auto:
    default:
        throw std::logic_error(MORPHEUS_CONCAT_STR("Unsupported filetype: " << file_type));
    }

    if (!table.tbl)
    {
        throw std::runtime_error(MORPHEUS_CONCAT_STR("Failed to load file '" << filename << "' as type " << file_type));
    }

    return table;
}

pybind11::object read_file_to_df(const std::string& filename, FileTypes file_type)
{
    CudfHelper::load();
    auto table          = load_table_from_file(filename, file_type);
    int index_col_count = prepare_df_index(table);

    pybind11::gil_scoped_acquire gil;
    return CudfHelper::table_from_table_with_metadata(std::move(table), index_col_count);
}

int get_index_col_count(const cudf::io::table_with_metadata& data_table)
{
    int index_col_count = 0;

    std::vector<std::string> names = CuDFTableUtil::get_column_names(data_table);

    // Check if we have a first column with INT64 data type
    if (names.size() >= 1 && data_table.tbl->get_column(0).type().id() == cudf::type_id::INT64)
    {
        // Get the column name
        const auto& col_name = names[0];

        // Check it against some common terms
        if (std::regex_search(col_name, INDEX_REGEX))
        {
            index_col_count = 1;
        }
    }

    return index_col_count;
}

int prepare_df_index(cudf::io::table_with_metadata& data_table)
{
    const int index_col_count = get_index_col_count(data_table);

    if (index_col_count > 0)
    {
        auto& col_name = data_table.metadata.schema_info[0].name;

        // Also, if its the hideous 'Unnamed: 0', then just use an empty string
        if (std::regex_search(col_name, UNNAMED_REGEX))
        {
            col_name.clear();
        }
    }

    return index_col_count;
}

}  // namespace morpheus