Program Listing for File filter_detections.cpp
↰ Return to documentation for file (morpheus/_lib/src/stages/filter_detections.cpp)
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "morpheus/stages/filter_detections.hpp"
#include "mrc/segment/builder.hpp" // for Builder
#include "mrc/segment/object.hpp" // for Object
#include "morpheus/messages/control.hpp" // for ControlMessage
#include "morpheus/messages/multi.hpp" // for MultiMessage
#include "morpheus/messages/multi_tensor.hpp" // for MultiTensorMessage
#include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo
#include "morpheus/objects/dtype.hpp" // for DType
#include "morpheus/objects/memory_descriptor.hpp" // for MemoryDescriptor
#include "morpheus/objects/table_info.hpp" // for TableInfo
#include "morpheus/types.hpp" // for RangeType
#include "morpheus/utilities/matx_util.hpp" // for MatxUtil
#include "morpheus/utilities/tensor_util.hpp" // for TensorUtils
#include <cuda_runtime.h> // for cudaMemcpy, cudaMemcpyKind
#include <cudf/column/column_view.hpp> // for column_view
#include <cudf/types.hpp> // for data_type
#include <glog/logging.h> // for COMPACT_GOOGLE_LOG_FATAL, LogMessageFatal, CHECK, DCHECK
#include <mrc/cuda/common.hpp> // for MRC_CHECK_CUDA
#include <rmm/cuda_stream_view.hpp> // for cuda_stream_per_thread
#include <rmm/mr/device/per_device_resource.hpp> // for get_current_device_resource
#include <cstddef> // for size_t
#include <cstdint> // for uint8_t
#include <exception> // for exception_ptr
#include <functional> // for function
#include <memory> // for make_shared, shared_ptr, __shared_ptr_access
#include <ostream> // for operator<<, basic_ostream
#include <string> // for char_traits, string
#include <utility> // for move, pair
#include <vector> // for vector
namespace morpheus {
// Component public implementations
// ************ FilterDetectionStage **************************** //
/**
 * Constructs the stage.
 *
 * @param threshold     Value each entry of the filter source is compared against.
 * @param copy          When true, passing rows are copied into one output message; otherwise
 *                      contiguous runs of passing rows are emitted as slices.
 * @param filter_source Where the values come from; must already be resolved (not Auto).
 * @param field_name    Name of the tensor or column holding the values to threshold.
 */
template <typename MessageT>
FilterDetectionsStage<MessageT>::FilterDetectionsStage(float threshold,
                                                       bool copy,
                                                       FilterSource filter_source,
                                                       std::string field_name) :
  base_t(base_t::op_factory_from_sub_fn(build_operator())),
  m_threshold(threshold),
  m_copy(copy),
  m_filter_source(filter_source),
  m_field_name(std::move(field_name))
{
    // Auto has to be resolved to a concrete source by the Python stage before we get here:
    // build_operator() only distinguishes FilterSource::TENSOR from "read a column".
    CHECK(FilterSource::Auto != m_filter_source);
}
/**
 * Describes the tensor named m_field_name in the incoming message as a DevMemInfo.
 *
 * The tensor must be one or two dimensional (enforced with CHECK). Its stride is converted to
 * element units before being stored in the returned DevMemInfo.
 */
template <typename MessageT>
DevMemInfo FilterDetectionsStage<MessageT>::get_tensor_filter_source(const sink_type_t& x)
{
    // Shared by both supported message types so the rank check and stride conversion live in one place.
    auto to_dev_mem_info = [](const auto& filter_source) -> DevMemInfo {
        CHECK(filter_source.rank() > 0 && filter_source.rank() <= 2)
            << "C++ impl of the FilterDetectionsStage currently only supports one and two dimensional "
               "arrays";

        // Depending on the input the stride is given in bytes or elements, convert to elements
        auto stride = TensorUtils::get_element_stride(filter_source.get_stride());
        return {
            filter_source.data(), filter_source.dtype(), filter_source.get_memory(), filter_source.get_shape(), stride};
    };

    if constexpr (std::is_same_v<MessageT, MultiMessage>)
    {
        // The pipeline build will check to ensure that our input is a MultiResponseMessage,
        // which is what makes the static downcast below acceptable.
        return to_dev_mem_info(std::static_pointer_cast<MultiTensorMessage>(x)->get_tensor(m_field_name));
    }
    else if constexpr (std::is_same_v<MessageT, ControlMessage>)
    {
        return to_dev_mem_info(x->tensors()->get_tensor(m_field_name));
    }
    else
    {
        // sink_type_t not supported
        static_assert(!sizeof(sink_type_t), "FilterDetectionsStage receives unsupported input type");
    }
}
/**
 * Describes the DataFrame column named m_field_name in the incoming message as a DevMemInfo.
 *
 * The column is exposed as a (rows x 1) buffer with stride {1, 0}, paired with a fresh
 * MemoryDescriptor built from the per-thread stream and the current device memory resource.
 */
template <typename MessageT>
DevMemInfo FilterDetectionsStage<MessageT>::get_column_filter_source(const sink_type_t& x)
{
    // Request a table view containing just the filter column.
    TableInfo filter_table;
    if constexpr (std::is_same_v<MessageT, ControlMessage>)
    {
        filter_table = x->payload()->get_info(m_field_name);
    }
    else if constexpr (std::is_same_v<MessageT, MultiMessage>)
    {
        filter_table = x->get_meta(m_field_name);
    }
    else
    {
        // sink_type_t not supported
        static_assert(!sizeof(sink_type_t), "FilterDetectionsStage receives unsupported input type");
    }

    // Only one column was requested, so it sits at index 0.
    const auto& column   = filter_table.get_column(0);
    auto column_dtype    = DType::from_cudf(column.type().id());
    const auto row_count = column.size();

    // The column view may start at a non-zero element offset; advance the raw byte pointer by it.
    const auto byte_offset = column.offset() * column_dtype.item_size();
    auto* first_value      = const_cast<uint8_t*>(column.head<uint8_t>() + byte_offset);

    auto memory =
        std::make_shared<MemoryDescriptor>(rmm::cuda_stream_per_thread, rmm::mr::get_current_device_resource());

    return {first_value, std::move(column_dtype), std::move(memory), {row_count, 1}, {1, 0}};
}
/**
 * Builds the rx operator implementing this stage.
 *
 * For every incoming message, the filter source (a tensor when m_filter_source is
 * FilterSource::TENSOR, otherwise a DataFrame column) is thresholded against m_threshold with
 * MatxUtil::threshold. The resulting per-row mask is copied back to the host. Runs of consecutive
 * rows that pass are then handled in one of two ways:
 *  - m_copy == false: each run is emitted immediately as its own slice;
 *  - m_copy == true:  the runs are collected and emitted as a single message holding copies of them.
 * A message with no passing rows produces no output.
 */
template <typename MessageT>
FilterDetectionsStage<MessageT>::subscribe_fn_t FilterDetectionsStage<MessageT>::build_operator()
{
    return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
        // Decide once per subscription which accessor supplies the values. The flag is captured by
        // value, so the per-message callback no longer references a local of this lambda and no
        // longer pays for a type-erased std::function call on every message.
        const bool use_tensor_source = (m_filter_source == FilterSource::TENSOR);

        // NOTE(review): `output` is captured by reference, which assumes the subscription finishes
        // before this lambda returns; confirm against the MRC operator contract.
        return input.subscribe(rxcpp::make_observer<sink_type_t>(
            [this, &output, use_tensor_source](sink_type_t x) {
                auto tmp_buffer = use_tensor_source ? get_tensor_filter_source(x) : get_column_filter_source(x);

                // Unsigned so it compares cleanly with the std::size_t indices below and can double
                // as the "no open slice" sentinel.
                const auto num_rows = static_cast<std::size_t>(tmp_buffer.shape(0));
                // NOTE(review): shape(1) is read even for rank-1 tensors, which the tensor path
                // accepts; confirm DevMemInfo::shape tolerates that index.
                const auto num_columns = tmp_buffer.shape(1);
                const bool by_row      = (num_columns > 1);

                // Now call the threshold function
                auto thresh_bool_buffer = MatxUtil::threshold(tmp_buffer, m_threshold, by_row);

                std::vector<uint8_t> host_bool_values(num_rows);

                // The copy below writes thresh_bool_buffer->size() bytes into host_bool_values;
                // refuse to overrun the host allocation.
                CHECK_LE(thresh_bool_buffer->size(), host_bool_values.size())
                    << "Threshold mask is larger than the number of rows";

                // Copy bools back to host
                MRC_CHECK_CUDA(cudaMemcpy(host_bool_values.data(),
                                          thresh_bool_buffer->data(),
                                          thresh_bool_buffer->size(),
                                          cudaMemcpyDeviceToHost));

                // Only used when m_copy is true
                std::vector<RangeType> selected_ranges;
                std::size_t num_selected_rows = 0;

                // We are slicing by rows, using num_rows as our marker for undefined
                std::size_t slice_start = num_rows;
                for (std::size_t row = 0; row < num_rows; ++row)
                {
                    bool above_threshold = host_bool_values[row];

                    if (above_threshold && slice_start == num_rows)
                    {
                        // Open a new run of passing rows
                        slice_start = row;
                    }
                    else if (!above_threshold && slice_start != num_rows)
                    {
                        // Close the current run [slice_start, row)
                        if (m_copy)
                        {
                            selected_ranges.emplace_back(std::pair{slice_start, row});
                            num_selected_rows += (row - slice_start);
                        }
                        else
                        {
                            if constexpr (std::is_same_v<MessageT, MultiMessage>)
                            {
                                output.on_next(x->get_slice(slice_start, row));
                            }
                            else if constexpr (std::is_same_v<MessageT, ControlMessage>)
                            {
                                // More runs may follow, so emit a copy rather than mutating x
                                auto meta = x->payload();
                                std::shared_ptr<ControlMessage> sliced_cm = std::make_shared<ControlMessage>(*x);
                                sliced_cm->payload(meta->get_slice(slice_start, row));
                                output.on_next(sliced_cm);
                            }
                            else
                            {
                                // sink_type_t not supported
                                static_assert(!sizeof(sink_type_t),
                                              "FilterDetectionsStage receives unsupported input type");
                            }
                        }

                        slice_start = num_rows;
                    }
                }

                if (slice_start != num_rows)
                {
                    // Last row was above the threshold
                    if (m_copy)
                    {
                        selected_ranges.emplace_back(std::pair{slice_start, num_rows});
                        num_selected_rows += (num_rows - slice_start);
                    }
                    else
                    {
                        if constexpr (std::is_same_v<MessageT, MultiMessage>)
                        {
                            output.on_next(x->get_slice(slice_start, num_rows));
                        }
                        else if constexpr (std::is_same_v<MessageT, ControlMessage>)
                        {
                            // Final run: x is not used afterwards, so its payload is replaced in place
                            auto meta = x->payload();
                            x->payload(meta->get_slice(slice_start, num_rows));
                            output.on_next(x);
                        }
                        else
                        {
                            // sink_type_t not supported
                            static_assert(!sizeof(sink_type_t),
                                          "FilterDetectionsStage receives unsupported input type");
                        }
                    }
                }

                // num_selected_rows will always be 0 when m_copy is false,
                // or when m_copy is true, but none of the rows matched the output
                if (num_selected_rows > 0)
                {
                    DCHECK(m_copy);
                    if constexpr (std::is_same_v<MessageT, MultiMessage>)
                    {
                        output.on_next(x->copy_ranges(selected_ranges, num_selected_rows));
                    }
                    else if constexpr (std::is_same_v<MessageT, ControlMessage>)
                    {
                        auto meta = x->payload();
                        x->payload(meta->copy_ranges(selected_ranges));
                        output.on_next(x);
                    }
                    else
                    {
                        // sink_type_t not supported
                        static_assert(!sizeof(sink_type_t), "FilterDetectionsStage receives unsupported input type");
                    }
                }
            },
            [&](std::exception_ptr error_ptr) {
                output.on_error(error_ptr);
            },
            [&]() {
                output.on_completed();
            }));
    };
}
// ************ FilterDetectionStageInterfaceProxy ************* //
/**
 * Creates a MultiMessage FilterDetectionsStage inside the given segment builder.
 *
 * @param builder       Segment builder that owns the new stage.
 * @param name          Name of the stage within the segment.
 * @param threshold     Threshold the filter values are compared against.
 * @param copy          Copy passing rows into one message (true) or emit slices (false).
 * @param filter_source Where the filter values are read from.
 * @param field_name    Tensor/column name holding the filter values.
 * @return The segment object wrapping the new stage.
 */
std::shared_ptr<mrc::segment::Object<FilterDetectionsStageMM>> FilterDetectionStageInterfaceProxy::init_mm(
    mrc::segment::Builder& builder,
    const std::string& name,
    float threshold,
    bool copy,
    FilterSource filter_source,
    std::string field_name)
{
    // field_name is a by-value sink that is not used again here: move it instead of copying it.
    return builder.construct_object<FilterDetectionsStageMM>(
        name, threshold, copy, filter_source, std::move(field_name));
}
/**
 * Creates a ControlMessage FilterDetectionsStage inside the given segment builder.
 *
 * @param builder       Segment builder that owns the new stage.
 * @param name          Name of the stage within the segment.
 * @param threshold     Threshold the filter values are compared against.
 * @param copy          Copy passing rows into one message (true) or emit slices (false).
 * @param filter_source Where the filter values are read from.
 * @param field_name    Tensor/column name holding the filter values.
 * @return The segment object wrapping the new stage.
 */
std::shared_ptr<mrc::segment::Object<FilterDetectionsStageCM>> FilterDetectionStageInterfaceProxy::init_cm(
    mrc::segment::Builder& builder,
    const std::string& name,
    float threshold,
    bool copy,
    FilterSource filter_source,
    std::string field_name)
{
    // field_name is a by-value sink that is not used again here: move it instead of copying it.
    return builder.construct_object<FilterDetectionsStageCM>(
        name, threshold, copy, filter_source, std::move(field_name));
}
} // namespace morpheus