Program Listing for File deserializers.cpp

Return to documentation for file (morpheus/_lib/src/io/deserializers.cpp)

Copy
Copied!
            

/* * SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "morpheus/io/deserializers.hpp" #include "morpheus/utilities/cudf_util.hpp" // for CudfHelper #include "morpheus/utilities/stage_util.hpp" #include "morpheus/utilities/string_util.hpp" #include <cudf/column/column.hpp> #include <cudf/io/csv.hpp> #include <cudf/io/parquet.hpp> #include <cudf/scalar/scalar.hpp> // for string_scalar #include <cudf/strings/replace.hpp> #include <cudf/strings/strings_column_view.hpp> #include <cudf/table/table.hpp> // IWYU pragma: keep #include <cudf/types.hpp> // for cudf::type_id #include <ext/alloc_traits.h> #include <glog/logging.h> #include <pybind11/pybind11.h> // IWYU pragma: keep #include <algorithm> #include <cstddef> #include <memory> #include <regex> #include <sstream> #include <stdexcept> #include <utility> // We're already including pybind11.h, and including only gil.h as IWYU suggests yields an undefined symbol error // IWYU pragma: no_include namespace { const std::regex IndexRegex(R"(^\s*(unnamed: 0|id)\s*$)", std::regex_constants::ECMAScript | std::regex_constants::icase); const std::regex UnnamedRegex(R"(^\s*unnamed: 0\s*$)", std::regex_constants::ECMAScript | std::regex_constants::icase); } // namespace namespace morpheus { std::vector<std::string> get_column_names_from_table(const cudf::io::table_with_metadata& table) { DCHECK(!(!table.metadata.column_names.empty() && !table.metadata.schema_info.empty())) << "Both column_names and schema_info were set on the table_with_metadata object. Defaulting to column_names"; // If column_names is populated, use that if (!table.metadata.column_names.empty()) { return table.metadata.column_names; } // Otherwise, use schema_info if (!table.metadata.schema_info.empty()) { return foreach_map(table.metadata.schema_info, [](auto schema) { return schema.name; }); } // Return empty return {}; } cudf::io::table_with_metadata load_json_table(cudf::io::json_reader_options&& json_options) { auto tbl = cudf::io::read_json(json_options); auto column_names = get_column_names_from_table(tbl); auto found = std::find(column_names.begin(), column_names.end(), "data"); if (found == column_names.end()) return tbl; // Super ugly but cudf cant handle newlines and add extra escapes. So we need to convert // \\n -> \n // \\/ -> \/ auto columns = tbl.tbl->release(); size_t idx = found - column_names.begin(); auto updated_data = cudf::strings::replace( cudf::strings_column_view{columns[idx]->view()}, cudf::string_scalar("\\n"), cudf::string_scalar("\n")); updated_data = cudf::strings::replace( cudf::strings_column_view{updated_data->view()}, cudf::string_scalar("\\/"), cudf::string_scalar("/")); columns[idx] = std::move(updated_data); tbl.tbl = std::move(std::make_unique<cudf::table>(std::move(columns))); return tbl; } cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type) { if (file_type == FileTypes::Auto) { file_type = determine_file_type(filename); // throws if it is unable to determine the type } cudf::io::table_with_metadata table; switch (file_type) { case FileTypes::JSON: { auto options = cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(true); table = load_json_table(options.build()); break; } case FileTypes::CSV: { auto options = cudf::io::csv_reader_options::builder(cudf::io::source_info{filename}); table = cudf::io::read_csv(options.build()); break; } case FileTypes::PARQUET: { auto options = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filename}); table = cudf::io::read_parquet(options.build()); break; } case FileTypes::Auto: default: throw std::logic_error(MORPHEUS_CONCAT_STR("Unsupported filetype: " << file_type)); } if (!table.tbl) { throw std::runtime_error(MORPHEUS_CONCAT_STR("Failed to load file '" << filename << "' as type " << file_type)); } return table; } pybind11::object read_file_to_df(const std::string& filename, FileTypes file_type) { auto table = load_table_from_file(filename, file_type); int index_col_count = prepare_df_index(table); pybind11::gil_scoped_acquire gil; return CudfHelper::table_from_table_with_metadata(std::move(table), index_col_count); } int get_index_col_count(const cudf::io::table_with_metadata& data_table) { int index_col_count = 0; const auto& col_names = data_table.metadata.column_names; // Check if we have a first column with INT64 data type if (col_names.size() >= 1 && data_table.tbl->get_column(0).type().id() == cudf::type_id::INT64) { // Get the column name const auto& col_name = col_names[0]; // Check it against some common terms if (std::regex_search(col_name, IndexRegex)) { index_col_count = 1; } } return index_col_count; } int prepare_df_index(cudf::io::table_with_metadata& data_table) { const int index_col_count = get_index_col_count(data_table); if (index_col_count > 0) { auto& col_names = data_table.metadata.column_names; auto& col_name = col_names[0]; // Also, if its the hideous 'Unnamed: 0', then just use an empty string if (std::regex_search(col_name, UnnamedRegex)) { col_name.clear(); } } return index_col_count; } } // namespace morpheus

© Copyright 2023, NVIDIA. Last updated on Apr 11, 2023.