Source code for nv_ingest.util.tracing.tagging
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import functools
import inspect
import string
from datetime import datetime
[docs]
def traceable(trace_name=None):
"""
A decorator that adds entry and exit trace timestamps to a IngestControlMessage's metadata
based on the presence of a 'config::add_trace_tagging' flag.
This decorator checks if the 'config::add_trace_tagging' flag is set to True in the
message's metadata. If so, it records the entry and exit timestamps of the function
execution, using either a provided custom trace name or the function's name by default.
Parameters
----------
trace_name : str, optional
A custom name for the trace entries in the message metadata. If not provided, the
function's name is used by default.
Returns
-------
decorator_trace_tagging : Callable
A wrapper function that decorates the target function to implement trace tagging.
Notes
-----
The decorated function must accept a IngestControlMessage object as its first argument. The
IngestControlMessage object must implement `has_metadata`, `get_metadata`, and `set_metadata`
methods used by the decorator to check for the trace tagging flag and to add trace metadata.
The trace metadata added by the decorator includes two entries:
- 'trace::entry::<trace_name>': The monotonic timestamp marking the function's entry.
- 'trace::exit::<trace_name>': The monotonic timestamp marking the function's exit.
Example
-------
Applying the decorator without a custom trace name:
>>> @traceable()
... def process_message(message):
... pass
Applying the decorator with a custom trace name:
>>> @traceable(custom_trace_name="CustomTraceName")
... def process_message(message):
... pass
In both examples, `process_message` will have entry and exit timestamps added to the
IngestControlMessage's metadata if 'config::add_trace_tagging' is True.
"""
def decorator_trace_tagging(func):
@functools.wraps(func)
def wrapper_trace_tagging(*args, **kwargs):
# Assuming the first argument is always the message
ts_fetched = datetime.now()
message = args[0]
do_trace_tagging = (message.has_metadata("config::add_trace_tagging") is True) and (
message.get_metadata("config::add_trace_tagging") is True
)
trace_prefix = trace_name if trace_name else func.__name__
if do_trace_tagging:
ts_send = message.get_timestamp("latency::ts_send")
ts_entry = datetime.now()
message.set_timestamp(f"trace::entry::{trace_prefix}", ts_entry)
if ts_send:
message.set_timestamp(f"trace::entry::{trace_prefix}_channel_in", ts_send)
message.set_timestamp(f"trace::exit::{trace_prefix}_channel_in", ts_fetched)
# Call the decorated function
result = func(*args, **kwargs)
if do_trace_tagging:
ts_exit = datetime.now()
message.set_timestamp(f"trace::exit::{trace_prefix}", ts_exit)
message.set_timestamp("latency::ts_send", ts_exit)
return result
return wrapper_trace_tagging
return decorator_trace_tagging
[docs]
def traceable_func(trace_name=None, dedupe=True):
"""
A decorator that injects trace information for tracking the execution of a function.
It logs the entry and exit timestamps of the function in a `trace_info` dictionary,
which can be used for performance monitoring or debugging purposes.
Parameters
----------
trace_name : str, optional
An optional string used as the prefix for the trace log entries. If not provided,
the decorated function's name is used. The string can include placeholders (e.g.,
"pdf_extractor::{model_name}") that will be dynamically replaced with matching
function argument values.
dedupe : bool, optional
If True, ensures that the trace entry and exit keys are unique by appending an index
(e.g., `_0`, `_1`) to the keys if duplicate entries are detected. Default is True.
Returns
-------
function
A wrapped function that injects trace information before and after the function's
execution.
Notes
-----
- If `trace_info` is not provided in the keyword arguments, a new dictionary is created
and used for storing trace entries.
- If `trace_name` contains format placeholders, the decorator attempts to populate them
with matching argument values from the decorated function.
- The trace information is logged in the format:
- `trace::entry::{trace_name}` for the entry timestamp.
- `trace::exit::{trace_name}` for the exit timestamp.
- If `dedupe` is True, the trace keys will be appended with an index to avoid
overwriting existing entries.
Example
-------
>>> @traceable_func(trace_name="pdf_extractor::{model_name}")
>>> def extract_pdf(model_name):
... pass
>>> trace_info = {}
>>> extract_pdf("my_model", trace_info=trace_info)
In this example, `model_name` is dynamically replaced in the trace_name, and the
trace information is logged with unique keys if deduplication is enabled.
"""
def decorator_inject_trace_info(func):
@functools.wraps(func)
def wrapper_inject_trace_info(*args, **kwargs):
trace_info = kwargs.pop("trace_info", None)
if trace_info is None:
trace_info = {}
trace_prefix = trace_name if trace_name else func.__name__
arg_names = list(inspect.signature(func).parameters)
args_name_to_val = dict(zip(arg_names, args))
# If `trace_name` is a formattable string, e.g., "pdf_extractor::{model_name}",
# search `args` and `kwargs` to replace the placeholder.
placeholders = [x[1] for x in string.Formatter().parse(trace_name) if x[1] is not None]
if placeholders:
format_kwargs = {}
for name in placeholders:
if name in args_name_to_val:
arg_val = args_name_to_val[name]
elif name in kwargs:
arg_val = kwargs.get(name)
else:
arg_val = name
format_kwargs[name] = arg_val
trace_prefix = trace_prefix.format(**format_kwargs)
trace_entry_key = f"trace::entry::{trace_prefix}"
trace_exit_key = f"trace::exit::{trace_prefix}"
ts_entry = datetime.now()
if dedupe:
trace_entry_key += "_{}"
trace_exit_key += "_{}"
i = 0
while (trace_entry_key.format(i) in trace_info) or (trace_exit_key.format(i) in trace_info):
i += 1
trace_entry_key = trace_entry_key.format(i)
trace_exit_key = trace_exit_key.format(i)
trace_info[trace_entry_key] = ts_entry
# Call the decorated function
result = func(*args, **kwargs)
ts_exit = datetime.now()
trace_info[trace_exit_key] = ts_exit
return result
return wrapper_inject_trace_info
return decorator_inject_trace_info