Program Listing for File meta.cpp

Return to documentation for file (morpheus/_lib/src/messages/meta.cpp)


/*
 * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "morpheus/messages/meta.hpp"

#include "morpheus/io/deserializers.hpp"
#include "morpheus/objects/mutable_table_ctx_mgr.hpp"
#include "morpheus/objects/python_data_table.hpp"
#include "morpheus/objects/table_info.hpp"
#include "morpheus/utilities/cudf_util.hpp"

#include <cudf/io/types.hpp>
#include <glog/logging.h>
#include <pybind11/gil.h>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <pyerrors.h>  // for PyExc_DeprecationWarning
#include <warnings.h>  // for PyErr_WarnEx

#include <memory>
#include <optional>
#include <ostream>    // for operator<< needed by glog
#include <stdexcept>  // for runtime_error
#include <utility>
// We're already including pybind11.h and don't need to include cast.
// For some reason IWYU also thinks we need array for the `isinstance` call.
// IWYU pragma: no_include <pybind11/cast.h>
// IWYU pragma: no_include <array>

namespace morpheus {

namespace py = pybind11;

/****** Component public implementations *******************/
/****** MessageMeta ****************************************/

// Returns the count reported by the underlying data table.
TensorIndex MessageMeta::count() const
{
    return m_data->count();
}

// Returns the `TableInfo` produced by the underlying data table.
TableInfo MessageMeta::get_info() const
{
    return this->m_data->get_info();
}

// Returns the `MutableTableInfo` produced by the underlying data table.
MutableTableInfo MessageMeta::get_mutable_info() const
{
    return this->m_data->get_mutable_info();
}

// Wraps an existing Python object in a `PyDataTable` and builds a `MessageMeta` around it.
std::shared_ptr<MessageMeta> MessageMeta::create_from_python(py::object&& data_table)
{
    auto data = std::make_unique<PyDataTable>(std::move(data_table));

    return std::shared_ptr<MessageMeta>(new MessageMeta(std::move(data)));
}

// Converts a C++ cudf table (with metadata) into a Python object via `cpp_to_py`, then wraps it in a `PyDataTable`
// and builds a `MessageMeta` around it. `index_col_count` is forwarded unchanged to the conversion.
std::shared_ptr<MessageMeta> MessageMeta::create_from_cpp(cudf::io::table_with_metadata&& data_table,
                                                          int index_col_count)
{
    // Convert to py first
    py::object py_dt = cpp_to_py(std::move(data_table), index_col_count);

    auto data = std::make_unique<PyDataTable>(std::move(py_dt));

    return std::shared_ptr<MessageMeta>(new MessageMeta(std::move(data)));
}

// Takes shared ownership of the supplied data table.
MessageMeta::MessageMeta(std::shared_ptr<IDataTable> data) : m_data(std::move(data)) {}

// Acquires the GIL and converts the C++ table into a Python object using
// `CudfHelper::table_from_table_with_metadata`.
py::object MessageMeta::cpp_to_py(cudf::io::table_with_metadata&& table, int index_col_count)
{
    py::gil_scoped_acquire gil;

    // Now convert to a python TableInfo object
    auto converted_table = CudfHelper::table_from_table_with_metadata(std::move(table), index_col_count);

    // VLOG(10) << "Table. Num Col: " << converted_table.attr("_num_columns").str().cast<std::string>()
    //          << ", Num Ind: " << converted_table.attr("_num_columns").cast<std::string>()
    //          << ", Rows: " << converted_table.attr("_num_rows").cast<std::string>();
    // py::print("Table Created. Num Rows: {}, Num Cols: {}, Num Ind: {}",
    //           converted_table.attr("_num_rows"),
    //           converted_table.attr("_num_columns"),
    //           converted_table.attr("_num_indices"));

    return converted_table;
}

// Reports whether the table info currently has a sliceable index.
bool MessageMeta::has_sliceable_index() const
{
    const auto table = get_info();
    return table.has_sliceable_index();
}

// Generates a new index if the current one is not sliceable.
// Returns whatever `MutableTableInfo::ensure_sliceable_index` returns when a new index was generated, and
// `std::nullopt` when the index was already sliceable.
std::optional<std::string> MessageMeta::ensure_sliceable_index()
{
    auto table = this->get_mutable_info();

    // Check to ensure we do (or still do) have a non-unique index. Presumably the caller already made a call to
    // `has_sliceable_index` but there could have been a race condition between the first call to has_sliceable_index
    // and the acquisition of the mutex. Re-check here to ensure some other thread didn't already fix the index
    if (!table.has_sliceable_index())
    {
        LOG(WARNING) << "Non unique index found in dataframe, generating new index.";
        return table.ensure_sliceable_index();
    }

    return std::nullopt;
}

/********** MessageMetaInterfaceProxy **********/

// Builds a `MessageMeta` from a Python DataFrame. A cudf DataFrame is used as-is, a pandas DataFrame is converted to
// cudf (with a warning), and any other object raises a Python `ValueError`.
std::shared_ptr<MessageMeta> MessageMetaInterfaceProxy::init_python(py::object&& data_frame)
{
    // ensure we have a cudf DF and not a pandas DF
    auto cudf_df_cls = py::module_::import("cudf").attr("DataFrame");
    if (!py::isinstance(data_frame, cudf_df_cls))
    {
        // Convert to cudf if it's a Pandas DF, throw an error otherwise
        auto pd_df_cls = py::module_::import("pandas").attr("DataFrame");
        if (py::isinstance(data_frame, pd_df_cls))
        {
            LOG(WARNING) << "Dataframe is not a cudf dataframe, converting to cudf dataframe";
            data_frame = cudf_df_cls(std::move(data_frame));
        }
        else
        {
            throw pybind11::value_error("Dataframe is not a cudf or pandas dataframe");
        }
    }

    return MessageMeta::create_from_python(std::move(data_frame));
}

// Python-facing accessor for `MessageMeta::count`.
TensorIndex MessageMetaInterfaceProxy::count(MessageMeta& self)
{
    return self.count();
}

// Fetches the table info with the GIL released, then converts it to a Python object with
// `CudfHelper::table_from_table_info` once the GIL is held again.
py::object MessageMetaInterfaceProxy::get_data_frame(MessageMeta& self)
{
    TableInfo info;

    {
        // Need to release the GIL before calling `get_info()`
        pybind11::gil_scoped_release no_gil;

        // Get the table info; it is converted to cudf below, after the GIL has been re-acquired
        info = self.get_info();
    }

    return CudfHelper::table_from_table_info(info);
}

// Deprecated accessor: emits a `DeprecationWarning` and then returns the result of `get_data_frame`.
py::object MessageMetaInterfaceProxy::df_property(MessageMeta& self)
{
    // PyErr_WarnEx returns -1 with a Python exception set when the warning is turned into an error (for example when
    // running with `-W error`). Propagate that exception instead of returning a value with an error still pending.
    const int warn_status = PyErr_WarnEx(
        PyExc_DeprecationWarning,
        "Warning the df property returns a copy, please use the copy_dataframe method or the mutable_dataframe "
        "context manager to modify the DataFrame in-place instead.",
        1);

    if (warn_status < 0)
    {
        throw py::error_already_set();
    }

    return MessageMetaInterfaceProxy::get_data_frame(self);
}

// Returns a `MutableTableCtxMgr` constructed from `self`; the GIL is released while it is being built.
MutableTableCtxMgr MessageMetaInterfaceProxy::mutable_dataframe(MessageMeta& self)
{
    // Release any GIL
    py::gil_scoped_release no_gil;
    return {self};
}

// Loads a table from `filename`, prepares its index columns, and builds a `MessageMeta` from the result.
std::shared_ptr<MessageMeta> MessageMetaInterfaceProxy::init_cpp(const std::string& filename)
{
    // Load the file
    auto df_with_meta   = load_table_from_file(filename);
    int index_col_count = prepare_df_index(df_with_meta);

    return MessageMeta::create_from_cpp(std::move(df_with_meta), index_col_count);
}

// Python-facing wrapper for `MessageMeta::has_sliceable_index`; runs with the GIL released.
bool MessageMetaInterfaceProxy::has_sliceable_index(MessageMeta& self)
{
    // Release the GIL
    py::gil_scoped_release no_gil;
    return self.has_sliceable_index();
}

// Python-facing wrapper for `MessageMeta::ensure_sliceable_index`; runs with the GIL released.
std::optional<std::string> MessageMetaInterfaceProxy::ensure_sliceable_index(MessageMeta& self)
{
    // Release the GIL
    py::gil_scoped_release no_gil;
    return self.ensure_sliceable_index();
}

/********** SlicedMessageMeta **********/

// Copy-constructs the `MessageMeta` base from `*other` and records the row range and column names that are applied
// by `get_info` / `get_mutable_info`. `other` is dereferenced unconditionally, so it must not be null.
SlicedMessageMeta::SlicedMessageMeta(std::shared_ptr<MessageMeta> other,
                                     TensorIndex start,
                                     TensorIndex stop,
                                     std::vector<std::string> columns) :
  MessageMeta(*other),
  m_start(start),
  m_stop(stop),
  m_column_names(std::move(columns))
{}

// Number of rows in the slice, computed as `stop - start`.
TensorIndex SlicedMessageMeta::count() const
{
    return m_stop - m_start;
}

// Returns the underlying table info sliced to the stored rows and columns.
TableInfo SlicedMessageMeta::get_info() const
{
    return this->m_data->get_info().get_slice(m_start, m_stop, m_column_names);
}

// Returns the underlying mutable table info sliced to the stored rows and columns.
MutableTableInfo SlicedMessageMeta::get_mutable_info() const
{
    return this->m_data->get_mutable_info().get_slice(m_start, m_stop, m_column_names);
}

// Always throws `std::runtime_error`: a new index cannot be set through a partial view of the DataFrame.
std::optional<std::string> SlicedMessageMeta::ensure_sliceable_index()
{
    throw std::runtime_error{"Unable to set a new index on the DataFrame from a partial view of the columns/rows."};
}

}  // namespace morpheus

© Copyright 2023, NVIDIA. Last updated on Oct 12, 2023.