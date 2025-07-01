What can I help you with?
NVIDIA Holoscan SDK v3.3.0
Program Listing for File operator.hpp

Return to documentation for file (include/holoscan/core/operator.hpp)

/*
* SPDX-FileCopyrightText: Copyright (c) 2022-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef HOLOSCAN_CORE_OPERATOR_HPP
#define HOLOSCAN_CORE_OPERATOR_HPP

#include <yaml-cpp/yaml.h>

#include <stdio.h>
#include <algorithm>
#include <iostream>
#include <map>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

#include "./arg.hpp"
#include "./codec_registry.hpp"
#include "./common.hpp"
#include "./component.hpp"
#include "./condition.hpp"
#include "./forward_def.hpp"
#include "./graph.hpp"
#include "./io_spec.hpp"
#include "./messagelabel.hpp"
#include "./metadata.hpp"
#include "./operator_spec.hpp"
#include "./operator_status.hpp"
#include "./resource.hpp"

#include "gxf/app/graph_entity.hpp"
#include "gxf/core/gxf.h"

// Expands to the template header (without the declaration that follows it) used by the
// argument-forwarding constructors in this file.
//
// The declaration that follows takes a first argument `ArgT` plus a parameter pack `ArgsT...`.
// The trailing unnamed template parameter is an SFINAE guard; the template only participates in
// overload resolution when BOTH of the following hold:
//   1. std::decay_t<ArgT> is not holoscan::Operator or a class derived from it
//      (NOTE(review): presumably so the forwarding constructor does not shadow the copy/move
//      constructors — confirm), and
//   2. the first argument is one of:
//        - holoscan::Arg,
//        - holoscan::ArgList,
//        - a type whose holoscan::type_info<ArgT>::derived_type derives from holoscan::Condition,
//        - a type whose holoscan::type_info<ArgT>::derived_type derives from holoscan::Resource.
// Only the first argument is constrained; the remaining pack members are not checked here.
#define HOLOSCAN_OPERATOR_FORWARD_TEMPLATE()                                            \
template <typename ArgT,                                                              \
typename... ArgsT,                                                          \
typename = std::enable_if_t<                                                \
!std::is_base_of_v<holoscan::Operator, std::decay_t<ArgT>> &&           \
(std::is_same_v<holoscan::Arg, std::decay_t<ArgT>> ||                   \
std::is_same_v<holoscan::ArgList, std::decay_t<ArgT>> ||               \
std::is_base_of_v<holoscan::Condition,                                 \
typename holoscan::type_info<ArgT>::derived_type> || \
std::is_base_of_v<holoscan::Resource,                                  \
typename holoscan::type_info<ArgT>::derived_type>)>>

// Defines, inside the body of class `class_name`, an explicit constructor that perfect-forwards
// all of its arguments to the holoscan::Operator constructor. The constructor is constrained by
// HOLOSCAN_OPERATOR_FORWARD_TEMPLATE(). Because the member initializer names `Operator`, this
// macro is meant for classes that initialize the Operator base directly; use
// HOLOSCAN_OPERATOR_FORWARD_ARGS_SUPER to forward to a different base class.
#define HOLOSCAN_OPERATOR_FORWARD_ARGS(class_name) \
HOLOSCAN_OPERATOR_FORWARD_TEMPLATE()             \
explicit class_name(ArgT&& arg, ArgsT&&... args) \
: Operator(std::forward<ArgT>(arg), std::forward<ArgsT>(args)...) {}

// Same as HOLOSCAN_OPERATOR_FORWARD_ARGS, except that the generated explicit constructor of
// `class_name` perfect-forwards its arguments to `super_class_name` instead of to Operator.
// The constructor is constrained by HOLOSCAN_OPERATOR_FORWARD_TEMPLATE().
#define HOLOSCAN_OPERATOR_FORWARD_ARGS_SUPER(class_name, super_class_name) \
HOLOSCAN_OPERATOR_FORWARD_TEMPLATE()                                     \
explicit class_name(ArgT&& arg, ArgsT&&... args)                         \
: super_class_name(std::forward<ArgT>(arg), std::forward<ArgsT>(args)...) {}

namespace holoscan {

// Forward declarations
// These three context types are only used by reference or through std::shared_ptr in this header
// (see Operator::compute() and Operator::execution_context()), so declarations are sufficient.
class ExecutionContext;
class InputContext;
class OutputContext;

namespace gxf {
// Declared so that Operator can name holoscan::gxf::GXFExecutor as a friend class.
class GXFExecutor;
}  // namespace gxf

/**
 * @brief Base class for operators.
 *
 * An Operator extends ComponentBase with an OperatorSpec, name-keyed registries of Condition and
 * Resource objects, metadata settings, and bookkeeping for static/dynamic flows. Subclasses
 * customize behavior by overriding the virtual hooks setup(), initialize(), start(), stop() and
 * compute(). Many members are only declared here and are defined out of line; their exact
 * semantics must be checked against the implementation file.
 */
class Operator : public ComponentBase {
 public:
  /// Kind of operator as reported by operator_type(); a new Operator defaults to kNative.
  enum class OperatorType {
    kNative,
    kGXF,
    kVirtual,
  };

  // Reserved port names. NOTE(review): judging by the names, these identify the implicit
  // execution ports exposed through input_exec_spec()/output_exec_spec() — confirm.
  static constexpr const char* kInputExecPortName = "__input_exec__";
  static constexpr const char* kOutputExecPortName = "__output_exec__";

  /**
   * @brief Construct an operator from one or more arguments.
   *
   * Each argument is handed, in order, to the matching add_arg() overload. Participation in
   * overload resolution is restricted by HOLOSCAN_OPERATOR_FORWARD_TEMPLATE().
   *
   * @param arg First argument.
   * @param args Remaining arguments.
   */
  HOLOSCAN_OPERATOR_FORWARD_TEMPLATE()
  explicit Operator(ArgT&& arg, ArgsT&&... args) {
    add_arg(std::forward<ArgT>(arg));
    (add_arg(std::forward<ArgsT>(args)), ...);
  }

  Operator() = default;

  ~Operator() override = default;

  /// @return The operator type stored in operator_type_.
  OperatorType operator_type() const { return operator_type_; }

  using ComponentBase::id;
  /**
   * @brief Set the operator id.
   * @param id New id value.
   * @return Reference to this operator, to allow call chaining.
   */
  Operator& id(int64_t id) {
    id_ = id;
    return *this;
  }

  using ComponentBase::name;
  /**
   * @brief Set the operator name.
   *
   * @param name New name; must not contain the '.' character.
   * @return Reference to this operator, to allow call chaining.
   * @throws std::invalid_argument if @p name contains '.'.
   */
  Operator& name(const std::string& name) {
    // Operator::parse_port_name requires that "." is not allowed in the Operator name
    if (name.find('.') != std::string::npos) {
      throw std::invalid_argument(fmt::format(
          "The . character is reserved and cannot be used in the operator name ('{}').", name));
    }
    name_ = name;
    return *this;
  }

  using ComponentBase::fragment;
  /**
   * @brief Set the fragment pointer of this operator.
   * @param fragment Non-owning pointer stored as-is.
   * @return Reference to this operator, to allow call chaining.
   */
  Operator& fragment(Fragment* fragment) {
    fragment_ = fragment;
    return *this;
  }

  /**
   * @brief Set the operator specification.
   * @param spec Shared pointer stored as-is (may be null).
   * @return Reference to this operator, to allow call chaining.
   */
  Operator& spec(const std::shared_ptr<OperatorSpec>& spec) {
    spec_ = spec;
    return *this;
  }

  /**
   * @brief Get the operator specification as a raw pointer.
   * @return Pointer to the spec, or nullptr (after logging a warning) when no spec is set.
   */
  OperatorSpec* spec() {
    if (!spec_) {
      HOLOSCAN_LOG_WARN("OperatorSpec of Operator '{}' is not initialized, returning nullptr",
                        name_);
      return nullptr;
    }
    return spec_.get();
  }

  /// @return The shared pointer to the spec (possibly null); unlike spec(), nothing is logged.
  std::shared_ptr<OperatorSpec> spec_shared() { return spec_; }

  /**
   * @brief Look up a condition by name and cast it to the requested type.
   *
   * @tparam ConditionT Expected condition type.
   * @param name Name the condition was registered under.
   * @return The condition, or nullptr when no condition has that name or the stored object is
   * not a ConditionT (std::dynamic_pointer_cast fails).
   */
  template <typename ConditionT>
  std::shared_ptr<ConditionT> condition(const std::string& name) {
    if (auto condition = conditions_.find(name); condition != conditions_.end()) {
      return std::dynamic_pointer_cast<ConditionT>(condition->second);
    }
    return nullptr;
  }

  /// @return Mutable reference to the name-to-condition registry.
  std::unordered_map<std::string, std::shared_ptr<Condition>>& conditions() { return conditions_; }

  /**
   * @brief Look up a resource by name and cast it to the requested type.
   *
   * @tparam ResourceT Expected resource type.
   * @param name Name the resource was registered under.
   * @return The resource, or nullptr when no resource has that name or the stored object is not
   * a ResourceT (std::dynamic_pointer_cast fails).
   */
  template <typename ResourceT>
  std::shared_ptr<ResourceT> resource(const std::string& name) {
    if (auto resource = resources_.find(name); resource != resources_.end()) {
      return std::dynamic_pointer_cast<ResourceT>(resource->second);
    }
    return nullptr;
  }

  /// @return Mutable reference to the name-to-resource registry.
  std::unordered_map<std::string, std::shared_ptr<Resource>>& resources() { return resources_; }

  using ComponentBase::add_arg;

  /**
   * @brief Register a condition under its own name.
   *
   * A null pointer, or a condition whose name is already registered, is rejected with an error
   * log and the registry is left unchanged.
   *
   * @param arg Condition to register (copied into the registry).
   */
  void add_arg(const std::shared_ptr<Condition>& arg) {
    register_unique_component(conditions_, arg, "Condition");
  }

  /**
   * @brief Register a condition under its own name, taking ownership of the pointer.
   *
   * Same rules as the const-reference overload; @p arg is only moved from when it is actually
   * inserted.
   *
   * @param arg Condition to register.
   */
  void add_arg(std::shared_ptr<Condition>&& arg) {
    register_unique_component(conditions_, std::move(arg), "Condition");
  }

  /**
   * @brief Register a resource under its own name.
   *
   * A null pointer, or a resource whose name is already registered, is rejected with an error
   * log and the registry is left unchanged.
   *
   * @param arg Resource to register (copied into the registry).
   */
  void add_arg(const std::shared_ptr<Resource>& arg) {
    register_unique_component(resources_, arg, "Resource");
  }

  /**
   * @brief Register a resource under its own name, taking ownership of the pointer.
   *
   * Same rules as the const-reference overload; @p arg is only moved from when it is actually
   * inserted.
   *
   * @param arg Resource to register.
   */
  void add_arg(std::shared_ptr<Resource>&& arg) {
    register_unique_component(resources_, std::move(arg), "Resource");
  }

  /**
   * @brief Hook for subclasses to describe the operator in @p spec.
   *
   * The default implementation does nothing.
   */
  virtual void setup([[maybe_unused]] OperatorSpec& spec) {}

  // The following queries are defined out of line. Per their names they describe the operator's
  // position in the operator graph — see the implementation for the exact semantics.
  bool is_root();

  bool is_user_defined_root();

  bool is_leaf();

  static bool is_all_operator_successor_virtual(OperatorNodeType op, OperatorGraph& graph);

  static bool is_all_operator_predecessor_virtual(OperatorNodeType op, OperatorGraph& graph);

  /// Defined out of line. NOTE(review): presumably a fragment-qualified operator name — confirm.
  std::string qualified_name();

  /// Overrides ComponentBase::initialize(); defined out of line.
  void initialize() override;

  /// Lifecycle hook; the default implementation does nothing.
  virtual void start() {
    // Empty default implementation
  }

  /// Lifecycle hook; the default implementation does nothing.
  virtual void stop() {
    // Empty default implementation
  }

  /**
   * @brief Hook in which subclasses implement the operator's work.
   *
   * The default implementation does nothing and ignores all three arguments.
   *
   * @param op_input Input context.
   * @param op_output Output context.
   * @param context Execution context.
   */
  virtual void compute([[maybe_unused]] InputContext& op_input,
                       [[maybe_unused]] OutputContext& op_output,
                       [[maybe_unused]] ExecutionContext& context) {}

  /// Defined out of line. Returns a pair of strings parsed from @p op_port_name; the operator
  /// name setter reserves '.' for this parser (see name()).
  static std::pair<std::string, std::string> parse_port_name(const std::string& op_port_name);

  /**
   * @brief Register a codec for @p typeT with the CodecRegistry singleton.
   *
   * @tparam typeT Type the codec handles.
   * @param codec_name Name passed to CodecRegistry::add_codec.
   * @param overwrite Forwarded to CodecRegistry::add_codec (defaults to true).
   */
  template <typename typeT>
  static void register_codec(const std::string& codec_name, bool overwrite = true) {
    CodecRegistry::get_instance().add_codec<typeT>(codec_name, overwrite);
  }

  /// Overrides ComponentBase::to_yaml_node(); defined out of line.
  YAML::Node to_yaml_node() const override;

  /// @return The GXF graph entity pointer held by this operator (may be null).
  std::shared_ptr<nvidia::gxf::GraphEntity> graph_entity() { return graph_entity_; }

  /// @return The metadata dictionary owned by this operator.
  std::shared_ptr<MetadataDictionary> metadata() { return dynamic_metadata_; }

  /// Defined out of line. Note that the stored flag is optional (see is_metadata_enabled_), so
  /// the result presumably falls back to some default when unset — confirm.
  bool is_metadata_enabled() const;

  /// Explicitly enable or disable metadata for this operator.
  void enable_metadata(bool enable) { is_metadata_enabled_ = enable; }

  /// @return The policy of the metadata dictionary (dereferences dynamic_metadata_ unchecked).
  MetadataPolicy metadata_policy() const { return dynamic_metadata_->policy(); }

  /// Set the policy of the metadata dictionary (dereferences dynamic_metadata_ unchecked).
  void metadata_policy(MetadataPolicy policy) { dynamic_metadata_->policy(policy); }

  /// Defined out of line; see the implementation for the meaning of each parameter.
  void add_cuda_stream_pool(int32_t dev_id = 0, uint32_t stream_flags = 0,
                            int32_t stream_priority = 0, uint32_t reserved_size = 1,
                            uint32_t max_size = 0);

  // Port accessors, defined out of line. The std::optional return type indicates that a lookup
  // may yield no value.
  std::optional<std::shared_ptr<Receiver>> receiver(const std::string& port_name);

  std::optional<std::shared_ptr<Transmitter>> transmitter(const std::string& port_name);

  /// Defined out of line. Defaults: input port type and the kFault queue policy.
  void queue_policy(const std::string& port_name, IOSpec::IOType port_type = IOSpec::IOType::kInput,
                    IOSpec::QueuePolicy policy = IOSpec::QueuePolicy::kFault);
  // Accessors defined out of line; see the matching protected set_* functions and data members.
  const std::shared_ptr<IOSpec>& input_exec_spec();
  const std::shared_ptr<IOSpec>& output_exec_spec();
  const std::function<void(const std::shared_ptr<Operator>&)>& dynamic_flow_func();
  std::shared_ptr<Operator> self_shared();

  /**
   * @brief Immutable description of one connection from an output port of `curr_operator` to an
   * input port of `next_operator`.
   */
  struct FlowInfo {
    /**
     * @brief Capture both endpoints and resolve the port specs from the operators' specs.
     *
     * NOTE(review): both operator pointers and the pointers returned by spec() are dereferenced
     * unchecked, while Operator::spec() returns nullptr when no spec is set. Callers must
     * therefore pass non-null operators whose spec is already set. If outputs()/inputs() return
     * standard map types, operator[] would also insert an empty entry for an unknown port name —
     * TODO confirm and consider a find()-based lookup.
     *
     * @param curr_operator Operator owning the output port.
     * @param output_port_name Name of the output port on @p curr_operator.
     * @param next_operator Operator owning the input port.
     * @param input_port_name Name of the input port on @p next_operator.
     */
    FlowInfo(const std::shared_ptr<Operator>& curr_operator, const std::string& output_port_name,
             const std::shared_ptr<Operator>& next_operator, const std::string& input_port_name)
        : curr_operator(curr_operator),
          output_port_name(output_port_name),
          output_port_spec(curr_operator->spec()->outputs()[output_port_name]),
          next_operator(next_operator),
          input_port_name(input_port_name),
          input_port_spec(next_operator->spec()->inputs()[input_port_name]) {}

    const std::shared_ptr<Operator> curr_operator;
    const std::string output_port_name;
    const std::shared_ptr<IOSpec> output_port_spec;
    const std::shared_ptr<Operator> next_operator;
    const std::string input_port_name;
    const std::shared_ptr<IOSpec> input_port_spec;
  };

  // Flow management API, defined out of line.
  const std::vector<std::shared_ptr<FlowInfo>>& next_flows();

  void add_dynamic_flow(const std::shared_ptr<FlowInfo>& flow);

  void add_dynamic_flow(const std::vector<std::shared_ptr<FlowInfo>>& flows);

  void add_dynamic_flow(const std::string& curr_output_port_name,
                        const std::shared_ptr<Operator>& next_op,
                        const std::string& next_input_port_name = "");

  void add_dynamic_flow(const std::shared_ptr<Operator>& next_op,
                        const std::string& next_input_port_name = "");

  const std::shared_ptr<std::vector<std::shared_ptr<FlowInfo>>>& dynamic_flows();

  // Predicate-based searches over FlowInfo objects, defined out of line.
  const std::shared_ptr<Operator::FlowInfo>& find_flow_info(
      const std::function<bool(const std::shared_ptr<Operator::FlowInfo>&)>& predicate);

  std::vector<std::shared_ptr<Operator::FlowInfo>> find_all_flow_info(
      const std::function<bool(const std::shared_ptr<Operator::FlowInfo>&)>& predicate);

  // Execution control and context access, defined out of line.
  std::shared_ptr<holoscan::AsynchronousCondition> async_condition();

  void stop_execution();

  virtual std::shared_ptr<holoscan::ExecutionContext> execution_context() const;

  void ensure_contexts();

  virtual void release_internal_resources();

 protected:
  // Making the following classes as friend classes to allow them to access
  // get_consolidated_input_label, num_published_messages_map, update_input_message_label,
  // reset_input_message_labels and update_published_messages functions, which should only be called
  // externally by them
  friend class AnnotatedDoubleBufferReceiver;
  friend class AnnotatedDoubleBufferTransmitter;
  friend class HoloscanUcxTransmitter;
  friend class HoloscanUcxReceiver;
  friend class DFFTCollector;

  // Make GXFExecutor a friend class so it can call protected initialization methods
  friend class holoscan::gxf::GXFExecutor;
  // Fragment should be able to call reset_graph_entities
  friend class Fragment;

  friend gxf_result_t deannotate_message(gxf_uid_t* uid, const gxf_context_t& context, Operator* op,
                                         const char* name);
  friend gxf_result_t annotate_message(gxf_uid_t uid, const gxf_context_t& context, Operator* op,
                                       const char* name);

  // Initialization helpers, defined out of line (callable by GXFExecutor through friendship).
  gxf_uid_t initialize_graph_entity(void* context, const std::string& entity_prefix = "");

  void initialize_async_condition();

  virtual gxf_uid_t add_codelet_to_graph_entity();

  void initialize_conditions();

  void initialize_resources();

  using ComponentBase::update_params_from_args;

  void update_params_from_args();

  void update_connector_arguments();

  void find_ports_used_by_condition_args();

  virtual void set_parameters();

  /// Defined out of line; callable by the friend classes listed above.
  MessageLabel get_consolidated_input_label();

  /**
   * @brief Store (or replace) the message label associated with an input.
   * @param input_name Key of the entry.
   * @param m Label to store.
   */
  void update_input_message_label(std::string input_name, MessageLabel m) {
    // Both parameters are by-value sinks, so move them into the map instead of copying again.
    input_message_labels[std::move(input_name)] = std::move(m);
  }

  /// Remove the message label stored for @p input_name, if any.
  void delete_input_message_label(std::string input_name) {
    input_message_labels.erase(input_name);
  }

  /// Remove all stored input message labels.
  void reset_input_message_labels() { input_message_labels.clear(); }

  /// @return A copy of the per-name published-message counters.
  std::map<std::string, uint64_t> num_published_messages_map() {
    return num_published_messages_map_;
  }

  /// Defined out of line; callable by the friend classes listed above.
  void update_published_messages(std::string output_name);

  /// Defined out of line; Fragment is a friend so that it can call this.
  virtual void reset_graph_entities();

  void initialize_next_flows();

  /// @return Mutable list of input ports that must not receive a default condition.
  std::vector<std::string>& non_default_input_ports() { return non_default_input_ports_; }
  /// @return Mutable list of output ports that must not receive a default condition.
  std::vector<std::string>& non_default_output_ports() { return non_default_output_ports_; }

  // Setters defined out of line; counterparts of the public accessors with matching names.
  void set_input_exec_spec(const std::shared_ptr<IOSpec>& input_exec_spec);
  void set_output_exec_spec(const std::shared_ptr<IOSpec>& output_exec_spec);
  void set_dynamic_flows(
      const std::function<void(const std::shared_ptr<Operator>&)>& dynamic_flow_func);
  void set_self_shared(const std::shared_ptr<Operator>& this_op);

  bool is_initialized_ = false;  // Not modified in this header; managed by out-of-line code.
  OperatorType operator_type_ = OperatorType::kNative;  // Value returned by operator_type().
  std::shared_ptr<OperatorSpec> spec_;                  // Null until spec(...) is called.
  std::unordered_map<std::string, std::shared_ptr<Condition>>
      conditions_;  // Conditions keyed by name (names are unique).
  std::unordered_map<std::string, std::shared_ptr<Resource>>
      resources_;  // Resources keyed by name (names are unique).
  std::shared_ptr<nvidia::gxf::GraphEntity> graph_entity_;  // Returned by graph_entity().
  std::shared_ptr<holoscan::AsynchronousCondition> internal_async_condition_;
  std::shared_ptr<ExecutionContext> execution_context_{};

  // Metadata dictionary; always constructed so that metadata_policy() can dereference it.
  std::shared_ptr<MetadataDictionary> dynamic_metadata_ =
      std::make_shared<MetadataDictionary>();
  // Empty until enable_metadata() is called.
  std::optional<bool> is_metadata_enabled_ =
      std::nullopt;

  std::shared_ptr<IOSpec> input_exec_spec_;
  std::shared_ptr<IOSpec> output_exec_spec_;
  std::function<void(const std::shared_ptr<Operator>&)> dynamic_flow_func_ = nullptr;
  std::weak_ptr<Operator> self_shared_;  // Weak, so the operator does not keep itself alive.
  std::shared_ptr<std::vector<std::shared_ptr<FlowInfo>>> next_flows_;
  std::shared_ptr<std::vector<std::shared_ptr<FlowInfo>>> dynamic_flows_;

 private:
  // Null FlowInfo pointer with static storage. NOTE(review): presumably what find_flow_info()
  // returns by reference when nothing matches — confirm in the implementation.
  static inline const std::shared_ptr<FlowInfo> kEmptyFlowInfo{nullptr};

  /**
   * @brief Shared implementation of the add_arg() overloads for conditions and resources.
   *
   * Inserts @p component into @p registry under the component's own name. Null pointers and
   * duplicate names are reported through the error log and leave the registry untouched.
   *
   * @param registry Name-keyed map to insert into.
   * @param component Shared pointer to insert; forwarded, so rvalues are moved on success only.
   * @param kind Human-readable kind ("Condition" or "Resource") used in log messages.
   */
  template <typename RegistryT, typename ComponentPtrT>
  static void register_unique_component(RegistryT& registry, ComponentPtrT&& component,
                                        const char* kind) {
    if (!component) {
      // Dereferencing a null pointer to read its name would be undefined behavior.
      HOLOSCAN_LOG_ERROR("Cannot add a null {} to the operator; the argument is ignored.", kind);
      return;
    }
    // The name lives in the pointed-to object, so the reference stays valid even after the
    // shared pointer itself has been moved into the registry.
    const auto& key = component->name();
    // try_emplace performs a single lookup and leaves `component` untouched if the key exists.
    const bool inserted =
        registry.try_emplace(key, std::forward<ComponentPtrT>(component)).second;
    if (!inserted) {
      HOLOSCAN_LOG_ERROR(
          "{} '{}' already exists in the operator. Please specify a unique "
          "name when creating a {} instance.",
          kind,
          key,
          kind);
    }
  }

  void set_op_backend();

  bool has_ucx_connector();

  // Message labels keyed by input name; maintained by update/delete/reset_input_message_label(s).
  std::unordered_map<std::string, MessageLabel> input_message_labels;

  // Counters returned (by copy) from num_published_messages_map().
  std::map<std::string, uint64_t> num_published_messages_map_;

  // Keep track of which ports have a user-assigned condition involving its receiver or
  // transmitter (a default condition will NOT be added to any such ports).
  std::vector<std::string> non_default_input_ports_;
  std::vector<std::string> non_default_output_ports_;

  // Opaque backend pointer; not used in this header (see set_op_backend() in the implementation).
  void* op_backend_ptr = nullptr;
};

}  // namespace holoscan

#endif/* HOLOSCAN_CORE_OPERATOR_HPP */

© Copyright 2022-2025, NVIDIA. Last updated on Jul 1, 2025.
