↰ Return to documentation for file (morpheus/_lib/src/stages/triton_inference.cpp
)
/*
* SPDX-FileCopyrightText: Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "morpheus/stages/triton_inference.hpp"
#include "mrc/node/rx_sink_base.hpp"
#include "mrc/node/rx_source_base.hpp"
#include "mrc/node/sink_properties.hpp"
#include "mrc/node/source_properties.hpp"
#include "mrc/segment/builder.hpp"
#include "mrc/segment/object.hpp"
#include "mrc/types.hpp"
#include "morpheus/messages/memory/response_memory.hpp"
#include "morpheus/messages/memory/tensor_memory.hpp" // for TensorMemory
#include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo
#include "morpheus/objects/dtype.hpp" // for DType
#include "morpheus/objects/tensor.hpp" // for Tensor::create
#include "morpheus/objects/tensor_object.hpp" // for TensorObject
#include "morpheus/objects/triton_in_out.hpp" // for TritonInOut
#include "morpheus/types.hpp" // for TensorIndex, TensorMap
#include "morpheus/utilities/matx_util.hpp" // for MatxUtil::logits, MatxUtil::reduce_max
#include "morpheus/utilities/stage_util.hpp" // for foreach_map
#include "morpheus/utilities/string_util.hpp" // for MORPHEUS_CONCAT_STR
#include "morpheus/utilities/tensor_util.hpp" // for get_elem_count
#include <cuda_runtime.h> // for cudaMemcpy, cudaMemcpy2D, cudaMemcpyDeviceToHost, cudaMemcpyHostToDevice
#include <glog/logging.h>
#include <http_client.h>
#include <mrc/cuda/common.hpp> // for MRC_CHECK_CUDA
#include <nlohmann/json.hpp>
#include <pymrc/node.hpp>
#include <rmm/cuda_stream_view.hpp> // for cuda_stream_per_thread
#include <rmm/device_buffer.hpp> // for device_buffer
#include <algorithm> // for min
#include <cstddef>
#include <cstdint>
#include <exception>
#include <functional>
#include <memory>
#include <sstream>
#include <stdexcept> // for runtime_error, out_of_range
#include <utility>
// IWYU pragma: no_include
#define CHECK_TRITON(method) ::InferenceClientStage__check_triton_errors(method, #method, __FILE__, __LINE__);
namespace {
using namespace morpheus;
using buffer_map_t = std::map<std::string, std::shared_ptr<rmm::device_buffer>>;
// Component-private free functions.
void InferenceClientStage__check_triton_errors(triton::client::Error status,
const std::string& methodName,
const std::string& filename,
const int& lineNumber)
{
if (!status.IsOk())
{
std::string err_msg = MORPHEUS_CONCAT_STR("Triton Error while executing '"
<< methodName << "'. Error: " + status.Message() << "\n"
<< filename << "(" << lineNumber << ")");
LOG(ERROR) << err_msg;
throw std::runtime_error(err_msg);
}
}
void build_output_tensors(TensorIndex count,
const std::vector<TritonInOut>& model_outputs,
buffer_map_t& output_buffers,
TensorMap& output_tensors)
{
// Create the output memory blocks
for (auto& model_output : model_outputs)
{
ShapeType total_shape = model_output.shape;
// First dimension will always end up being the number of rows in the dataframe
total_shape[0] = count;
auto elem_count = TensorUtils::get_elem_count(total_shape);
// Create the output memory
auto output_buffer = std::make_shared<rmm::device_buffer>(elem_count * model_output.datatype.item_size(),
rmm::cuda_stream_per_thread);
output_buffers[model_output.mapped_name] = output_buffer;
// Triton results are always in row-major as required by the KServe protocol
// https://github.com/kserve/kserve/blob/master/docs/predict-api/v2/required_api.md#tensor-data
ShapeType stride{total_shape[1], 1};
output_tensors[model_output.mapped_name].swap(
Tensor::create(std::move(output_buffer), model_output.datatype, total_shape, stride, 0));
}
}
ShapeType get_seq_ids(const InferenceClientStage::sink_type_t& message)
{
// Take a copy of the sequence Ids allowing us to map rows in the response to rows in the dataframe
// The output tensors we store in `reponse_memory` will all be of the same length as the the
// dataframe. seq_ids has three columns, but we are only interested in the first column.
auto seq_ids = message->get_input("seq_ids");
const auto item_size = seq_ids.dtype().item_size();
ShapeType host_seq_ids(message->count);
MRC_CHECK_CUDA(cudaMemcpy2D(host_seq_ids.data(),
item_size,
seq_ids.data(),
seq_ids.stride(0) * item_size,
item_size,
host_seq_ids.size(),
cudaMemcpyDeviceToHost));
return host_seq_ids;
}
std::pair<std::shared_ptr<triton::client::InferInput>, std::vector<uint8_t>> build_input(
const InferenceClientStage::sink_type_t& msg_slice, const TritonInOut& model_input)
{
DCHECK(msg_slice->memory->has_tensor(model_input.mapped_name))
<< "Model input '" << model_input.mapped_name << "' not found in InferenceMemory";
auto const& inp_tensor = msg_slice->get_input(model_input.mapped_name);
// Convert to the right type. Make shallow if necessary
auto final_tensor = inp_tensor.as_type(model_input.datatype);
std::vector<uint8_t> inp_data = final_tensor.get_host_data();
// Test
triton::client::InferInput* inp_ptr;
triton::client::InferInput::Create(
&inp_ptr, model_input.name, {inp_tensor.shape(0), inp_tensor.shape(1)}, model_input.datatype.triton_str());
std::shared_ptr<triton::client::InferInput> inp_shared;
inp_shared.reset(inp_ptr);
inp_ptr->AppendRaw(inp_data);
return std::make_pair(inp_shared, std::move(inp_data));
}
std::shared_ptr<const triton::client::InferRequestedOutput> build_output(const TritonInOut& model_output)
{
triton::client::InferRequestedOutput* out_ptr;
triton::client::InferRequestedOutput::Create(&out_ptr, model_output.name);
std::shared_ptr<const triton::client::InferRequestedOutput> out_shared;
out_shared.reset(out_ptr);
return out_shared;
}
void reduce_outputs(const InferenceClientStage::sink_type_t& x, buffer_map_t& output_buffers, TensorMap& output_tensors)
{
// When our tensor lengths are longer than our dataframe we will need to use the seq_ids array to
// lookup how the values should map back into the dataframe.
auto host_seq_ids = get_seq_ids(x);
TensorMap reduced_outputs;
for (const auto& output : output_tensors)
{
auto& tensor = output.second;
ShapeType shape = tensor.get_shape();
ShapeType stride = tensor.get_stride();
ShapeType reduced_shape{shape};
reduced_shape[0] = x->mess_count;
auto& buffer = output_buffers[output.first];
auto reduced_buffer =
MatxUtil::reduce_max(DevMemInfo{buffer, tensor.dtype(), shape, stride}, host_seq_ids, 0, reduced_shape);
output_buffers[output.first] = reduced_buffer;
reduced_outputs[output.first].swap(
Tensor::create(std::move(reduced_buffer), tensor.dtype(), reduced_shape, stride, 0));
}
output_tensors = std::move(reduced_outputs);
}
void apply_logits(buffer_map_t& output_buffers, TensorMap& output_tensors)
{
TensorMap logit_outputs;
for (const auto& output : output_tensors)
{
auto& input_tensor = output.second;
auto shape = input_tensor.get_shape();
auto stride = input_tensor.get_stride();
auto& buffer = output_buffers[output.first];
auto output_buffer = MatxUtil::logits(DevMemInfo{buffer, input_tensor.dtype(), shape, stride});
output_buffers[output.first] = output_buffer;
// For logits the input and output shapes will be the same
logit_outputs[output.first].swap(
Tensor::create(std::move(output_buffer), input_tensor.dtype(), shape, stride, 0));
}
output_tensors = std::move(logit_outputs);
}
} // namespace
namespace morpheus {
// Component public implementations
// ************ InferenceClientStage ************************* //
InferenceClientStage::InferenceClientStage(std::string model_name,
std::string server_url,
bool force_convert_inputs,
bool use_shared_memory,
bool needs_logits,
std::map<std::string, std::string> inout_mapping) :
PythonNode(base_t::op_factory_from_sub_fn(build_operator())),
m_model_name(std::move(model_name)),
m_server_url(std::move(server_url)),
m_force_convert_inputs(force_convert_inputs),
m_use_shared_memory(use_shared_memory),
m_needs_logits(needs_logits),
m_inout_mapping(std::move(inout_mapping)),
m_options(m_model_name)
{
// Connect with the server to setup the inputs/outputs
this->connect_with_server(); // TODO(Devin)
}
InferenceClientStage::subscribe_fn_t InferenceClientStage::build_operator()
{
return [this](rxcpp::observable<sink_type_t> input, rxcpp::subscriber<source_type_t> output) {
std::unique_ptr<triton::client::InferenceServerHttpClient> client;
CHECK_TRITON(triton::client::InferenceServerHttpClient::Create(&client, m_server_url, false));
return input.subscribe(rxcpp::make_observer<sink_type_t>(
[this, &output, &client](sink_type_t x) {
// Using the `count` which is the number of rows in the inference tensors. We will check later if this
// doesn't match the number of rows in the dataframe (`mess_count`). This happens when the size of the
// input is too large and needs to be broken up in chunks in the pre-process stage. When this is the
// case we will reduce the rows in the response outputs such that we have a single response for each
// row int he dataframe.
TensorMap output_tensors;
buffer_map_t output_buffers;
build_output_tensors(x->count, m_model_outputs, output_buffers, output_tensors);
for (TensorIndex start = 0; start < x->count; start += m_max_batch_size)
{
triton::client::InferInput* input1;
TensorIndex stop = std::min(start + m_max_batch_size, x->count);
sink_type_t mini_batch_input = x->get_slice(start, stop);
// Iterate on the model inputs in case the model takes less than what tensors are available
std::vector<std::pair<std::shared_ptr<triton::client::InferInput>, std::vector<uint8_t>>>
saved_inputs = foreach_map(m_model_inputs, [&mini_batch_input](auto const& model_input) {
return (build_input(mini_batch_input, model_input));
});
std::vector<std::shared_ptr<const triton::client::InferRequestedOutput>> saved_outputs =
foreach_map(m_model_outputs, [](auto const& model_output) {
// Generate the outputs to be requested.
return build_output(model_output);
});
std::vector<triton::client::InferInput*> inputs =
foreach_map(saved_inputs, [](auto x) { return x.first.get(); });
std::vector<const triton::client::InferRequestedOutput*> outputs =
foreach_map(saved_outputs, [](auto x) { return x.get(); });
auto results = std::unique_ptr<triton::client::InferResult>([&]() {
triton::client::InferResult* results;
CHECK_TRITON(client->Infer(&results, m_options, inputs, outputs));
return results;
}());
for (auto& model_output : m_model_outputs)
{
std::vector<int64_t> output_shape;
CHECK_TRITON(results->Shape(model_output.name, &output_shape));
// Make sure we have at least 2 dims
while (output_shape.size() < 2)
{
output_shape.push_back(1);
}
const uint8_t* output_ptr = nullptr;
size_t output_ptr_size = 0;
CHECK_TRITON(results->RawData(model_output.name, &output_ptr, &output_ptr_size));
auto output_tensor = output_tensors[model_output.mapped_name].slice({start, 0}, {stop, -1});
DCHECK_EQ(stop - start, output_shape[0]);
DCHECK_EQ(output_tensor.bytes(), output_ptr_size);
DCHECK_NOTNULL(output_ptr);
DCHECK_NOTNULL(output_tensor.data());
MRC_CHECK_CUDA(
cudaMemcpy(output_tensor.data(), output_ptr, output_ptr_size, cudaMemcpyHostToDevice));
}
}
if (x->mess_count != x->count)
{
reduce_outputs(x, output_buffers, output_tensors);
}
// If we need to do logits, do that here
if (m_needs_logits)
{
apply_logits(output_buffers, output_tensors);
}
// Final output of all mini-batches
auto response_mem = std::make_shared<ResponseMemory>(x->mess_count, std::move(output_tensors));
auto response = std::make_shared<MultiResponseMessage>(
x->meta, x->mess_offset, x->mess_count, std::move(response_mem), 0, response_mem->count);
output.on_next(std::move(response));
},
[&](std::exception_ptr error_ptr) { output.on_error(error_ptr); },
[&]() { output.on_completed(); }));
};
}
void InferenceClientStage::connect_with_server()
{
std::string server_url = m_server_url;
std::unique_ptr<triton::client::InferenceServerHttpClient> client;
auto result = triton::client::InferenceServerHttpClient::Create(&client, server_url, false);
// Now load the input/outputs for the model
bool is_server_live = false;
triton::client::Error status = client->IsServerLive(&is_server_live);
if (!status.IsOk())
{
if (this->is_default_grpc_port(server_url))
{
LOG(WARNING) << "Failed to connect to Triton at '" << m_server_url
<< "'. Default gRPC port of (8001) was detected but C++ "
"InferenceClientStage uses HTTP protocol. Retrying with default HTTP port (8000)";
// We are using the default gRPC port, try the default HTTP
std::unique_ptr<triton::client::InferenceServerHttpClient> unique_client;
auto result = triton::client::InferenceServerHttpClient::Create(&unique_client, server_url, false);
client = std::move(unique_client);
status = client->IsServerLive(&is_server_live);
}
else if (status.Message().find("Unsupported protocol") != std::string::npos)
{
throw std::runtime_error(MORPHEUS_CONCAT_STR(
"Failed to connect to Triton at '"
<< m_server_url
<< "'. Received 'Unsupported Protocol' error. Are you using the right port? The C++ "
"InferenceClientStage uses Triton's HTTP protocol instead of gRPC. Ensure you have "
"specified the HTTP port (Default 8000)."));
}
if (!status.IsOk())
throw std::runtime_error(
MORPHEUS_CONCAT_STR("Unable to connect to Triton at '"
<< m_server_url << "'. Check the URL and port and ensure the server is running."));
}
// Save this for new clients
m_server_url = server_url;
if (!is_server_live)
throw std::runtime_error("Server is not live");
bool is_server_ready = false;
CHECK_TRITON(client->IsServerReady(&is_server_ready));
if (!is_server_ready)
throw std::runtime_error("Server is not ready");
bool is_model_ready = false;
CHECK_TRITON(client->IsModelReady(&is_model_ready, this->m_model_name));
if (!is_model_ready)
throw std::runtime_error("Model is not ready");
std::string model_metadata_json;
CHECK_TRITON(client->ModelMetadata(&model_metadata_json, this->m_model_name));
auto model_metadata = nlohmann::json::parse(model_metadata_json);
std::string model_config_json;
CHECK_TRITON(client->ModelConfig(&model_config_json, this->m_model_name));
auto model_config = nlohmann::json::parse(model_config_json);
if (model_config.contains("max_batch_size"))
{
m_max_batch_size = model_config.at("max_batch_size").get<TensorIndex>();
}
for (auto const& input : model_metadata.at("inputs"))
{
auto shape = input.at("shape").get<ShapeType>();
auto dtype = DType::from_triton(input.at("datatype").get<std::string>());
size_t bytes = dtype.item_size();
for (auto& y : shape)
{
if (y == -1)
{
y = m_max_batch_size;
}
bytes *= y;
}
auto mapped_name = input.at("name").get<std::string>();
if (m_inout_mapping.find(mapped_name) != m_inout_mapping.end())
{
mapped_name = m_inout_mapping[mapped_name];
}
m_model_inputs.push_back(TritonInOut{input.at("name").get<std::string>(),
bytes,
DType::from_triton(input.at("datatype").get<std::string>()),
shape,
mapped_name,
0});
}
for (auto const& output : model_metadata.at("outputs"))
{
auto shape = output.at("shape").get<ShapeType>();
auto dtype = DType::from_triton(output.at("datatype").get<std::string>());
size_t bytes = dtype.item_size();
for (auto& y : shape)
{
if (y == -1)
{
y = m_max_batch_size;
}
bytes *= y;
}
auto mapped_name = output.at("name").get<std::string>();
if (m_inout_mapping.find(mapped_name) != m_inout_mapping.end())
{
mapped_name = m_inout_mapping[mapped_name];
}
m_model_outputs.push_back(
TritonInOut{output.at("name").get<std::string>(), bytes, dtype, shape, mapped_name, 0});
}
}
bool InferenceClientStage::is_default_grpc_port(std::string& server_url)
{
// Check if we are the default gRPC port of 8001 and try 8000 for http client instead
size_t colon_loc = server_url.find_last_of(':');
if (colon_loc == -1)
{
return false;
}
// Check if the port matches 8001
if (server_url.size() < colon_loc + 1 || server_url.substr(colon_loc + 1) != "8001")
{
return false;
}
// It matches, change to 8000
server_url = server_url.substr(0, colon_loc) + ":8000";
return true;
}
// ************ InferenceClientStageInterfaceProxy********* //
std::shared_ptr<mrc::segment::Object<InferenceClientStage>> InferenceClientStageInterfaceProxy::init(
mrc::segment::Builder& builder,
const std::string& name,
std::string model_name,
std::string server_url,
bool force_convert_inputs,
bool use_shared_memory,
bool needs_logits,
std::map<std::string, std::string> inout_mapping)
{
auto stage = builder.construct_object<InferenceClientStage>(
name, model_name, server_url, force_convert_inputs, use_shared_memory, needs_logits, inout_mapping);
return stage;
}
} // namespace morpheus