nv_ingest.framework.util.flow_control package

Submodules

nv_ingest.framework.util.flow_control.filter_by_task module

nv_ingest.framework.util.flow_control.filter_by_task.filter_by_task(
    required_tasks: List[str | Tuple[Any, ...]],
    forward_func: Callable[[Any], Any] | None = None,
) -> Callable

Decorator that checks whether an IngestControlMessage contains any of the required tasks. Supports both synchronous and asynchronous functions as well as class methods (with ‘self’). If no required task is found, the original message is returned (or forward_func is called if provided).

Parameters:
  • required_tasks (list[Union[str, Tuple[Any, ...]]]) – A list of required tasks. Each element is either a string representing a task name or a tuple/list where the first element is the task name and the remaining elements specify required task properties.

  • forward_func (Optional[Callable[[Any], Any]]) – A function to be called with the IngestControlMessage if no required task is found.

Returns:

A decorator wrapping a function that expects an IngestControlMessage as one of its first arguments.

Return type:

Callable
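
The filtering behavior described above can be illustrated with a minimal sketch. This is not the library's implementation: `StubMessage`, `filter_by_task_sketch`, and the rule that a tuple's property dictionaries must be subsets of the task's properties are all assumptions made for illustration, and only synchronous functions are handled.

```python
import functools
from typing import Any, Callable, Dict, Optional, Sequence, Tuple, Union


class StubMessage:
    """Hypothetical stand-in for IngestControlMessage: task name -> properties."""

    def __init__(self, tasks: Dict[str, Dict[str, Any]]):
        self.tasks = tasks


def _matches(message: StubMessage, required: Union[str, Tuple[Any, ...]]) -> bool:
    # A bare string only requires the task name to be present.
    if isinstance(required, str):
        return required in message.tasks
    # A tuple/list is (task_name, props, ...). Assumed rule: every props dict
    # must be a subset of the task's actual properties.
    name, *prop_sets = required
    if name not in message.tasks:
        return False
    actual = message.tasks[name]
    return all(actual.get(k) == v for props in prop_sets for k, v in props.items())


def filter_by_task_sketch(
    required_tasks: Sequence[Union[str, Tuple[Any, ...]]],
    forward_func: Optional[Callable[[Any], Any]] = None,
) -> Callable:
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Works for plain functions and methods: the message is looked up
            # among the first two positional arguments (after an optional self).
            message = next((a for a in args[:2] if isinstance(a, StubMessage)), None)
            if message is None:
                raise ValueError("no StubMessage found in the first arguments")
            if any(_matches(message, req) for req in required_tasks):
                return func(*args, **kwargs)
            # No required task present: skip the function entirely.
            return forward_func(message) if forward_func else message

        return wrapper

    return decorator


@filter_by_task_sketch(["dedup", ("extract", {"document_type": "pdf"})])
def process(message: StubMessage) -> str:
    return "processed"
```

With this sketch, a message carrying a `dedup` task reaches `process`, while a message carrying only a `store` task is returned unchanged.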

nv_ingest.framework.util.flow_control.udf_intercept module

class nv_ingest.framework.util.flow_control.udf_intercept.CachedUDF(
    function: callable,
    function_name: str,
    signature_validated: bool,
    created_at: float,
    last_used: float,
    use_count: int,
)

Bases: object

Cached UDF function with metadata

created_at: float
function: callable
function_name: str
last_used: float
signature_validated: bool
use_count: int

class nv_ingest.framework.util.flow_control.udf_intercept.UDFCache(max_size: int = 128, ttl_seconds: int | None = 3600)

Bases: object

LRU cache for compiled and validated UDF functions

get(
    udf_function_str: str,
    udf_function_name: str,
) -> CachedUDF | None

Get cached UDF function if available

get_stats() -> Dict[str, Any]

Get cache statistics

put(
    udf_function_str: str,
    udf_function_name: str,
    function: callable,
    signature_validated: bool = True,
) -> str

Cache a compiled and validated UDF function
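
As a rough illustration of how an LRU cache with an optional time-to-live can work, consider the sketch below. It is not the `UDFCache` source: the class names, the SHA-256 key built from the function name and source text, and the choice to return that key from `put` are assumptions for illustration only.

```python
import hashlib
import time
from collections import OrderedDict
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class CachedEntry:
    """Hypothetical cache record, loosely modeled on the CachedUDF fields."""

    function: Callable
    function_name: str
    created_at: float
    last_used: float
    use_count: int = 0


class LRUCacheSketch:
    def __init__(self, max_size: int = 128, ttl_seconds: Optional[int] = 3600):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        # Insertion order doubles as recency order: oldest entries come first.
        self._entries: "OrderedDict[str, CachedEntry]" = OrderedDict()

    @staticmethod
    def _key(source: str, name: str) -> str:
        # Assumed keying scheme: hash of the function name plus source text.
        return hashlib.sha256(f"{name}:{source}".encode()).hexdigest()

    def get(self, source: str, name: str) -> Optional[CachedEntry]:
        key = self._key(source, name)
        entry = self._entries.get(key)
        if entry is None:
            return None
        now = time.time()
        # Drop entries older than the TTL (ttl_seconds=None disables expiry).
        if self.ttl_seconds is not None and now - entry.created_at > self.ttl_seconds:
            del self._entries[key]
            return None
        entry.last_used = now
        entry.use_count += 1
        self._entries.move_to_end(key)  # mark as most recently used
        return entry

    def put(self, source: str, name: str, function: Callable) -> str:
        key = self._key(source, name)
        now = time.time()
        self._entries[key] = CachedEntry(function, name, now, now)
        self._entries.move_to_end(key)
        # Evict least recently used entries once the size limit is exceeded.
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)
        return key
```

Keying on the source text means an edited UDF body is treated as a new entry, so a stale compiled function is never returned for changed code.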

nv_ingest.framework.util.flow_control.udf_intercept.compile_and_validate_udf(
    udf_function_str: str,
    udf_function_name: str,
    task_num: int,
) -> callable

Compile and validate UDF function (extracted for caching)
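
A compile-and-validate step for a function supplied as source text might look like the following sketch. The helper name `compile_udf_sketch`, the use of `task_num` only in error messages, and the rule that a UDF takes exactly one parameter are assumptions, not documented behavior of `compile_and_validate_udf`.

```python
import inspect
from typing import Callable


def compile_udf_sketch(udf_function_str: str, udf_function_name: str, task_num: int) -> Callable:
    namespace: dict = {}
    try:
        # exec runs arbitrary code, so this is only safe with trusted input.
        exec(udf_function_str, namespace)
    except Exception as exc:
        raise ValueError(f"UDF task {task_num}: failed to compile: {exc}") from exc

    func = namespace.get(udf_function_name)
    if not callable(func):
        raise ValueError(f"UDF task {task_num}: function '{udf_function_name}' not found")

    # Assumed validation rule: the UDF accepts exactly one parameter
    # (the control message).
    params = list(inspect.signature(func).parameters)
    if len(params) != 1:
        raise ValueError(
            f"UDF task {task_num}: expected 1 parameter, got {len(params)}"
        )
    return func


source = "def tag(message):\n    message['tagged'] = True\n    return message\n"
tag = compile_udf_sketch(source, "tag", task_num=0)
```

Separating compilation from execution in this way is what makes caching practical: the returned callable can be stored and reused without re-running `exec` for every message.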

nv_ingest.framework.util.flow_control.udf_intercept.execute_targeted_udfs(
    control_message: IngestControlMessage,
    stage_name: str,
    directive: str,
) -> IngestControlMessage

Execute UDFs that target this stage with the given directive.

nv_ingest.framework.util.flow_control.udf_intercept.get_udf_cache_stats() -> Dict[str, Any]

Get UDF cache performance statistics

nv_ingest.framework.util.flow_control.udf_intercept.remove_task_by_id(
    control_message: IngestControlMessage,
    task_id: str,
) -> IngestControlMessage

Remove a specific task by ID from the control message

nv_ingest.framework.util.flow_control.udf_intercept.udf_intercept_hook(
    stage_name: str | None = None,
    enable_run_before: bool = True,
    enable_run_after: bool = True,
)

Decorator that executes UDFs targeted at this stage.

This decorator integrates with the existing UDF system, providing full UDF compilation, caching, and execution capabilities. UDFs can target specific stages using run_before or run_after directives.

Parameters:
  • stage_name – Name of the stage (e.g., “image_dedup”, “text_extract”). If None, will attempt to use self.stage_name from the decorated method’s instance.

  • enable_run_before – Whether to execute UDFs with run_before=True (default: True)

  • enable_run_after – Whether to execute UDFs with run_after=True (default: True)

Examples

# Automatic stage name detection (recommended)
@traceable("image_deduplication")
@udf_intercept_hook()  # Uses self.stage_name automatically
@filter_by_task(required_tasks=["dedup"])
def on_data(self, control_message: IngestControlMessage) -> IngestControlMessage:
    return control_message

# Explicit stage name (fallback/override)
@traceable("data_sink")
@udf_intercept_hook("data_sink", enable_run_after=False)
@filter_by_task(required_tasks=["store"])
def on_data(self, control_message: IngestControlMessage) -> IngestControlMessage:
    return control_message

# Only run_after UDFs (e.g., for source stages)
@traceable("data_source")
@udf_intercept_hook(enable_run_before=False)  # Uses self.stage_name automatically
def on_data(self, control_message: IngestControlMessage) -> IngestControlMessage:
    return control_message
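
To make the run_before / run_after flow concrete, here is a self-contained sketch of a hook decorator with the same three parameters. It does not reproduce the library's internals: `UDF_REGISTRY`, `_run_udfs`, and `intercept_hook_sketch` are hypothetical names, and in this sketch UDFs are plain callables registered per stage rather than source strings carried in the control message.

```python
import functools
from typing import Any, Callable, Dict, List, Optional

# Hypothetical registry: stage name -> directive -> list of UDF callables.
UDF_REGISTRY: Dict[str, Dict[str, List[Callable[[Any], Any]]]] = {}


def _run_udfs(stage: str, directive: str, message: Any) -> Any:
    # Apply each UDF registered for this stage and directive, in order.
    for udf in UDF_REGISTRY.get(stage, {}).get(directive, []):
        message = udf(message)
    return message


def intercept_hook_sketch(
    stage_name: Optional[str] = None,
    enable_run_before: bool = True,
    enable_run_after: bool = True,
) -> Callable:
    def decorator(method: Callable) -> Callable:
        @functools.wraps(method)
        def wrapper(self: Any, message: Any) -> Any:
            # Fall back to the instance's stage_name when none is given.
            stage = stage_name or getattr(self, "stage_name", None)
            if stage is None:
                return method(self, message)  # nothing to target
            if enable_run_before:
                message = _run_udfs(stage, "run_before", message)
            result = method(self, message)
            if enable_run_after:
                result = _run_udfs(stage, "run_after", result)
            return result

        return wrapper

    return decorator


class DedupStage:
    stage_name = "image_dedup"

    @intercept_hook_sketch()  # resolves the stage via self.stage_name
    def on_data(self, message: list) -> list:
        return message + ["stage"]
```

Registering one callable under each directive for `"image_dedup"` and calling `DedupStage().on_data([])` runs the run_before UDF, then the stage body, then the run_after UDF.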

Module contents

nv_ingest.framework.util.flow_control.filter_by_task(
    required_tasks: List[str | Tuple[Any, ...]],
    forward_func: Callable[[Any], Any] | None = None,
) -> Callable

Decorator that checks whether an IngestControlMessage contains any of the required tasks. Supports both synchronous and asynchronous functions as well as class methods (with ‘self’). If no required task is found, the original message is returned (or forward_func is called if provided).

Parameters:
  • required_tasks (list[Union[str, Tuple[Any, ...]]]) – A list of required tasks. Each element is either a string representing a task name or a tuple/list where the first element is the task name and the remaining elements specify required task properties.

  • forward_func (Optional[Callable[[Any], Any]]) – A function to be called with the IngestControlMessage if no required task is found.

Returns:

A decorator wrapping a function that expects an IngestControlMessage as one of its first arguments.

Return type:

Callable