// Source file: morpheus/_lib/src/io/serializers.cpp
#include "morpheus/io/serializers.hpp"
#include <cudf/io/csv.hpp>
#include <cudf/io/data_sink.hpp>
#include <cudf/io/types.hpp>// for column_name_info, sink_info, table_metadata
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <pybind11/cast.h>
#include <pybind11/gil.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <rmm/mr/device/per_device_resource.hpp>
#include <array>// for array
#include <cstddef>// for size_t
#include <numeric>
#include <ostream>
#include <sstream>// IWYU pragma: keep
#include <vector>
// IWYU pragma: no_include <unordered_map>
namespace morpheus {
namespace py = pybind11;
using namespace py::literals;
using namespace std::string_literals;
class OStreamSink : public cudf::io::data_sink
{
public:
OStreamSink(std::ostream& stream) : m_stream(stream) {}
void host_write(void const* data, size_t size) override
{
m_stream.write(static_cast<char const*>(data), size);
m_bytest_written += size;
}
void flush() override
{
m_stream.flush();
}
size_t bytes_written() override
{
return m_bytest_written;
}
private:
std::ostream& m_stream;
size_t m_bytest_written{0};
};
/**
 * @brief Serialize a table to CSV and return the text.
 *
 * Thin wrapper over the std::ostream overload: output is rendered into an in-memory buffer whose
 * contents are returned.
 *
 * @param tbl Table to serialize.
 * @param include_header Whether a header row is written.
 * @param include_index_col Whether the index column is included.
 * @return The CSV text.
 */
std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col)
{
    std::ostringstream csv_buffer;
    df_to_csv(tbl, csv_buffer, include_header, include_index_col);
    return csv_buffer.str();
}
void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col)
{
auto column_names = tbl.get_column_names();
cudf::size_type start_col = 1;
if (include_index_col)
{
start_col = 0;
column_names.insert(column_names.begin(), ""s); // insert the id column
}
std::vector<cudf::size_type> col_idexes(column_names.size());
std::iota(col_idexes.begin(), col_idexes.end(), start_col);
auto tbl_view = tbl.get_view().select(col_idexes);
OStreamSink sink(out_stream);
auto destination = cudf::io::sink_info(&sink);
auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl_view)
.include_header(include_header)
.true_value("True"s)
.false_value("False"s);
cudf::io::table_metadata metadata{};
if (include_header)
{
metadata.column_names = column_names;
// After cuDF PR #11364, use schema_info instead of column_names (actually just set both)
metadata.schema_info = std::vector<cudf::io::column_name_info>();
for (auto& name : column_names)
{
metadata.schema_info.emplace_back(cudf::io::column_name_info{name});
}
options_builder = options_builder.metadata(&metadata);
}
cudf::io::write_csv(options_builder.build(), rmm::mr::get_current_device_resource());
}
std::string df_to_json(const TableInfo& tbl, bool include_index_col)
{
std::string results;
// no cpp impl for to_json, instead python module converts to pandas and calls to_json
{
py::gil_scoped_acquire gil;
py::object StringIO = py::module_::import("io").attr("StringIO");
auto df = tbl.as_py_object();
auto buffer = StringIO();
py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true, "index"_a = include_index_col);
df.attr("to_json")(buffer, **kwargs);
buffer.attr("seek")(0);
py::object pyresults = buffer.attr("getvalue")();
results = pyresults.cast<std::string>();
}
return results;
}
/**
 * @brief Serialize a table to line-delimited JSON records and write the bytes to `out_stream`.
 *
 * The JSON is produced on the Python side, so unlike df_to_csv this overload builds the full string
 * with the string-returning overload first, then copies it to the stream and flushes.
 *
 * @param tbl Table to serialize.
 * @param out_stream Stream that receives the JSON bytes.
 * @param include_index_col Forwarded to the string overload.
 */
void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col)
{
    const std::string json_text = df_to_json(tbl, include_index_col);

    // Unformatted write: the stream's width/fill settings are ignored.
    out_stream.write(json_text.c_str(), static_cast<std::streamsize>(json_text.size()));
    out_stream.flush();
}
} // namespace morpheus