# Source code for nv_ingest.framework.orchestration.morpheus.stages.extractors.pdf_extractor_stage
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import functools
import logging
from typing import Any, Optional, List, Tuple
from typing import Dict
import pandas as pd
from nv_ingest.framework.orchestration.morpheus.stages.meta.multiprocessing_stage import MultiProcessingBaseStage
from nv_ingest_api.internal.extract.pdf.pdf_extractor import extract_primitives_from_pdf_internal
from nv_ingest_api.internal.schemas.extract.extract_pdf_schema import PDFExtractorSchema
# Module-level logger; its name is this module's dotted path prefixed with "morpheus.".
logger = logging.getLogger(f"morpheus.{__name__}")
def _inject_validated_config(
    df_extraction_ledger: pd.DataFrame,
    task_config: Dict,
    execution_trace_log: Optional[List[Any]] = None,
    validated_config: Any = None,
) -> Tuple[pd.DataFrame, Dict]:
    """
    Forward a PDF extraction request to ``extract_primitives_from_pdf_internal``,
    supplying ``validated_config`` as that function's ``extractor_config`` argument.

    The separate ``validated_config`` parameter lets a caller pre-bind the
    configuration (for example with ``functools.partial``) and leave the
    remaining arguments to be supplied at call time.

    Parameters
    ----------
    df_extraction_ledger : pd.DataFrame
        DataFrame handed through unchanged as ``df_extraction_ledger``.
    task_config : dict
        Task configuration dictionary, handed through unchanged.
    execution_trace_log : list, optional
        Optional list for trace information, handed through unchanged.
    validated_config : Any, optional
        The validated configuration; forwarded as ``extractor_config``.

    Returns
    -------
    Tuple[pd.DataFrame, dict]
        Whatever ``extract_primitives_from_pdf_internal`` returns.
    """
    # Collect the keyword arguments first so the renaming of
    # validated_config -> extractor_config is visible in one place.
    forwarded_kwargs = {
        "df_extraction_ledger": df_extraction_ledger,
        "task_config": task_config,
        "extractor_config": validated_config,
        "execution_trace_log": execution_trace_log,
    }
    return extract_primitives_from_pdf_internal(**forwarded_kwargs)
# [docs]
def generate_pdf_extractor_stage(
    c: Any,
    extractor_config: Dict[str, Any],
    task: str = "extract",
    task_desc: str = "pdf_content_extractor",
    pe_count: int = 24,
) -> Any:
    """
    Generate a multiprocessing stage for PDF extraction.

    This function validates the extractor configuration with ``PDFExtractorSchema``,
    binds the validated configuration to ``_inject_validated_config`` via
    ``functools.partial``, and returns a ``MultiProcessingBaseStage`` that uses the
    bound function as its ``process_fn``.

    Parameters
    ----------
    c : Any
        The global configuration object for the pipeline.
    extractor_config : dict
        A dictionary containing configuration parameters for the PDF extractor.
        It is unpacked as keyword arguments into ``PDFExtractorSchema``.
    task : str, optional
        The name of the extraction task. Defaults to "extract".
    task_desc : str, optional
        A descriptor for the task used in latency tracing. Defaults to "pdf_content_extractor".
    pe_count : int, optional
        The number of processing engines to use for extraction. Defaults to 24.

    Returns
    -------
    Any
        A MultiProcessingBaseStage object configured for PDF extraction
        (``document_type="pdf"``).

    Raises
    ------
    Exception
        If an error occurs during the creation of the PDF extractor stage. When the
        original exception's class can be constructed from a single message string,
        a new exception of that class carrying a prefixed message is raised and
        chained to the original. Otherwise the original exception is re-raised
        unchanged so its type is preserved.
    """
    try:
        validated_extractor_config = PDFExtractorSchema(**extractor_config)
        wrapped_process_fn = functools.partial(_inject_validated_config, validated_config=validated_extractor_config)
        return MultiProcessingBaseStage(
            c=c, pe_count=pe_count, task=task, task_desc=task_desc, process_fn=wrapped_process_fn, document_type="pdf"
        )
    except Exception as e:
        err_msg = f"generate_pdf_extractor_stage: Error generating PDF extractor stage: {e}"
        logger.error(err_msg, exc_info=True)

        # Not every exception class accepts a single message string in its
        # constructor (schema-validation errors and UnicodeDecodeError are
        # examples). Building the wrapper blindly would raise an unrelated
        # TypeError from inside this handler and hide the real failure.
        try:
            wrapped_error = type(e)(err_msg)
        except Exception:
            wrapped_error = None

        if wrapped_error is None:
            # Fall back to the original exception; the prefixed message has
            # already been logged above.
            raise
        raise wrapped_error from e