# Source code for nv_ingest.framework.orchestration.morpheus.stages.extractors.chart_extraction_stage
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import functools
import logging
from typing import Any
from typing import Dict
from morpheus.config import Config
from nv_ingest.framework.orchestration.morpheus.stages.meta.multiprocessing_stage import MultiProcessingBaseStage
from nv_ingest_api.internal.extract.image.chart_extractor import extract_chart_data_from_image_internal
from nv_ingest_api.internal.schemas.extract.extract_chart_schema import ChartExtractorSchema
# Module-level logger; the name is this module's dotted path prefixed with "morpheus.".
logger = logging.getLogger(f"morpheus.{__name__}")
[docs]
def generate_chart_extractor_stage(
    c: Config,
    extractor_config: Dict[str, Any],
    task: str = "chart_data_extract",
    task_desc: str = "chart_data_extraction",
    pe_count: int = 1,
) -> MultiProcessingBaseStage:
    """
    Generate a multiprocessing stage that performs chart data extraction.

    The raw ``extractor_config`` dictionary is validated by constructing a
    ``ChartExtractorSchema`` from it. The validated configuration is then bound
    to ``extract_chart_data_from_image_internal`` (as its ``extraction_config``
    keyword argument) and the resulting callable is handed to a
    ``MultiProcessingBaseStage`` as its ``process_fn``.

    Parameters
    ----------
    c : Config
        Morpheus global configuration object.
    extractor_config : Dict[str, Any]
        Configuration parameters for the chart content extractor, passed as a
        dictionary and validated against ``ChartExtractorSchema``.
    task : str, optional
        The task name forwarded to the stage. Default is "chart_data_extract".
    task_desc : str, optional
        A descriptor forwarded to the stage (used for tracing/logging of this
        task). Default is "chart_data_extraction".
    pe_count : int, optional
        The number of process engines forwarded to the stage. Default is 1.

    Returns
    -------
    MultiProcessingBaseStage
        A configured stage whose worker function performs chart data extraction.

    Raises
    ------
    Exception
        Any error raised while validating the configuration or building the
        stage is logged and re-raised. When the exception type can be built
        from a single message, a new exception of the same type carrying a
        contextualized message is raised (chained to the original); otherwise
        the original exception is re-raised unchanged.
    """
    try:
        validated_config = ChartExtractorSchema(**extractor_config)
        _wrapped_process_fn = functools.partial(
            extract_chart_data_from_image_internal, extraction_config=validated_config
        )

        return MultiProcessingBaseStage(
            c=c,
            pe_count=pe_count,
            task=task,
            task_desc=task_desc,
            process_fn=_wrapped_process_fn,
        )
    except Exception as e:
        err_msg = f"generate_chart_extractor_stage: Error generating chart extractor stage. Original error: {e}"
        logger.error(err_msg, exc_info=True)

        # Not every exception class accepts a single string argument (schema
        # validation errors, for instance, may require structured arguments).
        # Attempt the same-type wrap, but never let a constructor failure mask
        # the real error.
        try:
            wrapped = type(e)(err_msg)
        except Exception:
            wrapped = None

        if wrapped is None:
            # Re-raise the original exception with its original traceback.
            raise
        raise wrapped from e