Program Listing for File cuda_stream_handler.hpp

Return to documentation for file (include/holoscan/utils/cuda_stream_handler.hpp)

Copy
Copied!
            

/* * SPDX-FileCopyrightText: Copyright (c) 2022-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef INCLUDE_HOLOSCAN_UTILS_CUDA_STREAM_HANDLER_HPP #define INCLUDE_HOLOSCAN_UTILS_CUDA_STREAM_HANDLER_HPP #include <memory> #include <utility> #include <vector> #include "../core/operator_spec.hpp" #include "../core/parameter.hpp" #include "../core/resources/gxf/cuda_stream_pool.hpp" #include "gxf/cuda/cuda_stream.hpp" #include "gxf/cuda/cuda_stream_id.hpp" #include "gxf/cuda/cuda_stream_pool.hpp" namespace holoscan { class CudaStreamHandler { public: ~CudaStreamHandler() { for (auto&& event : cuda_events_) { const cudaError_t result = cudaEventDestroy(event); if (cudaSuccess != result) { HOLOSCAN_LOG_ERROR("Failed to destroy CUDA event: %s", cudaGetErrorString(result)); } } cuda_events_.clear(); } void defineParams(OperatorSpec& spec, bool required = false) { spec.param(cuda_stream_pool_, "cuda_stream_pool", "CUDA Stream Pool", "Instance of gxf::CudaStreamPool."); cuda_stream_pool_required_ = required; } gxf_result_t fromMessage(gxf_context_t context, const nvidia::gxf::Expected<nvidia::gxf::Entity>& message) { // if the message contains a stream use this const auto maybe_cuda_stream_id = message.value().get<nvidia::gxf::CudaStreamId>(); if (maybe_cuda_stream_id) { const auto maybe_cuda_stream_handle = nvidia::gxf::Handle<nvidia::gxf::CudaStream>::Create( context, maybe_cuda_stream_id.value()->stream_cid); if (maybe_cuda_stream_handle) { message_cuda_stream_handle_ = maybe_cuda_stream_handle.value(); } } else { // if no stream had been found, allocate a stream and use that gxf_result_t result = allocateInternalStream(context); if (result != GXF_SUCCESS) { return result; } message_cuda_stream_handle_ = cuda_stream_handle_; } return GXF_SUCCESS; } gxf_result_t fromMessages(gxf_context_t context, const std::vector<nvidia::gxf::Entity>& messages) { const gxf_result_t result = allocateInternalStream(context); if (result != GXF_SUCCESS) { return result; } if (!cuda_stream_handle_) { // if no CUDA stream can be allocated because no stream pool is set, then don't sync // with incoming streams. CUDA operations of this operator will use the default stream // which sync with all other streams by default. return GXF_SUCCESS; } // iterate through all messages and use events to chain incoming streams with the internal // stream auto event_it = cuda_events_.begin(); for (auto& msg : messages) { const auto maybe_cuda_stream_id = msg.get<nvidia::gxf::CudaStreamId>(); if (maybe_cuda_stream_id) { const auto maybe_cuda_stream_handle = nvidia::gxf::Handle<nvidia::gxf::CudaStream>::Create( context, maybe_cuda_stream_id.value()->stream_cid); if (maybe_cuda_stream_handle) { const cudaStream_t cuda_stream = maybe_cuda_stream_handle.value()->stream().value(); cudaError_t result; // allocate a new event if needed if (event_it == cuda_events_.end()) { cudaEvent_t cuda_event; result = cudaEventCreateWithFlags(&cuda_event, cudaEventDisableTiming); if (cudaSuccess != result) { HOLOSCAN_LOG_ERROR("Failed to create input CUDA event: %s", cudaGetErrorString(result)); return GXF_FAILURE; } cuda_events_.push_back(cuda_event); event_it = cuda_events_.end(); --event_it; } result = cudaEventRecord(*event_it, cuda_stream); if (cudaSuccess != result) { HOLOSCAN_LOG_ERROR("Failed to record event for message stream: %s", cudaGetErrorString(result)); return GXF_FAILURE; } result = cudaStreamWaitEvent(cuda_stream_handle_->stream().value(), *event_it); if (cudaSuccess != result) { HOLOSCAN_LOG_ERROR("Failed to record wait on message event: %s", cudaGetErrorString(result)); return GXF_FAILURE; } ++event_it; } } } message_cuda_stream_handle_ = cuda_stream_handle_; return GXF_SUCCESS; } gxf_result_t toMessage(nvidia::gxf::Expected<nvidia::gxf::Entity>& message) { if (message_cuda_stream_handle_) { const auto maybe_stream_id = message.value().add<nvidia::gxf::CudaStreamId>(); if (!maybe_stream_id) { HOLOSCAN_LOG_ERROR("Failed to add CUDA stream id to output message."); return nvidia::gxf::ToResultCode(maybe_stream_id); } maybe_stream_id.value()->stream_cid = message_cuda_stream_handle_.cid(); } return GXF_SUCCESS; } nvidia::gxf::Handle<nvidia::gxf::CudaStream> getStreamHandle(gxf_context_t context) { // If there is a message stream handle, return this if (message_cuda_stream_handle_) { return message_cuda_stream_handle_; } // else allocate an internal CUDA stream and return it allocateInternalStream(context); return cuda_stream_handle_; } cudaStream_t getCudaStream(gxf_context_t context) { const nvidia::gxf::Handle<nvidia::gxf::CudaStream> cuda_stream_handle = getStreamHandle(context); if (cuda_stream_handle) { return cuda_stream_handle->stream().value(); } if (!default_stream_warning_) { default_stream_warning_ = true; HOLOSCAN_LOG_WARN( "Parameter `cuda_stream_pool` is not set, using the default CUDA stream for CUDA " "operations."); } return cudaStreamDefault; } private: gxf_result_t allocateInternalStream(gxf_context_t context) { // Create the CUDA stream if it does not yet exist. if (!cuda_stream_handle_) { // Check if a cuda stream pool is given. const bool has_cuda_stream_pool_ = cuda_stream_pool_.has_value() && cuda_stream_pool_.get(); if (!has_cuda_stream_pool_) { // If the cuda stream pool is required return an error if (cuda_stream_pool_required_) { HOLOSCAN_LOG_ERROR("'cuda_stream_pool' is required but not set."); return GXF_FAILURE; } return GXF_SUCCESS; } // get Handle to underlying nvidia::gxf::CudaStreamPool from // std::shared_ptr<holoscan::CudaStreamPool> const auto cuda_stream_pool = nvidia::gxf::Handle<nvidia::gxf::CudaStreamPool>::Create( context, cuda_stream_pool_.get()->gxf_cid()); if (cuda_stream_pool) { // allocate a stream auto maybe_stream = cuda_stream_pool.value()->allocateStream(); if (!maybe_stream) { HOLOSCAN_LOG_ERROR("Failed to allocate CUDA stream"); return nvidia::gxf::ToResultCode(maybe_stream); } cuda_stream_handle_ = std::move(maybe_stream.value()); } } return GXF_SUCCESS; } bool cuda_stream_pool_required_ = false; Parameter<std::shared_ptr<CudaStreamPool>> cuda_stream_pool_; bool default_stream_warning_ = false; std::vector<cudaEvent_t> cuda_events_; nvidia::gxf::Handle<nvidia::gxf::CudaStream> message_cuda_stream_handle_; nvidia::gxf::Handle<nvidia::gxf::CudaStream> cuda_stream_handle_; }; } // namespace holoscan #endif/* INCLUDE_HOLOSCAN_UTILS_CUDA_STREAM_HANDLER_HPP */

© Copyright 2022-2023, NVIDIA. Last updated on Sep 13, 2023.