Program Listing for File filter_detection.cpp

Return to documentation for file (morpheus/_lib/src/stages/filter_detection.cpp)

Copy
Copied!
            

#include "morpheus/stages/filter_detection.hpp"  // IWYU pragma: associated

#include "morpheus/messages/multi_tensor.hpp"
#include "morpheus/objects/tensor_object.hpp"  // for TensorIndex, TensorObject
#include "morpheus/utilities/matx_util.hpp"
#include "morpheus/utilities/tensor_util.hpp"  // for TensorUtils::get_element_stride
#include "morpheus/utilities/type_util.hpp"
#include "morpheus/utilities/type_util_detail.hpp"  // for DataType

#include <cuda_runtime.h>            // for cudaMemcpy, cudaMemcpyDeviceToDevice, cudaMemcpyDeviceToHost
#include <glog/logging.h>            // for CHECK, CHECK_NE
#include <mrc/cuda/common.hpp>       // for MRC_CHECK_CUDA
#include <rmm/cuda_stream_view.hpp>  // for cuda_stream_per_thread
#include <rmm/device_buffer.hpp>     // for device_buffer

#include <cstddef>
#include <cstdint>  // for uint8_t
#include <exception>
#include <functional>  // for function
#include <memory>
#include <ostream>  // needed for glog
#include <string>
#include <type_traits>  // for declval (indirectly via templates)
#include <utility>      // for pair
#include <vector>
// IWYU thinks we need ext/new_allocator.h for size_t for some reason
// IWYU pragma: no_include <ext/new_allocator.h>

namespace morpheus {

// Component public implementations
// ************ FilterDetectionStage **************************** //

// Builds the stage.
//
// threshold     : value forwarded to MatxUtil::threshold for every incoming message.
// copy          : when true, all selected row ranges of a message are gathered and emitted as a single
//                 copy (copy_ranges); when false, each contiguous selected range is emitted as its own slice.
// filter_source : where the values to threshold are read from. Must not be FilterSource::Auto; the
//                 constructor CHECK-fails otherwise.
// field_name    : name of the tensor (FilterSource::TENSOR) or of the column (any other source) to read.
FilterDetectionsStage::FilterDetectionsStage(float threshold,
                                             bool copy,
                                             FilterSource filter_source,
                                             std::string field_name) :
  PythonNode(base_t::op_factory_from_sub_fn(build_operator())),
  m_threshold(threshold),
  m_copy(copy),
  m_filter_source(filter_source),
  m_field_name(std::move(field_name))
{
    CHECK(m_filter_source != FilterSource::Auto);  // The python stage should determine this
}

// Copies the tensor named m_field_name out of the message into a freshly allocated device buffer and
// describes it (dtype, shape, element stride) as a DevMemInfo.
//
// CHECK-fails when the tensor's rank is not 1 or 2.
DevMemInfo FilterDetectionsStage::get_tensor_filter_source(const std::shared_ptr<morpheus::MultiMessage>& x)
{
    // The pipeline build will check to ensure that our input is a MultiResponseMessage
    // NOTE(review): the cast below is unchecked and targets MultiTensorMessage; presumably
    // MultiResponseMessage derives from it -- confirm against the message hierarchy.
    const auto& filter_source = std::static_pointer_cast<morpheus::MultiTensorMessage>(x)->get_tensor(m_field_name);
    CHECK(filter_source.rank() > 0 && filter_source.rank() <= 2)
        << "C++ impl of the FilterDetectionsStage currently only supports one and two dimensional "
           "arrays";

    // Device-to-device copy of filter_source.bytes() bytes starting at filter_source.data().
    // NOTE(review): this copies one contiguous byte range; assumes the tensor's storage is dense -- confirm.
    auto buffer = std::make_shared<rmm::device_buffer>(filter_source.bytes(), rmm::cuda_stream_per_thread);

    MRC_CHECK_CUDA(cudaMemcpy(buffer->data(),
                              static_cast<const uint8_t*>(filter_source.data()),
                              buffer->size(),
                              cudaMemcpyDeviceToDevice));

    // Depending on the input the stride is given in bytes or elements, convert to elements
    auto stride = morpheus::TensorUtils::get_element_stride<std::size_t, std::size_t>(filter_source.get_stride());

    return {buffer, filter_source.dtype(), filter_source.get_shape(), stride};
}

// Copies the column named m_field_name out of the message's metadata into a freshly allocated device
// buffer and describes it as a DevMemInfo with shape {num_rows, 1} and stride {1, 0}.
DevMemInfo FilterDetectionsStage::get_column_filter_source(const std::shared_ptr<morpheus::MultiMessage>& x)
{
    auto table_info = x->get_meta(m_field_name);

    // since we only asked for one column, we know its the first
    const auto& col = table_info.get_column(0);
    auto dtype      = morpheus::DType::from_cudf(col.type().id());
    auto num_rows   = static_cast<std::size_t>(col.size());

    auto buffer = std::make_shared<rmm::device_buffer>(num_rows * dtype.item_size(), rmm::cuda_stream_per_thread);

    // The source pointer is the column's head advanced by its element offset, expressed in bytes.
    MRC_CHECK_CUDA(
        cudaMemcpy(buffer->data(),
                   static_cast<const uint8_t*>(col.head<uint8_t>() + col.offset() * dtype.item_size()),
                   buffer->size(),
                   cudaMemcpyDeviceToDevice));

    return {
        buffer,
        std::move(dtype),
        {num_rows, 1},
        {1, 0},
    };
}

// Creates the subscribe function that drives the stage.
//
// For every incoming message the filter source is thresholded on the device, the per-row boolean
// result is copied to the host, and each maximal run of consecutive selected rows [start, stop) is
// either emitted immediately as a slice (m_copy == false) or collected and emitted once via
// copy_ranges (m_copy == true). Messages with no selected rows emit nothing.
FilterDetectionsStage::subscribe_fn_t FilterDetectionsStage::build_operator()
{
    return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
        std::function<DevMemInfo(const std::shared_ptr<morpheus::MultiMessage>& x)> get_filter_source;

        // Take the message by const reference so that no extra shared_ptr copy is made per message.
        if (m_filter_source == FilterSource::TENSOR)
        {
            get_filter_source = [this](const auto& x) { return get_tensor_filter_source(x); };
        }
        else
        {
            get_filter_source = [this](const auto& x) { return get_column_filter_source(x); };
        }

        // `output` and `get_filter_source` are locals of this operator function. They are captured by
        // value so that the observer remains valid even if the subscription outlives this call.
        return input.subscribe(rxcpp::make_observer<sink_type_t>(
            [this, output, get_filter_source](sink_type_t x) {
                auto tmp_buffer = get_filter_source(x);

                // NOTE(review): shape(1) is read unconditionally although rank-1 tensors pass the rank
                // CHECK in get_tensor_filter_source -- confirm DevMemInfo::shape handles that case.
                const auto num_rows    = tmp_buffer.shape(0);
                const auto num_columns = tmp_buffer.shape(1);

                bool by_row = (num_columns > 1);

                // Now call the threshold function
                auto thresh_bool_buffer = MatxUtil::threshold(tmp_buffer, m_threshold, by_row);

                std::vector<uint8_t> host_bool_values(num_rows);

                // Copy bools back to host
                // NOTE(review): assumes thresh_bool_buffer holds at most num_rows bytes -- confirm.
                MRC_CHECK_CUDA(cudaMemcpy(host_bool_values.data(),
                                          thresh_bool_buffer->data(),
                                          thresh_bool_buffer->size(),
                                          cudaMemcpyDeviceToHost));

                // Only used when m_copy is true
                std::vector<std::pair<std::size_t, std::size_t>> selected_ranges;
                std::size_t num_selected_rows = 0;

                // We are slicing by rows, using num_rows as our marker for undefined
                std::size_t slice_start = num_rows;
                for (std::size_t row = 0; row < num_rows; ++row)
                {
                    bool above_threshold = host_bool_values[row];

                    if (above_threshold && slice_start == num_rows)
                    {
                        // First selected row of a new run
                        slice_start = row;
                    }
                    else if (!above_threshold && slice_start != num_rows)
                    {
                        // The run [slice_start, row) has just ended
                        if (m_copy)
                        {
                            selected_ranges.emplace_back(std::pair{slice_start, row});
                            num_selected_rows += (row - slice_start);
                        }
                        else
                        {
                            output.on_next(x->get_slice(slice_start, row));
                        }

                        slice_start = num_rows;
                    }
                }

                if (slice_start != num_rows)
                {
                    // Last row was above the threshold
                    if (m_copy)
                    {
                        selected_ranges.emplace_back(std::pair{slice_start, num_rows});
                        num_selected_rows += (num_rows - slice_start);
                    }
                    else
                    {
                        output.on_next(x->get_slice(slice_start, num_rows));
                    }
                }

                // num_selected_rows will always be 0 when m_copy is false,
                // or when m_copy is true, but none of the rows matched the output
                if (num_selected_rows > 0)
                {
                    DCHECK(m_copy);
                    output.on_next(x->copy_ranges(selected_ranges, num_selected_rows));
                }
            },
            [output](std::exception_ptr error_ptr) { output.on_error(error_ptr); },
            [output]() { output.on_completed(); }));
    };
}

// ************ FilterDetectionStageInterfaceProxy ************* //

// Constructs a FilterDetectionsStage inside the given segment builder and returns the resulting
// segment object. All arguments after `name` are forwarded to the stage constructor.
std::shared_ptr<mrc::segment::Object<FilterDetectionsStage>> FilterDetectionStageInterfaceProxy::init(
    mrc::segment::Builder& builder,
    const std::string& name,
    float threshold,
    bool copy,
    FilterSource filter_source,
    std::string field_name)
{
    // field_name is a by-value sink parameter: move it on rather than copying it again.
    auto stage =
        builder.construct_object<FilterDetectionsStage>(name, threshold, copy, filter_source, std::move(field_name));

    return stage;
}

}  // namespace morpheus

© Copyright 2023, NVIDIA. Last updated on Feb 3, 2023.