// Source listing for file morpheus/_lib/src/messages/multi.cpp
#include "morpheus/messages/multi.hpp"
#include "morpheus/messages/meta.hpp"
#include "morpheus/objects/table_info.hpp"
#include "morpheus/objects/tensor_object.hpp"
#include "morpheus/utilities/type_util.hpp"
#include "morpheus/utilities/type_util_detail.hpp"// for TypeId, DataType
#include <cuda_runtime.h>// for cudaMemcpy, cudaMemcpy2D, cudaMemcpyDeviceToDevice
#include <cudf/column/column_view.hpp>// for column_view
#include <cudf/concatenate.hpp>
#include <cudf/copying.hpp>
#include <cudf/io/types.hpp>
#include <cudf/types.hpp>
#include <mrc/cuda/common.hpp>// for MRC_CHECK_CUDA
#include <pybind11/cast.h>
#include <pybind11/pybind11.h>
#include <rmm/mr/device/per_device_resource.hpp>// for get_current_device_resource
#include <algorithm>// for transform
#include <array>// needed for pybind11::make_tuple
#include <cstdint>// for uint8_t
#include <memory>
#include <string>
#include <utility>
#include <vector>
// IWYU pragma: no_include <unordered_map>
namespace morpheus {
/****** Component public implementations *******************/
/****** MultiMessage****************************************/
// Constructs a MultiMessage over the shared MessageMeta `m`.
//   m - metadata object shared with other messages; ownership is taken by moving the shared_ptr
//   o - stored as mess_offset (start of this message's window into the metadata)
//   c - stored as mess_count (number of entries in this message's window)
// No validation of `o`/`c` against the size of `m` is performed here.
MultiMessage::MultiMessage(std::shared_ptr<morpheus::MessageMeta> m, size_t o, size_t c) :
meta(std::move(m)),
mess_offset(o),
mess_count(c)
{}
// Returns the TableInfo for this message's window with every column selected.
TableInfo MultiMessage::get_meta()
{
    // An empty name list is treated by the vector overload as "all columns"
    return this->get_meta(std::vector<std::string>{});
}
// Returns the TableInfo for this message's window restricted to the single column `col_name`.
TableInfo MultiMessage::get_meta(const std::string &col_name)
{
    // Wrap the name in a one-element list and defer to the vector overload
    return this->get_meta(std::vector<std::string>{col_name});
}
// Returns the TableInfo covering [mess_offset, mess_offset + mess_count) of the underlying table.
//   column_names - columns to include; an empty list selects every column reported by the table
TableInfo MultiMessage::get_meta(const std::vector<std::string> &column_names)
{
    TableInfo full_table = this->meta->get_info();

    const auto window_begin = this->mess_offset;
    const auto window_end   = this->mess_offset + this->mess_count;

    if (column_names.empty())
    {
        // No explicit selection: fall back to all of the table's own column names
        return full_table.get_slice(window_begin, window_end, full_table.get_column_names());
    }

    return full_table.get_slice(window_begin, window_end, column_names);
}
void MultiMessage::get_slice_impl(std::shared_ptr<MultiMessage> new_message, std::size_t start, std::size_t stop) const
{
new_message->mess_offset = this->mess_offset + start;
new_message->mess_count = this->mess_offset + stop - new_message->mess_offset;
}
void MultiMessage::copy_ranges_impl(std::shared_ptr<MultiMessage> new_message,
const std::vector<std::pair<size_t, size_t>> &ranges,
size_t num_selected_rows) const
{
new_message->mess_offset = 0;
new_message->mess_count = num_selected_rows;
new_message->meta = copy_meta_ranges(ranges);
}
std::shared_ptr<MessageMeta> MultiMessage::copy_meta_ranges(const std::vector<std::pair<size_t, size_t>> &ranges) const
{
// copy ranges into a sequntial list of values
// https://github.com/rapidsai/cudf/issues/11223
std::vector<cudf::size_type> cudf_ranges;
for (const auto &p : ranges)
{
// Append the message offset to the range here
cudf_ranges.push_back(static_cast<cudf::size_type>(p.first + this->mess_offset));
cudf_ranges.push_back(static_cast<cudf::size_type>(p.second + this->mess_offset));
}
auto table_info = this->meta->get_info();
std::vector<std::string> column_names = table_info.get_column_names();
column_names.insert(column_names.begin(), std::string()); // cudf id col
cudf::io::table_metadata metadata{std::move(column_names)};
auto table_view = table_info.get_view();
auto sliced_views = cudf::slice(table_view, cudf_ranges);
cudf::io::table_with_metadata table = {cudf::concatenate(sliced_views, rmm::mr::get_current_device_resource()),
std::move(metadata)};
return MessageMeta::create_from_cpp(std::move(table), 1);
}
void MultiMessage::set_meta(const std::string &col_name, TensorObject tensor)
{
set_meta(std::vector<std::string>{col_name}, std::vector<TensorObject>{tensor});
}
// Copies each tensor's device data into the column of the same position within this message's window.
//   column_names - destination columns; any that are missing are first inserted into the underlying table
//   tensors      - source tensors, one per entry of `column_names`
// Preconditions (enforced with CHECK):
//   * `column_names` and `tensors` have the same length
//   * each tensor has as many elements as the destination column slice has rows
//   * the tensor dtype matches the column dtype, except that a UINT8 tensor may be written to a BOOL8 column
void MultiMessage::set_meta(const std::vector<std::string> &column_names, const std::vector<TensorObject> &tensors)
{
    // Tensor i is written into selected column i, so the two lists must pair up one-to-one. Without this,
    // too few names would index past the selected columns and extra names would be silently ignored.
    CHECK(column_names.size() == tensors.size());

    // Parentheses (not braces) so this is unambiguously the "n elements" constructor
    std::vector<TypeId> tensor_types(tensors.size());
    for (size_t i = 0; i < tensors.size(); ++i)
    {
        tensor_types[i] = tensors[i].dtype().type_id();
    }

    // Make sure every destination column exists before taking the sliced view below
    TableInfo info = this->meta->get_info();
    info.insert_missing_columns(column_names, tensor_types);

    TableInfo table_meta = this->get_meta(column_names);
    for (size_t i = 0; i < tensors.size(); ++i)
    {
        const auto &tensor = tensors[i];

        const auto cv          = table_meta.get_column(i);
        const auto table_type  = cv.type().id();
        const auto tensor_type = DType(tensor_types[i]).cudf_type_id();
        const auto row_stride  = tensor.stride(0);

        CHECK(tensor.count() == cv.size() && (table_type == tensor_type || (table_type == cudf::type_id::BOOL8 &&
                                                                            tensor_type == cudf::type_id::UINT8)));

        const auto item_size = tensor.dtype().item_size();

        // Dont use cv.data<>() here since that does not account for the size of each element
        auto data_start = const_cast<uint8_t *>(cv.head<uint8_t>()) + cv.offset() * item_size;

        if (row_stride == 1)
        {
            // Consecutive rows are adjacent in the tensor: a single flat copy is enough
            MRC_CHECK_CUDA(cudaMemcpy(data_start, tensor.data(), tensor.bytes(), cudaMemcpyDeviceToDevice));
        }
        else
        {
            // Strided source: copy cv.size() rows of one element each. The destination is packed
            // (pitch == item_size) while the source advances row_stride elements per row.
            MRC_CHECK_CUDA(cudaMemcpy2D(data_start,
                                        item_size,
                                        tensor.data(),
                                        row_stride * item_size,
                                        item_size,
                                        cv.size(),
                                        cudaMemcpyDeviceToDevice));
        }
    }
}
std::vector<std::pair<TensorIndex, TensorIndex>> MultiMessage::apply_offset_to_ranges(
std::size_t offset, const std::vector<std::pair<size_t, size_t>> &ranges) const
{
std::vector<std::pair<TensorIndex, TensorIndex>> offset_ranges(ranges.size());
std::transform(
ranges.cbegin(), ranges.cend(), offset_ranges.begin(), [offset](const std::pair<size_t, size_t> range) {
return std::pair{offset + range.first, offset + range.second};
});
return offset_ranges;
}
/****** MultiMessageInterfaceProxy *************************/
// Python-facing factory: creates a MultiMessage over `meta` with the given offset and count.
// The cudf::size_type arguments are converted to the constructor's size_t parameters.
std::shared_ptr<MultiMessage> MultiMessageInterfaceProxy::init(std::shared_ptr<MessageMeta> meta,
                                                               cudf::size_type mess_offset,
                                                               cudf::size_type mess_count)
{
    auto message = std::make_shared<MultiMessage>(std::move(meta), mess_offset, mess_count);
    return message;
}
// Returns a shared handle to the MessageMeta referenced by `self`.
std::shared_ptr<morpheus::MessageMeta> MultiMessageInterfaceProxy::meta(const MultiMessage &self)
{
    std::shared_ptr<morpheus::MessageMeta> shared_meta = self.meta;
    return shared_meta;
}
// Returns the offset at which `self`'s window into its metadata begins.
std::size_t MultiMessageInterfaceProxy::mess_offset(const MultiMessage &self)
{
    const std::size_t offset = self.mess_offset;
    return offset;
}
// Returns the number of entries in `self`'s window into its metadata.
std::size_t MultiMessageInterfaceProxy::mess_count(const MultiMessage &self)
{
    const std::size_t count = self.mess_count;
    return count;
}
// Returns the rows of the Python dataframe that belong to `self`, with all columns.
pybind11::object MultiMessageInterfaceProxy::get_meta(MultiMessage &self)
{
    // Mimic this python code
    // self.meta.df.loc[self.meta.df.index[self.mess_offset:self.mess_offset + self.mess_count]]
    auto dataframe = self.meta->get_py_table();

    const auto first_row = self.mess_offset;
    const auto end_row   = self.mess_offset + self.mess_count;

    auto row_window = pybind11::slice(pybind11::int_(first_row), pybind11::int_(end_row), pybind11::none());

    pybind11::object row_labels = dataframe.attr("index")[row_window];

    // Must do implicit conversion to pybind11::object here!!!
    // (operator[] yields an accessor proxy rather than the looked-up value itself)
    pybind11::object selection = dataframe.attr("loc")[row_labels];

    return selection;
}
// Returns the single column `col_name` of `self`'s window as a Python object.
pybind11::object MultiMessageInterfaceProxy::get_meta(MultiMessage &self, std::string col_name)
{
    // Slice on the C++ side first, then hand the result over to Python
    TableInfo column_info = self.get_meta(col_name);

    return column_info.as_py_object();
}
// Returns the listed `columns` of `self`'s window as a Python object.
pybind11::object MultiMessageInterfaceProxy::get_meta(MultiMessage &self, std::vector<std::string> columns)
{
    // Slice on the C++ side first, then hand the result over to Python
    TableInfo columns_info = self.get_meta(columns);

    return columns_info.as_py_object();
}
// Returns the rows of the Python dataframe that belong to `self`, restricted to `columns`.
//   columns - any Python object accepted as the column selector of DataFrame.loc
pybind11::object MultiMessageInterfaceProxy::get_meta_by_col(MultiMessage &self, pybind11::object columns)
{
    // Note: an earlier variant built the result on the C++ side (self.get_meta(columns) followed by
    // make_table_from_table_info); the selection is instead performed through the Python dataframe.

    // Mimic this python code
    // self.meta.df.loc[self.meta.df.index[self.mess_offset:self.mess_offset + self.mess_count], columns]
    auto dataframe = self.meta->get_py_table();

    const auto first_row = self.mess_offset;
    const auto end_row   = self.mess_offset + self.mess_count;

    auto row_window = pybind11::slice(pybind11::int_(first_row), pybind11::int_(end_row), pybind11::none());

    pybind11::object row_labels = dataframe.attr("index")[row_window];

    // Must do implicit conversion to pybind11::object here!!!
    // (operator[] yields an accessor proxy rather than the looked-up value itself)
    pybind11::object selection = dataframe.attr("loc")[pybind11::make_tuple(row_labels, columns)];

    return selection;
}
// Returns the metadata of `self`'s window as a Python list (via to_arrow().to_pylist()).
//   col_name - Python str naming a single column, or None to use every column
pybind11::object MultiMessageInterfaceProxy::get_meta_list(MultiMessage &self, pybind11::object col_name)
{
    const bool single_column = !col_name.is_none();

    // An empty list means "all columns" to MultiMessage::get_meta
    std::vector<std::string> requested;
    if (single_column)
    {
        requested.emplace_back(col_name.cast<std::string>());
    }

    auto info    = self.get_meta(requested);
    auto py_meta = info.as_py_object();

    if (single_column)
    {
        // needed to slice off the id column
        py_meta = py_meta[col_name];
    }

    auto arrow_data          = py_meta.attr("to_arrow")();
    pybind11::object as_list = arrow_data.attr("to_pylist")();

    return as_list;
}
// Assigns `value` to `columns` for the rows of the Python dataframe that belong to `self`.
//   columns - any Python object accepted as the column selector of DataFrame.loc
//   value   - Python object assigned to the selection
void MultiMessageInterfaceProxy::set_meta(MultiMessage &self, pybind11::object columns, pybind11::object value)
{
    // Mimic this python code
    // self.meta.df.loc[self.meta.df.index[self.mess_offset:self.mess_offset + self.mess_count], columns] =
    // value
    auto dataframe = self.meta->get_py_table();

    const auto first_row = self.mess_offset;
    const auto end_row   = self.mess_offset + self.mess_count;

    auto row_window = pybind11::slice(pybind11::int_(first_row), pybind11::int_(end_row), pybind11::none());

    pybind11::object row_labels = dataframe.attr("index")[row_window];
    pybind11::tuple loc_key     = pybind11::make_tuple(row_labels, columns);

    // Assign straight through the temporary accessor so this performs the Python item assignment
    dataframe.attr("loc")[loc_key] = value;
}
// Python-facing wrapper around MultiMessage::get_slice; returns the message for [start, stop).
std::shared_ptr<MultiMessage> MultiMessageInterfaceProxy::get_slice(MultiMessage &self,
                                                                    std::size_t start,
                                                                    std::size_t stop)
{
    // get_slice already hands back a shared_ptr, so it is forwarded unchanged
    std::shared_ptr<MultiMessage> sliced = self.get_slice(start, stop);
    return sliced;
}
// Python-facing wrapper around MultiMessage::copy_ranges.
//   ranges            - (start, stop) pairs to copy
//   num_selected_rows - Python int giving the total row count, or None to have it computed here as the
//                       sum of (stop - start) over all ranges
std::shared_ptr<MultiMessage> MultiMessageInterfaceProxy::copy_ranges(
    MultiMessage &self, const std::vector<std::pair<size_t, size_t>> &ranges, pybind11::object num_selected_rows)
{
    if (!num_selected_rows.is_none())
    {
        // Caller supplied the count: trust it as-is
        return self.copy_ranges(ranges, num_selected_rows.cast<std::size_t>());
    }

    std::size_t total_rows = 0;
    for (const auto &[range_start, range_stop] : ranges)
    {
        total_rows += range_stop - range_start;
    }

    return self.copy_ranges(ranges, total_rows);
}
} // namespace morpheus