Source code for nv_ingest_client.util.processing

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import concurrent
import concurrent.futures
import json
import logging
from typing import Any, Tuple
from typing import Dict
from typing import Optional

from nv_ingest_client.util.util import check_ingest_result

logger = logging.getLogger(__name__)


class IngestJobFailure(Exception):
    """
    Custom exception to handle failed job ingestion results.

    Parameters
    ----------
    message : str
        Human-readable error message; this is what ``str(exc)`` returns.
    description : str
        Description of the failure, kept separately from the message so callers
        can inspect it without parsing the message text.
    annotations : Optional[Dict[str, Any]]
        Annotations attached to the failed job result, or ``None`` when the
        result carried no ``"annotations"`` entry.

    Attributes
    ----------
    description : str
        The value passed as ``description``.
    annotations : Optional[Dict[str, Any]]
        The value passed as ``annotations`` (stored as-is, not copied).
    """

    def __init__(self, message: str, description: str, annotations: Optional[Dict[str, Any]]):
        super().__init__(message)
        self.description = description
        self.annotations = annotations

    def __reduce__(self):
        """
        Support ``pickle`` and ``copy`` for this exception.

        The default exception reduce protocol re-creates the instance from
        ``self.args`` only, which holds just the message; that call would fail
        because ``description`` and ``annotations`` are required constructor
        arguments. Supplying all three arguments explicitly avoids that.
        """
        message = self.args[0] if self.args else ""
        # The instance __dict__ is passed as state so any extra attributes set
        # on the exception after construction survive the round trip.
        return (type(self), (message, self.description, self.annotations), self.__dict__)
def handle_future_result(
    future: concurrent.futures.Future,
    timeout: Optional[float] = None,
) -> Tuple[Dict[str, Any], str]:
    """
    Handle the result of a completed future job and process annotations.

    This function waits for the future's result, logs any annotations found in
    it at DEBUG level, and validates the result with `check_ingest_result`. If
    the result indicates a failure, an `IngestJobFailure` is raised carrying the
    failure description and the result's annotations.

    Parameters
    ----------
    future : concurrent.futures.Future
        A future object representing an asynchronous job. The result of this job
        will be processed once it completes.
    timeout : Optional[float], default=None
        Maximum time in seconds to wait for the future result; passed straight
        to ``future.result``. ``None`` waits without a time limit.

    Returns
    -------
    Tuple[Dict[str, Any], str]
        - The result of the job as a dictionary, after validation.
        - The trace_id that accompanied the result.

    Raises
    ------
    IngestJobFailure
        If `check_ingest_result` reports the job result as failed. The exception
        carries the failure description and the result's annotations.
    Exception
        Any other error raised while retrieving or validating the result (for
        example a timeout raised by ``future.result``) is logged at DEBUG level
        and re-raised unchanged.

    Notes
    -----
    - ``future.result()`` is expected to return a sequence whose first element
      is a 3-item sequence ``(result, <ignored>, trace_id)``; only that first
      element is used.
    - Annotations in the result (if any) are logged for debugging purposes.
      Serialization for the log happens only when DEBUG logging is enabled, and
      values that are not JSON-serializable are logged via ``str()`` instead of
      aborting result handling.

    Examples
    --------
    Given ``future``, a job future whose result has the shape described in the
    Notes section:

    >>> result, trace_id = handle_future_result(future, timeout=60)  # doctest: +SKIP

    If the job failed, `IngestJobFailure` is raised instead of returning.

    See Also
    --------
    check_ingest_result : Function to validate the result of the job.
    """
    try:
        result, _, trace_id = future.result(timeout=timeout)[0]

        annotations = result.get("annotations")
        # Skip the JSON serialization entirely unless the output will be emitted.
        if annotations and logger.isEnabledFor(logging.DEBUG):
            for key, value in annotations.items():
                # default=str: a value json cannot encode must not turn a purely
                # diagnostic log line into a job-handling failure.
                logger.debug("Annotation: %s -> %s", key, json.dumps(value, indent=2, default=str))

        failed, description = check_ingest_result(result)
        if failed:
            raise IngestJobFailure(f"Ingest job failed: {description}", description, result.get("annotations"))
    except Exception as e:
        logger.debug("Error processing future result: %s", e)
        # Bare raise re-raises the active exception with its original traceback.
        raise

    return (result, trace_id)