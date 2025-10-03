NVIDIA Holoscan SDK v3.6.1
/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef HOLOSCAN_CORE_IO_SPEC_HPP
#define HOLOSCAN_CORE_IO_SPEC_HPP

#include <yaml-cpp/yaml.h>

#include <iostream>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <typeinfo>
#include <utility>
#include <vector>

#include "./common.hpp"
#include "./condition.hpp"
#include "./conditions/gxf/asynchronous.hpp"
#include "./conditions/gxf/boolean.hpp"
#include "./conditions/gxf/count.hpp"
#include "./conditions/gxf/downstream_affordable.hpp"
#include "./conditions/gxf/expiring_message.hpp"
#include "./conditions/gxf/message_available.hpp"
#include "./conditions/gxf/multi_message_available_timeout.hpp"
#include "./conditions/gxf/periodic.hpp"
#include "./resource.hpp"
#include "./resources/gxf/async_buffer_receiver.hpp"
#include "./resources/gxf/async_buffer_transmitter.hpp"
#include "./resources/gxf/double_buffer_receiver.hpp"
#include "./resources/gxf/double_buffer_transmitter.hpp"
#include "./resources/gxf/ucx_receiver.hpp"
#include "./resources/gxf/ucx_transmitter.hpp"

namespace holoscan {

// Forward declarations. OperatorSpec is only referenced through a pointer in this header, so its
// full definition is not required here.
class OperatorSpec;
class Operator;

/**
 * @brief Specification of a single input or output port.
 *
 * An IOSpec stores the port name, its direction (IOType), a `std::type_info` pointer, the queue
 * size and optional queue policy, the list of conditions attached to the port, and an optional
 * connector resource (receiver or transmitter). The fluent setters (`condition()`,
 * `connector(ConnectorType, ...)`, `queue_size(int64_t)`, `queue_policy(QueuePolicy)`) return
 * `*this` so that calls can be chained.
 */
class IOSpec {
 public:
  virtual ~IOSpec() = default;

  /// Direction of the port. Used to pick a receiver (kInput) or a transmitter (kOutput) connector.
  enum class IOType { kInput, kOutput };

  /**
   * @brief Thin wrapper around an `int64_t` queue size.
   *
   * Construction from an integer is explicit, while conversion back to `int64_t` is implicit
   * (this is what makes comparisons such as `queue_size_ == kAnySize` work).
   */
  class IOSize {
   public:
    /// Construct from a raw size (defaults to 0).
    explicit IOSize(int64_t size = 0) : size_(size) {}

    /// Replace the stored size.
    void size(int64_t size) { size_ = size; }

    /// Return the stored size.
    int64_t size() const { return size_; }

    /// Implicit conversion to the stored size.
    operator int64_t() const { return size_; }

   private:
    int64_t size_;
  };

  // Predefined IOSize values.
  /// Sentinel size of -1 ("any size"; exposed as IOSpec.ANY_SIZE in Python per the log messages).
  inline static const IOSize kAnySize = IOSize{-1};
  /// Sentinel size of 0. NOTE(review): exact semantics are not visible in this header.
  inline static const IOSize kPrecedingCount = IOSize{0};
  /// Queue size of 1; the default for a newly constructed IOSpec.
  inline static const IOSize kSizeOne = IOSize{1};

  /// Kind of connector (receiver/transmitter) backing the port.
  enum class ConnectorType { kDefault, kDoubleBuffer, kAsyncBuffer, kUCX };

  /// Queue policy of the port. NOTE(review): presumably mirrors the underlying queue policies of
  /// the receiver/transmitter implementation -- confirm against the connector classes.
  enum class QueuePolicy : uint8_t {
    kPop = 0,
    kReject = 1,
    kFault = 2,
  };

  /**
   * @brief Construct a new IOSpec.
   *
   * @param op_spec Pointer to the owning operator specification (stored, not owned).
   * @param name Port name. Must not contain the '.' character.
   * @param io_type Direction of the port.
   * @param typeinfo Type information associated with the port (defaults to `typeid(void*)`).
   * @param size Initial queue size (defaults to kSizeOne).
   * @param policy Initial queue policy (defaults to "not set").
   * @throws std::invalid_argument if `name` contains '.'.
   */
  IOSpec(OperatorSpec* op_spec, const std::string& name, IOType io_type,
         const std::type_info* typeinfo = &typeid(void*), IOSpec::IOSize size = IOSpec::kSizeOne,
         std::optional<IOSpec::QueuePolicy> policy = std::nullopt)
      : op_spec_(op_spec),
        io_type_(io_type),
        typeinfo_(typeinfo),
        queue_size_(size),
        queue_policy_(policy) {
    // Operator::parse_port_name reserves "." as a separator, so it is not allowed in the IOSpec
    // name.
    if (name.find('.') != std::string::npos) {
      throw std::invalid_argument(fmt::format(
          "The . character is reserved and cannot be used in the port (IOSpec) name ('{}').",
          name));
    }
    name_ = name;
  }

  /// Return the operator specification pointer passed at construction.
  OperatorSpec* op_spec() const { return op_spec_; }

  /// Return the port name.
  const std::string& name() const { return name_; }

  /// Return the direction of the port.
  IOType io_type() const { return io_type_; }

  /// Return the connector type most recently selected via `connector(ConnectorType, ...)`.
  ConnectorType connector_type() const { return connector_type_; }

  /// Return the type information pointer passed at construction.
  const std::type_info* typeinfo() const { return typeinfo_; }

  /// Return a mutable reference to the (type, condition) pairs attached to this port.
  std::vector<std::pair<ConditionType, std::shared_ptr<Condition>>>& conditions() {
    return conditions_;
  }

  /**
   * @brief Append a condition of the given type to this port.
   *
   * The extra arguments are forwarded to the constructor of the matching condition class.
   * `ConditionType::kNone` appends an entry with a null condition pointer. Unsupported types are
   * logged as an error and nothing is appended.
   *
   * A warning is logged if the queue size is currently kAnySize.
   *
   * @param type The condition type to add.
   * @param args Arguments forwarded to the condition constructor.
   * @return Reference to this object.
   */
  template <typename... ArgsT>
  IOSpec& condition(ConditionType type, ArgsT&&... args) {
    switch (type) {
      case ConditionType::kMessageAvailable:
        conditions_.emplace_back(
            type, std::make_shared<MessageAvailableCondition>(std::forward<ArgsT>(args)...));
        break;
      case ConditionType::kExpiringMessageAvailable:
        conditions_.emplace_back(
            type,
            std::make_shared<ExpiringMessageAvailableCondition>(std::forward<ArgsT>(args)...));
        break;
      case ConditionType::kDownstreamMessageAffordable:
        conditions_.emplace_back(
            type,
            std::make_shared<DownstreamMessageAffordableCondition>(std::forward<ArgsT>(args)...));
        break;
      case ConditionType::kMultiMessageAvailableTimeout:
        // May want to use this multi-message condition even with a single port as a way to have
        // a timeout on the condition. Unlike ExpiringMessageAvailableCondition, this one does not
        // require a timestamp to be emitted by the upstream operator.
        conditions_.emplace_back(
            type,
            std::make_shared<MultiMessageAvailableTimeoutCondition>(std::forward<ArgsT>(args)...));
        break;
      case ConditionType::kNone:
        conditions_.emplace_back(type, nullptr);
        break;
      default:
        HOLOSCAN_LOG_ERROR("Unsupported condition type for IOSpec: {}", static_cast<int>(type));
        break;
    }

    if (queue_size_ == kAnySize) {
      HOLOSCAN_LOG_WARN(
          "The queue size is currently set to 'any size' (IOSpec::kAnySize in C++ or "
          "IOSpec.ANY_SIZE in Python) "
          "for receivers that don't support condition changes. Please set the queue size "
          "explicitly when calling the input() method in setup() if you want to use the ordinary "
          "input port with the condition (input port: {}).",
          name_);
    }

    return *this;
  }

  /// Return the connector resource (may be null when the default connector is used).
  std::shared_ptr<Resource> connector() const { return connector_; }

  /// Replace the connector resource. The connector type is left unchanged.
  void connector(std::shared_ptr<Resource> connector) { connector_ = std::move(connector); }

  /**
   * @brief Select the connector type and create the matching receiver/transmitter.
   *
   * For input ports a receiver is created, otherwise a transmitter. The extra arguments are
   * forwarded to the connector constructor. `ConnectorType::kDefault` clears the connector.
   * An unknown type is logged as an error and the existing connector is kept.
   *
   * A warning is logged if the queue size is currently kAnySize.
   *
   * @param type The connector type to use.
   * @param args Arguments forwarded to the connector constructor. Must be empty for
   *             `ConnectorType::kAsyncBuffer`.
   * @return Reference to this object.
   * @throws std::invalid_argument if extra arguments are passed with
   *         `ConnectorType::kAsyncBuffer`. In that case this object is left unmodified.
   */
  template <typename... ArgsT>
  IOSpec& connector(ConnectorType type, ArgsT&&... args) {
    // Validate before mutating any state so that a rejected call does not leave the spec
    // reporting kAsyncBuffer while still holding the previous connector.
    if (type == ConnectorType::kAsyncBuffer && sizeof...(args) > 0) {
      throw std::invalid_argument(fmt::format(
          "AsyncBufferReceiver and AsyncBufferTransmitter do not support any extra arguments "
          "such as capacity or policy. "
          "Please check the arguments of the "
          "port '{}'.",
          name_));
    }

    connector_type_ = type;
    switch (type) {
      case ConnectorType::kDefault:
        // default receiver or transmitter will be created in GXFExecutor::run instead
        connector_.reset();
        break;
      case ConnectorType::kDoubleBuffer:
        if (io_type_ == IOType::kInput) {
          connector_ = std::make_shared<DoubleBufferReceiver>(std::forward<ArgsT>(args)...);
        } else {
          connector_ = std::make_shared<DoubleBufferTransmitter>(std::forward<ArgsT>(args)...);
        }
        break;
      case ConnectorType::kAsyncBuffer:
        // The argument pack is known to be empty here (checked above).
        if (io_type_ == IOType::kInput) {
          connector_ = std::make_shared<AsyncBufferReceiver>(std::forward<ArgsT>(args)...);
        } else {
          connector_ = std::make_shared<AsyncBufferTransmitter>(std::forward<ArgsT>(args)...);
        }
        break;
      case ConnectorType::kUCX:
        if (io_type_ == IOType::kInput) {
          connector_ = std::make_shared<UcxReceiver>(std::forward<ArgsT>(args)...);
        } else {
          connector_ = std::make_shared<UcxTransmitter>(std::forward<ArgsT>(args)...);
        }
        break;
      default:
        HOLOSCAN_LOG_ERROR("Unknown connector type {}", static_cast<int>(type));
        break;
    }

    if (queue_size_ == kAnySize) {
      HOLOSCAN_LOG_WARN(
          "The queue size is currently set to 'any size' (IOSpec::kAnySize in C++ or "
          "IOSpec.ANY_SIZE in Python) "
          "for receivers that don't support connector changes. Please set the queue size "
          "explicitly when calling the input() method in setup() if you want to use the ordinary "
          "input port with the connector (input port: {}).",
          name_);
    }
    return *this;
  }

  /// Return the queue size as a raw integer.
  int64_t queue_size() const { return queue_size_.size(); }

  /**
   * @brief Set the queue size.
   *
   * Ignored (with a warning) when the connector type is `ConnectorType::kAsyncBuffer`.
   *
   * @param size The new queue size.
   * @return Reference to this object.
   */
  IOSpec& queue_size(int64_t size) {
    if (connector_type_ == ConnectorType::kAsyncBuffer) {
      HOLOSCAN_LOG_WARN(
          "queue_size is not supported for IOSpec::ConnectorType::kAsyncBuffer. "
          "Please check the queue size of the input/output port '{}'.",
          name_);
    } else {
      queue_size_.size(size);
    }
    return *this;
  }

  /// Return the queue policy, or `std::nullopt` if none has been set.
  std::optional<IOSpec::QueuePolicy> queue_policy() const { return queue_policy_; }

  /**
   * @brief Set the queue policy.
   *
   * Ignored (with a warning) when the connector type is `ConnectorType::kAsyncBuffer`.
   *
   * @param policy The new queue policy.
   * @return Reference to this object.
   */
  IOSpec& queue_policy(IOSpec::QueuePolicy policy) {
    if (connector_type_ == ConnectorType::kAsyncBuffer) {
      HOLOSCAN_LOG_WARN(
          "queue_policy is not supported for IOSpec::ConnectorType::kAsyncBuffer. "
          "Please check the queue policy of the input/output port '{}'.",
          name_);
    } else {
      queue_policy_ = policy;
    }
    return *this;
  }

  /// Build a YAML node for this spec. Declared here; the definition lives outside this header.
  virtual YAML::Node to_yaml_node() const;

  /// Return a textual description of this spec. Declared here; defined outside this header.
  std::string description() const;

  /// Return the unique identifier assigned via `set_unique_id()` (empty until it is set).
  const std::string& unique_id() const { return unique_id_; }

  /// Assign the unique identifier of this spec.
  void set_unique_id(const std::string& unique_id) { unique_id_ = unique_id; }

 private:
  OperatorSpec* op_spec_ = nullptr;           ///< Operator spec pointer (not owned).
  std::string name_;                          ///< Port name (never contains '.').
  IOType io_type_;                            ///< Direction of the port.
  const std::type_info* typeinfo_ = nullptr;  ///< Type information passed at construction.
  std::shared_ptr<Resource> connector_;       ///< Receiver/transmitter; null for the default.
  /// Conditions attached to the port, in insertion order.
  std::vector<std::pair<ConditionType, std::shared_ptr<Condition>>> conditions_;
  ConnectorType connector_type_ = ConnectorType::kDefault;  ///< Currently selected connector type.
  IOSize queue_size_ = kSizeOne;                            ///< Queue size of the port.
  std::optional<QueuePolicy> queue_policy_ = std::nullopt;  ///< Queue policy, if one was set.
  std::string unique_id_;                                   ///< Identifier set by set_unique_id().
};

}  // namespace holoscan

#endif/* HOLOSCAN_CORE_IO_SPEC_HPP */

