# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import inspect
import re
import typing
from functools import wraps
from nv_ingest.util.tracing.logging import TaskResultStatus
from nv_ingest.util.tracing.logging import annotate_task_result
from nv_ingest_api.primitives.ingest_control_message import IngestControlMessage
[docs]
def cm_ensure_payload_not_null(control_message: IngestControlMessage):
"""
Ensures that the payload of a IngestControlMessage is not None.
Parameters
----------
control_message : IngestControlMessage
The IngestControlMessage to check.
Raises
------
ValueError
If the payload is None.
"""
if control_message.payload() is None:
raise ValueError("Payload cannot be None")
[docs]
def cm_set_failure(control_message: IngestControlMessage, reason: str) -> IngestControlMessage:
"""
Sets the failure metadata on a IngestControlMessage.
Parameters
----------
control_message : IngestControlMessage
The IngestControlMessage to set the failure metadata on.
reason : str
The reason for the failure.
Returns
-------
control_message : IngestControlMessage
The modified IngestControlMessage with the failure metadata set.
"""
control_message.set_metadata("cm_failed", True)
control_message.set_metadata("cm_failed_reason", reason)
return control_message
[docs]
def nv_ingest_node_failure_context_manager(
annotation_id: str,
payload_can_be_empty: bool = False,
raise_on_failure: bool = False,
skip_processing_if_failed: bool = True,
forward_func=None,
) -> typing.Callable:
"""
A decorator that applies a default failure context manager around a function to manage
the execution and potential failure of operations involving IngestControlMessages.
Parameters
----------
annotation_id : str
A unique identifier used for annotating the task's result.
payload_can_be_empty : bool, optional
If False, the payload of the IngestControlMessage will be checked to ensure it's not null,
raising an exception if it is null. Defaults to False, enforcing payload presence.
raise_on_failure : bool, optional
If True, an exception is raised if the decorated function encounters an error.
Otherwise, the error is handled silently by annotating the IngestControlMessage. Defaults to False.
skip_processing_if_failed : bool, optional
If True, skips the processing of the decorated function if the control message has already
been marked as failed. If False, the function will be processed regardless of the failure
status of the IngestControlMessage. Defaults to True.
forward_func : callable, optional
A function to forward the IngestControlMessage if it has already been marked as failed.
Returns
-------
Callable
A decorator that wraps the given function with failure handling logic.
"""
def decorator(func):
@wraps(func)
def wrapper(control_message: IngestControlMessage, *args, **kwargs):
# Quick return if the IngestControlMessage has already failed
is_failed = control_message.get_metadata("cm_failed", False)
if not is_failed or not skip_processing_if_failed:
with CMNVIngestFailureContextManager(
control_message=control_message,
annotation_id=annotation_id,
raise_on_failure=raise_on_failure,
func_name=func.__name__,
) as ctx_mgr:
if not payload_can_be_empty:
cm_ensure_payload_not_null(control_message=control_message)
control_message = func(ctx_mgr.control_message, *args, **kwargs)
else:
if forward_func:
control_message = forward_func(control_message)
return control_message
return wrapper
return decorator
[docs]
def nv_ingest_source_failure_context_manager(
annotation_id: str,
payload_can_be_empty: bool = False,
raise_on_failure: bool = False,
) -> typing.Callable:
"""
A decorator that ensures any function's output is treated as a IngestControlMessage for annotation.
It applies a context manager to handle success and failure annotations based on the function's execution.
Parameters
----------
annotation_id : str
Unique identifier used for annotating the function's output.
payload_can_be_empty : bool, optional
Specifies if the function's output IngestControlMessage payload can be empty, default is False.
raise_on_failure : bool, optional
Determines if an exception should be raised upon function failure, default is False.
Returns
-------
Callable
A decorator that ensures function output is processed for success or failure annotation.
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs) -> IngestControlMessage:
try:
result = func(*args, **kwargs)
if not isinstance(result, IngestControlMessage):
raise TypeError(f"{func.__name__} output is not a IngestControlMessage as expected.")
if not payload_can_be_empty and result.get_metadata("payload") is None:
raise ValueError(f"{func.__name__} IngestControlMessage payload cannot be null.")
# Success annotation.
annotate_task_result(result, result=TaskResultStatus.SUCCESS, task_id=annotation_id)
except Exception as e:
error_message = f"Error in {func.__name__}: {e}"
# Prepare a new IngestControlMessage for failure annotation if needed.
if "result" not in locals() or not isinstance(result, IngestControlMessage):
result = IngestControlMessage()
cm_set_failure(result, error_message)
annotate_task_result(
result,
result=TaskResultStatus.FAILURE,
task_id=annotation_id,
message=error_message,
)
if raise_on_failure:
raise
return result
return wrapper
return decorator
[docs]
class CMNVIngestFailureContextManager:
"""
Context manager for handling IngestControlMessage failures during processing, providing
a structured way to annotate and manage failures and successes.
Parameters
----------
control_message : IngestControlMessage
The IngestControlMessage instance to be managed.
annotation_id : str
The task's unique identifier for annotation purposes.
raise_on_failure : bool, optional
Determines whether to raise an exception upon failure. Defaults to False, which
means failures are annotated rather than raising exceptions.
func_name : str, optional
The name of the function being wrapped, used to annotate error messages uniformly.
If None, stack introspection is used to deduce a likely function name. Defaults to None.
Returns
-------
None
"""
def __init__(
self,
control_message: IngestControlMessage,
annotation_id: str,
raise_on_failure: bool = False,
func_name: str = None,
):
self.control_message = control_message
self.annotation_id = annotation_id
self.raise_on_failure = raise_on_failure
if func_name is not None:
self._func_name = func_name
else:
try:
# Use stack introspection to get a candidate function name.
stack = inspect.stack()
# Use the third frame as a heuristic; adjust if needed.
candidate = stack[2].function if len(stack) > 2 else "UnknownFunction"
# Remove any whitespace and limit the length to 50 characters.
candidate = re.sub(r"\s+", "", candidate)[:50]
self._func_name = candidate if candidate else "UnknownFunction"
except Exception:
self._func_name = "UnknownFunction"
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None: # An exception occurred
error_message = f"Error in {self._func_name}: {exc_value}"
if self.control_message is not None:
cm_set_failure(self.control_message, error_message)
annotate_task_result(
self.control_message,
result=TaskResultStatus.FAILURE,
task_id=self.annotation_id,
message=error_message,
)
# Propagate the exception if raise_on_failure is True; otherwise, suppress it.
if self.raise_on_failure:
return False
return True
annotate_task_result(
self.control_message,
result=TaskResultStatus.SUCCESS,
task_id=self.annotation_id,
)
return False