Source code for nv_ingest_api.internal.extract.pdf.engines.llama

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import io
import logging
import time
from typing import Any, Optional
from typing import Dict
from typing import List

import aiohttp

from nv_ingest_api.internal.enums.common import ContentTypeEnum

DEFAULT_RESULT_TYPE = "text"
DEFAULT_FILE_NAME = "_.pdf"
DEFAULT_CHECK_INTERVAL_SECONDS = 1
DEFAULT_MAX_TIMEOUT_SECONDS = 2_000

logger = logging.getLogger(__name__)


def llama_parse_extractor(
    pdf_stream: io.BytesIO,
    extract_text: bool,
    extract_images: bool,
    extract_infographics: bool,
    extract_tables: bool,
    extractor_config: dict,
    execution_trace_log: Optional[List[Any]] = None,
) -> List[Dict[ContentTypeEnum, Dict[str, Any]]]:
    """
    Helper function that uses the LlamaParse API to extract text from a bytestream PDF.

    Parameters
    ----------
    pdf_stream : io.BytesIO
        A bytestream PDF.
    extract_text : bool
        Specifies whether to extract text.
    extract_images : bool
        Specifies whether to extract images.
    extract_infographics : bool
        Specifies whether to extract infographics.
    extract_tables : bool
        Specifies whether to extract tables.
    extractor_config : dict
        A dictionary containing additional extraction parameters, including:

        - llama_api_key: API key for LlamaParse.
        - result_type: Type of result to extract (default provided).
        - file_name: Name of the file (default provided).
        - check_interval: Interval in seconds for checking job status (default provided).
        - max_timeout: Maximum timeout in seconds (default provided).
        - row_data: Row data carrying additional metadata.
        - metadata_column: Column name from which to extract metadata (default "metadata").
    execution_trace_log : Optional[List[Any]]
        Trace information for debugging purposes; currently unused.

    Returns
    -------
    List[Dict[ContentTypeEnum, Dict[str, Any]]]
        A list of extracted data. Each item is a dictionary where the key is a
        ContentTypeEnum and the value is a dictionary containing content and metadata.

    Raises
    ------
    ValueError
        If extractor_config is not a dict or required parameters are missing.
    """
    _ = execution_trace_log  # Unused.

    logger.debug("Extracting PDF with LlamaParse backend.")

    # Validate extractor_config.
    if not isinstance(extractor_config, dict):
        raise ValueError("extractor_config must be a dictionary.")

    api_key = extractor_config.get("llama_api_key")
    if not api_key:
        raise ValueError("LLAMA_CLOUD_API_KEY is required in extractor_config.")

    result_type = extractor_config.get("result_type", DEFAULT_RESULT_TYPE)
    file_name = extractor_config.get("file_name", DEFAULT_FILE_NAME)
    check_interval = extractor_config.get("check_interval", DEFAULT_CHECK_INTERVAL_SECONDS)
    max_timeout = extractor_config.get("max_timeout", DEFAULT_MAX_TIMEOUT_SECONDS)

    row_data = extractor_config.get("row_data")
    if row_data is None:
        raise ValueError("Missing 'row_data' in extractor_config.")

    metadata_column = extractor_config.get("metadata_column", "metadata")
    # row_data may be a pandas Series (has .index) or a plain mapping.
    if hasattr(row_data, "index"):
        metadata = row_data[metadata_column] if metadata_column in row_data.index else {}
    else:
        metadata = row_data.get(metadata_column, {})

    extracted_data = []

    if extract_text:
        # TODO: As of Feb 2024, LlamaParse returns multi-page documents as one
        # long text. See if we can break it into pages or if LlamaParse adds
        # support for extracting each page.
        text = asyncio.run(
            async_llama_parse(
                pdf_stream,
                api_key,
                file_name=file_name,
                result_type=result_type,
                check_interval_seconds=check_interval,
                max_timeout_seconds=max_timeout,
            )
        )

        text_metadata = metadata.copy()
        text_metadata.update(
            {
                "content": text,
                "metadata": {
                    "document_type": ContentTypeEnum[result_type],
                },
            }
        )

        payload = {
            ContentTypeEnum[result_type]: text_metadata,
        }
        extracted_data.append(payload)

    # TODO: LlamaParse extracts tables, but we have to extract the tables
    # ourselves from text/markdown.
    if extract_tables:
        # Table extraction logic goes here.
        pass

    # LlamaParse does not support image extraction as of Feb 2024.
    if extract_images:
        # Image extraction logic goes here.
        pass

    # Infographics extraction is currently not supported by LlamaParse.
    if extract_infographics:
        logger.debug("Infographics extraction requested, but not supported by LlamaParse.")

    return extracted_data
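

# --- Usage sketch (not part of the original module) --------------------------
# A minimal, hypothetical example of calling llama_parse_extractor directly.
# The file path, the API key argument, and the plain-dict row_data are
# assumptions for illustration; in the nv-ingest pipeline, row_data is
# typically a pandas Series carrying the document's metadata.
def _example_llama_parse_extraction(pdf_path: str, api_key: str) -> List[Dict[ContentTypeEnum, Dict[str, Any]]]:
    """Illustrative (hypothetical) helper: extract text from a PDF on disk."""
    with open(pdf_path, "rb") as f:
        pdf_stream = io.BytesIO(f.read())

    extractor_config = {
        "llama_api_key": api_key,  # Required; a ValueError is raised if missing.
        "result_type": "text",  # Or "markdown".
        "file_name": pdf_path,
        "row_data": {"metadata": {}},  # Required; a plain dict works via .get().
    }

    return llama_parse_extractor(
        pdf_stream,
        extract_text=True,
        extract_images=False,
        extract_infographics=False,
        extract_tables=False,
        extractor_config=extractor_config,
    )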
async def async_llama_parse(
    pdf_stream: io.BytesIO,
    api_key: str,
    file_name: str = DEFAULT_FILE_NAME,
    result_type: str = DEFAULT_RESULT_TYPE,
    check_interval_seconds: int = DEFAULT_CHECK_INTERVAL_SECONDS,
    max_timeout_seconds: int = DEFAULT_MAX_TIMEOUT_SECONDS,
) -> str:
    """Uses the LlamaParse API to extract text from a bytestream PDF.

    Parameters
    ----------
    pdf_stream : io.BytesIO
        A bytestream PDF.
    api_key : str
        API key from https://cloud.llamaindex.ai.
    file_name : str
        Name of the PDF file.
    result_type : str
        The result type for the parser. One of `text` or `markdown`.
    check_interval_seconds : int
        The interval in seconds to check if the parsing is done.
    max_timeout_seconds : int
        The maximum timeout in seconds to wait for the parsing to finish.

    Returns
    -------
    str
        A string of extracted text.
    """
    base_url = "https://api.cloud.llamaindex.ai/api/parsing"
    headers = {"Authorization": f"Bearer {api_key}"}
    mime_type = "application/pdf"

    try:
        data = aiohttp.FormData()
        data.add_field(
            "file",
            pdf_stream,
            filename=file_name,
            content_type=mime_type,
        )

        upload_url = f"{base_url}/upload"
        async with aiohttp.ClientSession() as session:
            # Upload the PDF and grab the job id used to poll for results.
            async with session.post(
                upload_url,
                data=data,
                headers=headers,
            ) as response:
                response_json = await response.json()
                job_id = response_json["id"]
                logger.debug("Started parsing the file under job_id %s", job_id)

            result_url = f"{base_url}/job/{job_id}/result/{result_type}"

            # Poll until the result is ready or the timeout is exceeded.
            start = time.time()
            while True:
                await asyncio.sleep(check_interval_seconds)

                result = await session.get(result_url, headers=headers)
                # A 404 means the job is still running; keep polling.
                if result.status == 404:
                    end = time.time()
                    if end - start > max_timeout_seconds:
                        raise Exception("Timeout while parsing PDF.")
                    continue

                result_json = await result.json()
                if result.status == 400:
                    detail = result_json.get("detail", "Unknown error")
                    raise Exception(f"Failed to parse the PDF file: {detail}")

                text = result_json[result_type]
                return text
    except Exception as e:
        logger.error("Error while parsing the PDF file: %s", e)
        return ""
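

# --- Usage sketch (not part of the original module) --------------------------
# Because async_llama_parse is a coroutine, several PDFs can be uploaded and
# polled concurrently with asyncio.gather. This is a hypothetical illustration;
# the helper name and the list-of-paths interface are assumptions, not part of
# the nv-ingest API.
def _example_parse_many(pdf_paths: List[str], api_key: str) -> List[str]:
    """Illustrative (hypothetical) helper: parse several PDFs concurrently."""

    async def _gather_results() -> List[str]:
        tasks = []
        for path in pdf_paths:
            with open(path, "rb") as f:
                stream = io.BytesIO(f.read())
            tasks.append(async_llama_parse(stream, api_key, file_name=path))
        # gather preserves input order; a failed parse yields "" per the
        # except branch in async_llama_parse.
        return await asyncio.gather(*tasks)

    return asyncio.run(_gather_results())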