Source code for nv_ingest.modules.injectors.task_injection

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


import logging

import mrc
from morpheus.utils.module_utils import ModuleLoaderFactory
from morpheus.utils.module_utils import register_module

from nv_ingest.schemas.task_injection_schema import TaskInjectionSchema
from nv_ingest.util.exception_handlers.decorators import nv_ingest_node_failure_context_manager
from nv_ingest.util.modules.config_validator import fetch_and_validate_module_config
from nv_ingest.util.tracing import traceable
from nv_ingest_api.primitives.ingest_control_message import IngestControlMessage

logger = logging.getLogger(__name__)

MODULE_NAME = "task_injection"
MODULE_NAMESPACE = "nv_ingest"

TaskInjectorLoaderFactory = ModuleLoaderFactory(MODULE_NAME, MODULE_NAMESPACE, TaskInjectionSchema)


[docs] def on_data(message: IngestControlMessage): message.get_metadata("task_meta") return message
@register_module(MODULE_NAME, MODULE_NAMESPACE) def _task_injection(builder: mrc.Builder): validated_config = fetch_and_validate_module_config(builder, TaskInjectionSchema) @nv_ingest_node_failure_context_manager( annotation_id=MODULE_NAME, raise_on_failure=validated_config.raise_on_failure, ) @traceable(MODULE_NAME) def _on_data(ctrl_msg: IngestControlMessage): return on_data(ctrl_msg) ctrl_msg.get_metadata("task_meta") return ctrl_msg node = builder.make_node("vdb_resource_tagging", on_data) builder.register_module_input("input", node) builder.register_module_output("output", node)