# Source code for nv_ingest_api.internal.primitives.tracing.logging
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import inspect
import uuid
from datetime import datetime
from enum import Enum
from nv_ingest_api.internal.primitives.ingest_control_message import IngestControlMessage
[docs]
class TaskResultStatus(Enum):
    """Outcome of a task; each member's value is its own name as a string."""

    # Each member's value mirrors its name.
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
[docs]
def annotate_cm(control_message: "IngestControlMessage", source_id=None, **kwargs):
    """
    Annotate an IngestControlMessage object with a timestamp and a metadata entry.

    Two things are recorded on ``control_message``:

    - A timestamp (``datetime.now()``), via ``set_timestamp``, under the key
      ``annotation::<message>`` when a truthy ``message`` keyword is supplied, or
      ``annotation::<uuid4>`` otherwise.
    - A metadata dict, via ``set_metadata``, under a fresh ``annotation::<uuid4>`` key.
      The dict holds ``source_id`` plus every key-value pair from ``kwargs``; the
      timestamp itself is NOT stored in this dict.

    Both writes are best-effort: an exception raised by either setter is printed and
    swallowed, so a failed annotation never aborts the caller.

    Parameters:
    - control_message: The IngestControlMessage object to be annotated.
    - source_id: An identifier for the source of the annotation. If None, the ``__name__``
      of the module owning the frame two levels above this function is used (i.e. the
      caller of whatever function called ``annotate_cm``). Falls back to "UnknownModule"
      when that frame or its module cannot be resolved.
    - **kwargs: Arbitrary key-value pairs to be included in the annotation metadata.

    Raises:
    - ValueError: If ``annotation_timestamp`` is passed in ``kwargs`` (reserved key).
    """
    # Reject the reserved key up front, before doing any other work.
    if "annotation_timestamp" in kwargs:
        raise ValueError("'annotation_timestamp' is a reserved key and cannot be specified.")

    if source_id is None:
        # Walk two frames up by hand instead of inspect.getouterframes(...)[2]:
        # that call builds FrameInfo records (with source context) for the whole stack
        # and raises IndexError when fewer than three frames exist.
        frame = inspect.currentframe()  # may be None on interpreters without frame support
        try:
            for _ in range(2):
                if frame is None:
                    break
                frame = frame.f_back
            module = inspect.getmodule(frame) if frame is not None else None
        finally:
            # Release the frame reference promptly so it cannot keep a reference cycle alive.
            del frame
        source_id = module.__name__ if module is not None else "UnknownModule"

    # The timestamp key is derived from the message when one is given, so repeated
    # annotations with the same message share (and overwrite) one timestamp key.
    message = kwargs.get("message")
    annotation_key = f"annotation::{message}" if message else f"annotation::{uuid.uuid4()}"

    annotation_timestamp = datetime.now()
    try:
        control_message.set_timestamp(annotation_key, annotation_timestamp)
    except Exception as e:
        # Best-effort: report and continue so the metadata is still attempted.
        print(f"Failed to set annotation timestamp: {e}")

    # The metadata key is always a fresh UUID, independent of the timestamp key.
    metadata_key = f"annotation::{uuid.uuid4()}"

    # Metadata value: source_id plus any provided kwargs.
    metadata_value = {
        "source_id": source_id,
    }
    metadata_value.update(kwargs)

    try:
        # Attempt to set the annotated metadata on the IngestControlMessage object.
        control_message.set_metadata(metadata_key, metadata_value)
    except Exception as e:
        # Best-effort: report and continue rather than failing the caller.
        print(f"Failed to annotate IngestControlMessage: {e}")
[docs]
def annotate_task_result(control_message, result, task_id, source_id=None, **kwargs):
    """
    Annotate a IngestControlMessage object with the result of a task, identified by a task_id,
    and an arbitrary number of additional key-value pairs. The result can be a TaskResultStatus
    enum or a string that will be converted to the corresponding enum.

    The annotation is written by delegating to ``annotate_cm`` with ``task_result`` set to the
    enum member's value and ``task_id`` set to the given identifier.

    Parameters:
    - control_message: The IngestControlMessage object to be annotated.
    - result: The result of the task, either SUCCESS or FAILURE, as an enum or string.
      Strings are matched against the enum member names case-insensitively.
    - task_id: A unique identifier for the task.
    - source_id: An identifier for the source of the annotation, forwarded unchanged to
      ``annotate_cm`` (which derives one from the call stack when this is None).
    - **kwargs: Arbitrary additional key-value pairs to be included in the annotation.

    Raises:
    - ValueError: If ``result`` is a string that names no TaskResultStatus member, or is
      neither a string nor a TaskResultStatus.
    """
    # Convert result to TaskResultStatus enum if it's a string
    if isinstance(result, str):
        try:
            result = TaskResultStatus[result.upper()]
        except KeyError:
            # The message already lists the valid names; suppress the KeyError context
            # so the traceback shows only the actionable ValueError.
            raise ValueError(
                f"Invalid result string: {result}. Must be one of {[status.name for status in TaskResultStatus]}."
            ) from None
    elif not isinstance(result, TaskResultStatus):
        raise ValueError("result must be an instance of TaskResultStatus Enum or a valid result string.")

    # Annotate the control message with task-related information, including the result and task_id.
    annotate_cm(
        control_message,
        source_id=source_id,
        task_result=result.value,
        task_id=task_id,
        **kwargs,
    )