Program Listing for File serializers.cpp#
↰ Return to documentation for file (python/morpheus/morpheus/_lib/src/io/serializers.cpp)
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "morpheus/io/serializers.hpp"
#include "morpheus/objects/data_table.hpp" // for IDataTable
#include "morpheus/objects/file_types.hpp"
#include "morpheus/objects/table_info_data.hpp" // for TableInfoData
#include "morpheus/utilities/cudf_util.hpp"
#include <cudf/column/column_view.hpp> // for column_view
#include <cudf/io/csv.hpp>
#include <cudf/io/data_sink.hpp>
#include <cudf/io/json.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp> // for column_name_info, sink_info, table_metadata
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <cudf/utilities/traits.hpp> // for is_nested
#include <glog/logging.h>
#include <pybind11/gil.h> // for PyGILState_Check, gil_scoped_acquire
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <pybind11/stl.h> // IWYU pragma: keep
#include <cstddef> // for size_t
#include <fstream>
#include <memory> // for shared_ptr
#include <numeric>
#include <sstream> // IWYU pragma: keep
#include <vector>
// IWYU pragma: no_include <unordered_map>
namespace py = pybind11;
using namespace py::literals;
using namespace std::string_literals;
namespace {
cudf::io::column_name_info make_column_name_info(std::string name, const py::object& py_col)
{
// construct a column_name_info from a python column object, loosely based on the _dtype_to_names_list
// method in cudf's `python/cudf/cudf/io/json.py`
DCHECK(PyGILState_Check() != 0);
auto dtypes_mod = py::module_::import("cudf.core.dtypes");
auto StructDtype = dtypes_mod.attr("StructDtype");
auto ListDtype = dtypes_mod.attr("ListDtype");
const auto& py_dtype = py_col.attr("dtype");
bool is_struct_col = py::isinstance(py_dtype, StructDtype);
bool is_list_col = py::isinstance(py_dtype, ListDtype);
py::list fields;
if (is_struct_col)
{
// Attribute only exists on StructDtype
fields = py::list(py_col.attr("dtype").attr("fields").attr("keys")());
}
std::vector<cudf::io::column_name_info> children;
if (is_struct_col || is_list_col)
{
auto py_children = py_col.attr("children");
std::size_t i = 0;
for (auto& child : py_children)
{
// child is a handle
const auto& py_child = child.cast<py::object>();
std::string child_name{};
if (is_struct_col)
{
child_name = fields[i].cast<std::string>();
}
children.emplace_back(make_column_name_info(child_name, py_child));
++i;
}
}
cudf::io::column_name_info col_info{std::move(name)};
col_info.children = std::move(children);
return col_info;
}
cudf::io::table_metadata build_cudf_metadata(const cudf::table_view& tbl_view,
const std::vector<std::string>& column_names,
const py::object& df)
{
std::vector<std::size_t> nested_col_indicies;
std::vector<cudf::io::column_name_info> column_name_infos(column_names.size());
for (std::size_t i = 0; i < column_names.size(); ++i)
{
if (!cudf::is_nested(tbl_view.column(i).type()))
{
column_name_infos[i] = column_names[i];
}
else
{
nested_col_indicies.push_back(i);
}
}
// If we have a struct column, we need to grab the GIL and inspect the children
// Remove once https://github.com/rapidsai/cudf/issues/19215 is resolved
if (!nested_col_indicies.empty())
{
pybind11::gil_scoped_acquire gil;
// we need the column objects not the series objects
const pybind11::tuple& df_columns = df.attr("_columns");
const auto num_df_cols = py::len(df_columns);
// When the index is included in the output, the index doesn't appear in the DataFrame's _columns
DCHECK(num_df_cols == column_names.size() || num_df_cols + 1 == column_names.size())
<< "Number of columns in DataFrame does not match number of column names provided";
const auto col_idx_offset = column_names.size() - num_df_cols;
for (const auto col_idx : nested_col_indicies)
{
const auto& py_col = df_columns[col_idx - col_idx_offset];
column_name_infos[col_idx] = make_column_name_info(column_names[col_idx], py_col);
}
}
return cudf::io::table_metadata{std::move(column_name_infos)};
}
} // namespace
namespace morpheus {
class OStreamSink : public cudf::io::data_sink
{
public:
OStreamSink(std::ostream& stream) : m_stream(stream) {}
void host_write(void const* data, size_t size) override
{
m_stream.write(static_cast<char const*>(data), size);
m_bytest_written += size;
}
void flush() override
{
m_stream.flush();
}
size_t bytes_written() override
{
return m_bytest_written;
}
private:
std::ostream& m_stream;
size_t m_bytest_written{0};
};
void table_to_csv(
const TableInfoData& tbl, std::ostream& out_stream, bool include_header, bool include_index_col, bool flush)
{
auto column_names = tbl.column_names;
cudf::size_type start_col = 1;
if (include_index_col)
{
start_col = 0;
column_names.insert(column_names.begin(), ""s); // insert the id column
}
std::vector<cudf::size_type> col_idexes(column_names.size());
std::iota(col_idexes.begin(), col_idexes.end(), start_col);
auto tbl_view = tbl.table_view.select(col_idexes);
OStreamSink sink(out_stream);
auto destination = cudf::io::sink_info(&sink);
auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl_view)
.include_header(include_header)
.true_value("True"s)
.false_value("False"s);
if (include_header)
{
options_builder = options_builder.names(column_names);
}
cudf::io::write_csv(options_builder.build());
if (flush)
{
sink.flush();
}
}
void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col, bool flush)
{
table_to_csv(tbl.get_data(), out_stream, include_header, include_index_col, flush);
}
std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col)
{
// Create an ostringstream and use that with the overload accepting an ostream
std::ostringstream out_stream;
df_to_csv(tbl, out_stream, include_header, include_index_col);
return out_stream.str();
}
void table_to_json(
const TableInfoData& tbl, const py::object& df, std::ostream& out_stream, bool include_index_col, bool flush)
{
if (!include_index_col)
{
LOG(WARNING) << "Ignoring include_index_col=false as this isn't supported by cuDF";
}
std::vector<cudf::size_type> col_idexes(tbl.column_names.size());
std::iota(col_idexes.begin(), col_idexes.end(), 1);
auto tbl_view = tbl.table_view.select(col_idexes);
auto tbl_meta = build_cudf_metadata(tbl_view, tbl.column_names, df);
OStreamSink sink(out_stream);
auto destination = cudf::io::sink_info(&sink);
auto options_builder = cudf::io::json_writer_options_builder(destination, tbl_view)
.metadata(std::move(tbl_meta))
.lines(true)
.include_nulls(true)
.na_rep("null");
cudf::io::write_json(options_builder.build());
if (flush)
{
sink.flush();
}
}
void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col, bool flush)
{
table_to_json(tbl.get_data(), tbl.get_parent()->get_py_object(), out_stream, include_index_col, flush);
}
std::string df_to_json(const TableInfo& tbl, bool include_index_col)
{
// Create an ostringstream and use that with the overload accepting an ostream
std::ostringstream out_stream;
df_to_json(tbl, out_stream, include_index_col);
return out_stream.str();
}
void table_to_parquet(const TableInfoData& tbl,
const py::object& df,
std::ostream& out_stream,
bool include_header,
bool include_index_col,
bool flush)
{
auto column_names = tbl.column_names;
cudf::size_type start_col = 1;
if (include_index_col)
{
start_col = 0;
column_names.insert(column_names.begin(), ""s); // insert the id column
}
std::vector<cudf::size_type> col_idexes(column_names.size());
std::iota(col_idexes.begin(), col_idexes.end(), start_col);
auto tbl_view = tbl.table_view.select(col_idexes);
cudf::io::table_input_metadata tbl_meta(build_cudf_metadata(tbl_view, column_names, df));
OStreamSink sink(out_stream);
auto destination = cudf::io::sink_info(&sink);
auto options_builder =
cudf::io::parquet_writer_options_builder(destination, tbl_view).metadata(std::move(tbl_meta));
cudf::io::write_parquet(options_builder.build());
if (flush)
{
sink.flush();
}
}
void df_to_parquet(
const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col, bool flush)
{
table_to_parquet(
tbl.get_data(), tbl.get_parent()->get_py_object(), out_stream, include_header, include_index_col, flush);
}
std::string df_to_parquet(const TableInfo& tbl, bool include_header, bool include_index_col)
{
// Create an ostringstream and use that with the overload accepting an ostream
std::ostringstream out_stream;
df_to_parquet(tbl, out_stream, include_header, include_index_col);
return out_stream.str();
}
template <typename T>
T get_with_default(const py::dict& d, const std::string& key, T default_value)
{
if (d.contains(key))
{
return d[key.c_str()].cast<T>();
}
return default_value;
}
void SerializersProxy::write_df_to_file(pybind11::object df,
std::string filename,
FileTypes file_type,
const py::kwargs& kwargs)
{
CudfHelper::load();
if (file_type == FileTypes::Auto)
{
file_type = determine_file_type(filename); // throws if it is unable to determine the type
}
std::ofstream out_file;
out_file.open(filename);
auto tbl = CudfHelper::CudfHelper::table_info_data_from_table(df);
switch (file_type)
{
case FileTypes::JSON: {
table_to_json(tbl,
df,
out_file,
get_with_default(kwargs, "include_index_col", true),
get_with_default(kwargs, "flush", false));
break;
}
case FileTypes::CSV: {
table_to_csv(tbl,
out_file,
get_with_default(kwargs, "include_header", true),
get_with_default(kwargs, "include_index_col", true),
get_with_default(kwargs, "flush", false));
break;
}
case FileTypes::PARQUET: {
table_to_parquet(tbl,
df,
out_file,
get_with_default(kwargs, "include_header", true),
get_with_default(kwargs, "include_index_col", true),
get_with_default(kwargs, "flush", false));
break;
}
case FileTypes::Auto:
default:
throw std::logic_error(MORPHEUS_CONCAT_STR("Unsupported filetype: " << file_type));
}
}
} // namespace morpheus