Program Listing for File io_spec.hpp
↰ Return to documentation for file (include/holoscan/core/io_spec.hpp)
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HOLOSCAN_CORE_IO_SPEC_HPP
#define HOLOSCAN_CORE_IO_SPEC_HPP
#include <yaml-cpp/yaml.h>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <typeinfo>
#include <utility>
#include <vector>
#include "./common.hpp"
#include "./condition.hpp"
#include "./conditions/gxf/asynchronous.hpp"
#include "./conditions/gxf/boolean.hpp"
#include "./conditions/gxf/count.hpp"
#include "./conditions/gxf/downstream_affordable.hpp"
#include "./conditions/gxf/expiring_message.hpp"
#include "./conditions/gxf/message_available.hpp"
#include "./conditions/gxf/multi_message_available_timeout.hpp"
#include "./conditions/gxf/periodic.hpp"
#include "./gxf/entity.hpp"
#include "./resource.hpp"
#include "./resources/gxf/double_buffer_receiver.hpp"
#include "./resources/gxf/double_buffer_transmitter.hpp"
#include "./resources/gxf/ucx_receiver.hpp"
#include "./resources/gxf/ucx_transmitter.hpp"
namespace holoscan {
/**
 * @brief Specification of a single input or output port of an operator.
 *
 * An IOSpec stores the port name, its direction (input/output), the `std::type_info` of the data
 * associated with the port, the queue size, and the conditions and connector resource attached
 * to the port. The mutating `condition()`, `connector()` and `queue_size()` overloads return
 * `*this` so that calls can be chained.
 */
class IOSpec {
 public:
  /// Direction of the port.
  enum class IOType { kInput, kOutput };

  /**
   * @brief Thin wrapper around an `int64_t` queue size.
   *
   * Construction from an integer is explicit, while the conversion back to `int64_t` is
   * implicit so that IOSize values can be compared and used like plain integers.
   */
  class IOSize {
   public:
    /// Create a size wrapper holding `size` (0 when omitted).
    explicit IOSize(int64_t size = 0) : size_(size) {}

    /// Replace the stored size.
    void size(int64_t size) { size_ = size; }

    /// Return the stored size.
    int64_t size() const { return size_; }

    /// Implicit conversion to the stored size.
    operator int64_t() const { return size_; }

   private:
    int64_t size_;
  };

  // Static constants for the IOSize class.

  /// Sentinel (-1) described as 'any size' in the warnings emitted by condition()/connector().
  inline static const IOSize kAnySize = IOSize{-1};
  /// Sentinel (0). NOTE(review): presumably the size is derived from the number of preceding
  /// (upstream) connections -- confirm against the code that consumes this value.
  inline static const IOSize kPrecedingCount = IOSize{0};
  /// Queue size of one; this is the default queue size of a port.
  inline static const IOSize kSizeOne = IOSize{1};

  /**
   * @brief Kind of connector (receiver for inputs, transmitter for outputs) used by the port.
   *
   * - kDefault: no connector is stored here; it is created later by the executor.
   * - kDoubleBuffer: DoubleBufferReceiver / DoubleBufferTransmitter.
   * - kUCX: UcxReceiver / UcxTransmitter.
   */
  enum class ConnectorType { kDefault, kDoubleBuffer, kUCX };

  /**
   * @brief Construct a new IOSpec object.
   *
   * @param op_spec Pointer to the operator specification that owns this port (stored as-is).
   * @param name Name of the port; must not contain the '.' character.
   * @param io_type Whether this is an input or an output port.
   * @param typeinfo Type information of the port data (defaults to `holoscan::gxf::Entity`).
   * @param size Queue size of the port (defaults to `IOSpec::kSizeOne`).
   * @throws std::invalid_argument If `name` contains a '.' character.
   */
  IOSpec(OperatorSpec* op_spec, const std::string& name, IOType io_type,
         const std::type_info* typeinfo = &typeid(holoscan::gxf::Entity),
         IOSpec::IOSize size = IOSpec::kSizeOne)
      : op_spec_(op_spec),
        name_(name),
        io_type_(io_type),
        typeinfo_(typeinfo),
        queue_size_(size) {
    // Operator::parse_port_name requires that "." is not allowed in the IOSpec name
    if (name.find('.') != std::string::npos) {
      throw std::invalid_argument(fmt::format(
          "The . character is reserved and cannot be used in the port (IOSpec) name ('{}').",
          name));
    }
  }

  // The class has a virtual member function, so it can be used as a polymorphic base: the
  // destructor must be virtual to make deletion through an IOSpec pointer well defined.
  // Declaring a destructor suppresses the implicit move operations, so all copy/move
  // operations are defaulted explicitly to keep the class copyable and movable.
  virtual ~IOSpec() = default;
  IOSpec(const IOSpec&) = default;
  IOSpec& operator=(const IOSpec&) = default;
  IOSpec(IOSpec&&) = default;
  IOSpec& operator=(IOSpec&&) = default;

  /// Return the pointer to the operator specification passed at construction.
  OperatorSpec* op_spec() const { return op_spec_; }

  /// Return the name of the port.
  const std::string& name() const { return name_; }

  /// Return the direction (input/output) of the port.
  IOType io_type() const { return io_type_; }

  /// Return the connector type last selected through connector(ConnectorType, ...).
  ConnectorType connector_type() const { return connector_type_; }

  /// Return the type information of the port data.
  const std::type_info* typeinfo() const { return typeinfo_; }

  /**
   * @brief Return the conditions attached to the port.
   *
   * @return Mutable reference to the list of (condition type, condition) pairs, in the order
   * they were added. Entries added with `ConditionType::kNone` hold a null condition pointer.
   */
  std::vector<std::pair<ConditionType, std::shared_ptr<Condition>>>& conditions() {
    return conditions_;
  }

  /**
   * @brief Add a condition to the port.
   *
   * A condition object matching `type` is constructed from `args` and appended to the list
   * returned by conditions(). For `ConditionType::kNone` a null condition is appended. Any other
   * unsupported type is only reported through an error log and nothing is appended.
   *
   * A warning is logged when the queue size of this port is `IOSpec::kAnySize`.
   *
   * @param type The type of the condition.
   * @param args The arguments forwarded to the constructor of the condition.
   * @return The reference to this IOSpec.
   */
  template <typename... ArgsT>
  IOSpec& condition(ConditionType type, ArgsT&&... args) {
    switch (type) {
      case ConditionType::kMessageAvailable:
        conditions_.emplace_back(
            type, std::make_shared<MessageAvailableCondition>(std::forward<ArgsT>(args)...));
        break;
      case ConditionType::kExpiringMessageAvailable:
        conditions_.emplace_back(
            type,
            std::make_shared<ExpiringMessageAvailableCondition>(std::forward<ArgsT>(args)...));
        break;
      case ConditionType::kDownstreamMessageAffordable:
        conditions_.emplace_back(
            type,
            std::make_shared<DownstreamMessageAffordableCondition>(std::forward<ArgsT>(args)...));
        break;
      case ConditionType::kMultiMessageAvailableTimeout:
        // May want to use this multi-message condition even with a single port as a way to have
        // a timeout on the condition. Unlike ExpiringMessageAvailableCondition, this one does not
        // require a timestamp to be emitted by the upstream operator.
        conditions_.emplace_back(
            type,
            std::make_shared<MultiMessageAvailableTimeoutCondition>(std::forward<ArgsT>(args)...));
        break;
      case ConditionType::kNone:
        conditions_.emplace_back(type, nullptr);
        break;
      default:
        HOLOSCAN_LOG_ERROR("Unsupported condition type for IOSpec: {}", static_cast<int>(type));
        break;
    }
    if (queue_size_.size() == kAnySize.size()) {
      HOLOSCAN_LOG_WARN(
          "The queue size is currently set to 'any size' (IOSpec::kAnySize in C++ or "
          "IOSpec.ANY_SIZE in Python) "
          "for receivers that don't support condition changes. Please set the queue size "
          "explicitly when calling the input() method in setup() if you want to use the ordinary "
          "input port with the condition (input port: {}).",
          name_);
    }
    return *this;
  }

  /// Return the connector (receiver/transmitter) resource; null when none has been set.
  std::shared_ptr<Resource> connector() const { return connector_; }

  /// Set the connector (receiver/transmitter) resource. The connector type is left unchanged.
  void connector(std::shared_ptr<Resource> connector) { connector_ = std::move(connector); }

  /**
   * @brief Select the connector type of the port and create the matching connector.
   *
   * For `ConnectorType::kDefault` the stored connector is cleared. For the other supported types
   * a receiver (input port) or transmitter (output port) is constructed from `args`. An unknown
   * type is reported through an error log and leaves the stored connector untouched; the stored
   * connector type is updated in every case.
   *
   * A warning is logged when the queue size of this port is `IOSpec::kAnySize`.
   *
   * @param type The type of the connector.
   * @param args The arguments forwarded to the constructor of the connector.
   * @return The reference to this IOSpec.
   */
  template <typename... ArgsT>
  IOSpec& connector(ConnectorType type, ArgsT&&... args) {
    connector_type_ = type;
    switch (type) {
      case ConnectorType::kDefault:
        // default receiver or transmitter will be created in GXFExecutor::run instead
        connector_.reset();
        break;
      case ConnectorType::kDoubleBuffer:
        if (io_type_ == IOType::kInput) {
          connector_ = std::make_shared<DoubleBufferReceiver>(std::forward<ArgsT>(args)...);
        } else {
          connector_ = std::make_shared<DoubleBufferTransmitter>(std::forward<ArgsT>(args)...);
        }
        break;
      case ConnectorType::kUCX:
        if (io_type_ == IOType::kInput) {
          connector_ = std::make_shared<UcxReceiver>(std::forward<ArgsT>(args)...);
        } else {
          connector_ = std::make_shared<UcxTransmitter>(std::forward<ArgsT>(args)...);
        }
        break;
      default:
        HOLOSCAN_LOG_ERROR("Unknown connector type {}", static_cast<int>(type));
        break;
    }
    if (queue_size_.size() == kAnySize.size()) {
      HOLOSCAN_LOG_WARN(
          "The queue size is currently set to 'any size' (IOSpec::kAnySize in C++ or "
          "IOSpec.ANY_SIZE in Python) "
          "for receivers that don't support connector changes. Please set the queue size "
          "explicitly when calling the input() method in setup() if you want to use the ordinary "
          "input port with the connector (input port: {}).",
          name_);
    }
    return *this;
  }

  /// Return the queue size of the port as a plain integer.
  int64_t queue_size() const { return queue_size_.size(); }

  /**
   * @brief Set the queue size of the port.
   *
   * @param size The new queue size.
   * @return The reference to this IOSpec.
   */
  IOSpec& queue_size(int64_t size) {
    queue_size_.size(size);
    return *this;
  }

  /// Return a YAML node for this specification (implemented out of line; overridable).
  virtual YAML::Node to_yaml_node() const;

  /// Return a description string for this specification (implemented out of line).
  std::string description() const;

 private:
  OperatorSpec* op_spec_ = nullptr;          ///< Owning operator specification (not owned here).
  std::string name_;                         ///< Port name (never contains '.').
  IOType io_type_;                           ///< Port direction; always set by the constructor.
  const std::type_info* typeinfo_ = nullptr; ///< Type information of the port data.
  std::shared_ptr<Resource> connector_;      ///< Receiver/transmitter; null for the default.
  std::vector<std::pair<ConditionType, std::shared_ptr<Condition>>> conditions_;
  ConnectorType connector_type_ = ConnectorType::kDefault;
  IOSize queue_size_ = kSizeOne;
};
} // namespace holoscan
#endif/* HOLOSCAN_CORE_IO_SPEC_HPP */