(Latest Version)

Program Listing for File filter_detection.cpp

Return to documentation for file (morpheus/_lib/src/stages/filter_detection.cpp)

Copy
Copied!
            

/* * SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "morpheus/stages/filter_detection.hpp" // IWYU pragma: accosiated #include "mrc/node/rx_sink_base.hpp" #include "mrc/node/rx_source_base.hpp" #include "mrc/node/sink_properties.hpp" #include "mrc/node/source_properties.hpp" #include "mrc/segment/builder.hpp" #include "mrc/segment/object.hpp" #include "mrc/types.hpp" #include "pymrc/node.hpp" #include "morpheus/messages/multi_tensor.hpp" #include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo #include "morpheus/objects/dtype.hpp" // for DataType #include "morpheus/objects/memory_descriptor.hpp" #include "morpheus/objects/table_info.hpp" #include "morpheus/objects/tensor_object.hpp" // for TensorIndex, TensorObject #include "morpheus/types.hpp" // for RangeType #include "morpheus/utilities/matx_util.hpp" #include "morpheus/utilities/tensor_util.hpp" // for TensorUtils::get_element_stride #include <cuda_runtime.h> // for cudaMemcpy, cudaMemcpyDeviceToDevice, cudaMemcpyDeviceToHost #include <cudf/column/column_view.hpp> #include <cudf/types.hpp> #include <glog/logging.h> // for CHECK, CHECK_NE #include <mrc/cuda/common.hpp> // for MRC_CHECK_CUDA #include <rmm/device_buffer.hpp> // for device_buffer #include <cstddef> #include <cstdint> // for uint8_t #include <exception> #include <functional> #include <memory> #include <ostream> // needed for glog #include <string> #include <utility> // for pair // IWYU thinks we need ext/new_allocator.h for size_t for some reason // IWYU pragma: no_include namespace morpheus { // Component public implementations // ************ FilterDetectionStage **************************** // FilterDetectionsStage::FilterDetectionsStage(float threshold, bool copy, FilterSource filter_source, std::string field_name) : PythonNode(base_t::op_factory_from_sub_fn(build_operator())), m_threshold(threshold), m_copy(copy), m_filter_source(filter_source), m_field_name(std::move(field_name)) { CHECK(m_filter_source != FilterSource::Auto); // The python stage should determine this } DevMemInfo FilterDetectionsStage::get_tensor_filter_source(const std::shared_ptr<morpheus::MultiMessage>& x) { // The pipeline build will check to ensure that our input is a MultiResponseMessage const auto& filter_source = std::static_pointer_cast<morpheus::MultiTensorMessage>(x)->get_tensor(m_field_name); CHECK(filter_source.rank() > 0 && filter_source.rank() <= 2) << "C++ impl of the FilterDetectionsStage currently only supports one and two dimensional " "arrays"; // Depending on the input the stride is given in bytes or elements, convert to elements auto stride = morpheus::TensorUtils::get_element_stride(filter_source.get_stride()); return {filter_source.data(), filter_source.dtype(), filter_source.get_memory(), filter_source.get_shape(), stride}; } DevMemInfo FilterDetectionsStage::get_column_filter_source(const std::shared_ptr<morpheus::MultiMessage>& x) { auto table_info = x->get_meta(m_field_name); // since we only asked for one column, we know its the first const auto& col = table_info.get_column(0); auto dtype = morpheus::DType::from_cudf(col.type().id()); auto num_rows = col.size(); auto data = const_cast<uint8_t*>(static_cast<const uint8_t*>(col.head<uint8_t>() + col.offset() * dtype.item_size())); return { data, std::move(dtype), std::make_shared<MemoryDescriptor>(), {num_rows, 1}, {1, 0}, }; } FilterDetectionsStage::subscribe_fn_t FilterDetectionsStage::build_operator() { return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) { std::function<DevMemInfo(const std::shared_ptr<morpheus::MultiMessage>& x)> get_filter_source; if (m_filter_source == FilterSource::TENSOR) { get_filter_source = [this](auto x) { return get_tensor_filter_source(x); }; } else { get_filter_source = [this](auto x) { return get_column_filter_source(x); }; } return input.subscribe(rxcpp::make_observer<sink_type_t>( [this, &output, &get_filter_source](sink_type_t x) { auto tmp_buffer = get_filter_source(x); const auto num_rows = tmp_buffer.shape(0); const auto num_columns = tmp_buffer.shape(1); bool by_row = (num_columns > 1); // Now call the threshold function auto thresh_bool_buffer = MatxUtil::threshold(tmp_buffer, m_threshold, by_row); std::vector<uint8_t> host_bool_values(num_rows); // Copy bools back to host MRC_CHECK_CUDA(cudaMemcpy(host_bool_values.data(), thresh_bool_buffer->data(), thresh_bool_buffer->size(), cudaMemcpyDeviceToHost)); // Only used when m_copy is true std::vector<RangeType> selected_ranges; std::size_t num_selected_rows = 0; // We are slicing by rows, using num_rows as our marker for undefined std::size_t slice_start = num_rows; for (std::size_t row = 0; row < num_rows; ++row) { bool above_threshold = host_bool_values[row]; if (above_threshold && slice_start == num_rows) { slice_start = row; } else if (!above_threshold && slice_start != num_rows) { if (m_copy) { selected_ranges.emplace_back(std::pair{slice_start, row}); num_selected_rows += (row - slice_start); } else { output.on_next(x->get_slice(slice_start, row)); } slice_start = num_rows; } } if (slice_start != num_rows) { // Last row was above the threshold if (m_copy) { selected_ranges.emplace_back(std::pair{slice_start, num_rows}); num_selected_rows += (num_rows - slice_start); } else { output.on_next(x->get_slice(slice_start, num_rows)); } } // num_selected_rows will always be 0 when m_copy is false, // or when m_copy is true, but none of the rows matched the output if (num_selected_rows > 0) { DCHECK(m_copy); output.on_next(x->copy_ranges(selected_ranges, num_selected_rows)); } }, [&](std::exception_ptr error_ptr) { output.on_error(error_ptr); }, [&]() { output.on_completed(); })); }; } // ************ FilterDetectionStageInterfaceProxy ************* // std::shared_ptr<mrc::segment::Object<FilterDetectionsStage>> FilterDetectionStageInterfaceProxy::init( mrc::segment::Builder& builder, const std::string& name, float threshold, bool copy, FilterSource filter_source, std::string field_name) { auto stage = builder.construct_object<FilterDetectionsStage>(name, threshold, copy, filter_source, field_name); return stage; } } // namespace morpheus

© Copyright 2023, NVIDIA. Last updated on Apr 11, 2023.