Source code for nv_ingest.framework.orchestration.morpheus.modules.injectors.metadata_injector

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import logging

import mrc
import pandas as pd
from morpheus.utils.module_utils import ModuleLoaderFactory
from morpheus.utils.module_utils import register_module

from nv_ingest.framework.schemas.framework_metadata_injector_schema import MetadataInjectorSchema
from nv_ingest_api.internal.primitives.tracing.tagging import traceable

from nv_ingest_api.internal.schemas.meta.ingest_job_schema import DocumentTypeEnum
from nv_ingest_api.internal.enums.common import ContentTypeEnum
from nv_ingest_api.util.exception_handlers.decorators import (
    nv_ingest_node_failure_context_manager,
    unified_exception_handler,
)
from nv_ingest.framework.orchestration.morpheus.util.modules.config_validator import (
    fetch_and_validate_module_config,
)
from nv_ingest_api.internal.primitives.ingest_control_message import IngestControlMessage
from nv_ingest_api.util.converters.type_mappings import doc_type_to_content_type

logger = logging.getLogger(__name__)

MODULE_NAME = "metadata_injection"
MODULE_NAMESPACE = "nv_ingest"

MetadataInjectorLoaderFactory = ModuleLoaderFactory(MODULE_NAME, MODULE_NAMESPACE)


[docs] def on_data(message: IngestControlMessage): try: df = message.payload() update_required = False rows = [] logger.debug("Starting metadata injection on DataFrame with %d rows", len(df)) for _, row in df.iterrows(): try: # Convert document type to content type using enums content_type = doc_type_to_content_type(DocumentTypeEnum(row["document_type"])) # Check if metadata is missing or doesn't have 'content' if "metadata" not in row or not isinstance(row["metadata"], dict) or "content" not in row["metadata"]: update_required = True row["metadata"] = { "content": row.get("content"), "content_metadata": { "type": content_type.name.lower(), }, "error_metadata": None, "audio_metadata": ( None if content_type != ContentTypeEnum.AUDIO else {"audio_type": row["document_type"]} ), "image_metadata": ( None if content_type != ContentTypeEnum.IMAGE else {"image_type": row["document_type"]} ), "source_metadata": { "source_id": row.get("source_id"), "source_name": row.get("source_name"), "source_type": row["document_type"], }, "text_metadata": (None if content_type != ContentTypeEnum.TEXT else {"text_type": "document"}), } except Exception as inner_e: logger.exception("Failed to process row during metadata injection") raise inner_e rows.append(row) if update_required: docs = pd.DataFrame(rows) message.payload(docs) logger.debug("Metadata injection updated payload with %d rows", len(docs)) else: logger.debug("No metadata update was necessary during metadata injection") return message except Exception as e: new_message = f"on_data: Failed to process IngestControlMessage. Original error: {str(e)}" logger.exception(new_message) raise type(e)(new_message) from e
@register_module(MODULE_NAME, MODULE_NAMESPACE) def _metadata_injection(builder: mrc.Builder): validated_config = fetch_and_validate_module_config(builder, MetadataInjectorSchema) @traceable(MODULE_NAME) @nv_ingest_node_failure_context_manager( annotation_id=MODULE_NAME, raise_on_failure=validated_config.raise_on_failure, skip_processing_if_failed=True ) @unified_exception_handler def _on_data(message: IngestControlMessage) -> IngestControlMessage: return on_data(message) node = builder.make_node("metadata_injector", _on_data) builder.register_module_input("input", node) builder.register_module_output("output", node)