Program Listing for File serializers.cpp

Return to documentation for file (morpheus/_lib/src/io/serializers.cpp)

Copy
Copied!
            

#include "morpheus/io/serializers.hpp"

#include <cudf/io/csv.hpp>
#include <cudf/io/data_sink.hpp>
#include <cudf/io/types.hpp>  // for column_name_info, sink_info, table_metadata
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>
#include <pybind11/cast.h>
#include <pybind11/gil.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <rmm/mr/device/per_device_resource.hpp>

#include <array>    // for array
#include <cstddef>  // for size_t
#include <numeric>
#include <ostream>
#include <sstream>  // IWYU pragma: keep
#include <vector>
// IWYU pragma: no_include <unordered_map>

namespace morpheus {

namespace py = pybind11;
using namespace py::literals;
using namespace std::string_literals;

/**
 * @brief `cudf::io::data_sink` implementation that forwards every host write to a `std::ostream`.
 *
 * The sink only holds a reference to the stream; the caller must keep the stream alive for as long
 * as the sink is in use.
 */
class OStreamSink : public cudf::io::data_sink
{
  public:
    /**
     * @brief Wrap an existing output stream.
     *
     * `explicit` so that a `std::ostream&` is never silently converted into a sink.
     *
     * @param stream Stream that receives all bytes written through this sink.
     */
    explicit OStreamSink(std::ostream& stream) : m_stream(stream) {}

    /**
     * @brief Write `size` bytes starting at `data` to the wrapped stream.
     *
     * The running byte count is advanced by `size` regardless of the resulting stream state.
     *
     * @param data Pointer to the first byte to write.
     * @param size Number of bytes to write.
     */
    void host_write(void const* data, size_t size) override
    {
        // std::ostream::write takes a signed std::streamsize; make the conversion explicit.
        m_stream.write(static_cast<char const*>(data), static_cast<std::streamsize>(size));
        m_bytes_written += size;
    }

    /**
     * @brief Flush the wrapped stream.
     */
    void flush() override
    {
        m_stream.flush();
    }

    /**
     * @brief Total number of bytes passed to `host_write` so far.
     *
     * @return Accumulated byte count.
     */
    size_t bytes_written() override
    {
        return m_bytes_written;
    }

  private:
    std::ostream& m_stream;     // Non-owning reference to the destination stream
    size_t m_bytes_written{0};  // Sum of all `size` arguments seen by host_write
};

/**
 * @brief Serialize a table to a CSV string.
 *
 * @param tbl Table to serialize.
 * @param include_header When true, a header row with the column names is emitted.
 * @param include_index_col When true, the leading column of the view is included in the output.
 * @return The CSV text.
 */
std::string df_to_csv(const TableInfo& tbl, bool include_header, bool include_index_col)
{
    // Create an ostringstream and use that with the overload accepting an ostream
    std::ostringstream out_stream;
    df_to_csv(tbl, out_stream, include_header, include_index_col);
    return out_stream.str();
}

/**
 * @brief Serialize a table as CSV into an output stream using the cuDF CSV writer.
 *
 * Boolean values are written as `True` / `False`.
 *
 * @param tbl Table to serialize.
 * @param out_stream Destination stream.
 * @param include_header When true, a header row with the column names is emitted.
 * @param include_index_col When true, column 0 of the view is included and given an empty header
 * name; otherwise the selection starts at column 1.
 */
void df_to_csv(const TableInfo& tbl, std::ostream& out_stream, bool include_header, bool include_index_col)
{
    auto column_names = tbl.get_column_names();

    // By default skip column 0 of the view (presumably the index column -- confirm against TableInfo)
    cudf::size_type start_col = 1;
    if (include_index_col)
    {
        start_col = 0;
        column_names.insert(column_names.begin(), ""s);  // insert the id column
    }

    // Select one view column per entry in column_names, starting at start_col
    std::vector<cudf::size_type> col_indexes(column_names.size());
    std::iota(col_indexes.begin(), col_indexes.end(), start_col);
    auto tbl_view = tbl.get_view().select(col_indexes);

    OStreamSink sink(out_stream);
    auto destination     = cudf::io::sink_info(&sink);
    auto options_builder = cudf::io::csv_writer_options_builder(destination, tbl_view)
                               .include_header(include_header)
                               .true_value("True"s)
                               .false_value("False"s);

    // Declared outside the `if` because the builder is handed a pointer to it, so it has to stay
    // alive until write_csv has been called.
    cudf::io::table_metadata metadata{};
    if (include_header)
    {
        metadata.column_names = column_names;

        // After cuDF PR #11364, use schema_info instead of column_names (actually just set both)
        metadata.schema_info = std::vector<cudf::io::column_name_info>();
        metadata.schema_info.reserve(column_names.size());
        for (const auto& name : column_names)
        {
            metadata.schema_info.emplace_back(cudf::io::column_name_info{name});
        }

        options_builder = options_builder.metadata(&metadata);
    }

    cudf::io::write_csv(options_builder.build(), rmm::mr::get_current_device_resource());
}

/**
 * @brief Serialize a table to a JSON-lines string (`orient="records"`, `lines=true`).
 *
 * @param tbl Table to serialize.
 * @param include_index_col Forwarded to the Python `to_json` call as its `index` argument.
 * @return The JSON text.
 */
std::string df_to_json(const TableInfo& tbl, bool include_index_col)
{
    std::string results;

    // no cpp impl for to_json, instead python module converts to pandas and calls to_json
    {
        // All Python objects are created and destroyed inside this scope, while the GIL is held
        py::gil_scoped_acquire gil;
        py::object StringIO = py::module_::import("io").attr("StringIO");

        auto df     = tbl.as_py_object();
        auto buffer = StringIO();

        py::dict kwargs = py::dict("orient"_a = "records", "lines"_a = true, "index"_a = include_index_col);
        df.attr("to_json")(buffer, **kwargs);
        buffer.attr("seek")(0);

        py::object pyresults = buffer.attr("getvalue")();
        results              = pyresults.cast<std::string>();
    }

    return results;
}

/**
 * @brief Serialize a table as JSON lines into an output stream, then flush the stream.
 *
 * @param tbl Table to serialize.
 * @param out_stream Destination stream.
 * @param include_index_col Forwarded to the string-returning overload.
 */
void df_to_json(const TableInfo& tbl, std::ostream& out_stream, bool include_index_col)
{
    // Unlike df_to_csv, we use the ostream overload to call the string overload because there is no C++
    // implementation of to_json
    std::string output = df_to_json(tbl, include_index_col);

    // Now write the contents to the stream
    out_stream.write(output.data(), static_cast<std::streamsize>(output.size()));
    out_stream.flush();
}

}  // namespace morpheus

© Copyright 2023, NVIDIA. Last updated on Feb 3, 2023.