nv_ingest.framework.util.flow_control package
Submodules
nv_ingest.framework.util.flow_control.filter_by_task module
- nv_ingest.framework.util.flow_control.filter_by_task.filter_by_task(required_tasks: List[str | Tuple[Any, ...]], forward_func: Callable[[Any], Any] | None = None)
Decorator that checks whether an IngestControlMessage contains any of the required tasks. It supports synchronous and asynchronous functions, including class methods that take ‘self’. If none of the required tasks is present, the decorated function is not called: the original message is returned unchanged, or, if forward_func is provided, forward_func is called with the message instead.
- Parameters:
required_tasks (list[Union[str, Tuple[Any, ...]]]) – A list of required tasks. Each element is either a string representing a task name or a tuple/list where the first element is the task name and the remaining elements specify required task properties.
forward_func (Optional[Callable[[Any], Any]]) – A function to be called with the IngestControlMessage if no required task is found.
- Returns:
A decorator wrapping a function that expects an IngestControlMessage as its first argument (or as its second argument, after ‘self’, for methods).
- Return type:
Callable
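The check described above can be sketched in plain Python. This is a minimal illustration, not the nv_ingest implementation: Task, FakeControlMessage and filter_by_task_sketch are hypothetical stand-ins, the message is assumed to expose its tasks through get_tasks(), and the trailing elements of a tuple requirement are assumed to be dicts of property values that must all match the task's properties.

```python
import asyncio
import functools
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union

RequiredTask = Union[str, Tuple[Any, ...]]


@dataclass
class Task:
    # Hypothetical stand-in for a control-message task: a type name plus properties.
    type: str
    properties: Dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeControlMessage:
    # Hypothetical stand-in for IngestControlMessage; only get_tasks() is modeled.
    tasks: List[Task] = field(default_factory=list)

    def get_tasks(self) -> Iterator[Task]:
        return iter(self.tasks)


def _task_matches(task: Task, required: RequiredTask) -> bool:
    if isinstance(required, str):
        return task.type == required
    name, *prop_specs = required
    # Assumption: each trailing element is a dict whose items must all appear in task.properties.
    return task.type == name and all(
        task.properties.get(key) == value for spec in prop_specs for key, value in spec.items()
    )


def filter_by_task_sketch(
    required_tasks: Sequence[RequiredTask], forward_func: Optional[Callable[[Any], Any]] = None
) -> Callable:
    def find_message(args: tuple) -> Any:
        # Plain functions receive the message first; methods receive it after self.
        for arg in args[:2]:
            if isinstance(arg, FakeControlMessage):
                return arg
        raise ValueError("no control message among the first two positional arguments")

    def should_run(msg: Any) -> bool:
        return any(_task_matches(t, r) for t in msg.get_tasks() for r in required_tasks)

    def skip(msg: Any) -> Any:
        # No required task found: forward the message, or return it unchanged.
        return forward_func(msg) if forward_func is not None else msg

    def decorator(func: Callable) -> Callable:
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                msg = find_message(args)
                return await func(*args, **kwargs) if should_run(msg) else skip(msg)

            return async_wrapper

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            msg = find_message(args)
            return func(*args, **kwargs) if should_run(msg) else skip(msg)

        return wrapper

    return decorator


@filter_by_task_sketch(["dedup", ("extract", {"document_type": "pdf"})])
def dedup_stage(msg: FakeControlMessage) -> str:
    return "ran"


@filter_by_task_sketch(["dedup"])
async def async_stage(msg: FakeControlMessage) -> str:
    return "async ran"


pdf_msg = FakeControlMessage([Task("extract", {"document_type": "pdf"})])
docx_msg = FakeControlMessage([Task("extract", {"document_type": "docx"})])
print(dedup_stage(pdf_msg))               # ran
print(dedup_stage(docx_msg) is docx_msg)  # True: no match, message returned unchanged
print(asyncio.run(async_stage(FakeControlMessage([Task("dedup")]))))  # async ran
```

Dispatching on inspect.iscoroutinefunction at decoration time is what lets a single decorator serve both sync and async stage functions without blocking an event loop.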
nv_ingest.framework.util.flow_control.udf_intercept module
- class nv_ingest.framework.util.flow_control.udf_intercept.CachedUDF(function: callable, function_name: str, signature_validated: bool, created_at: float, last_used: float, use_count: int)
Bases: object
A cached UDF function together with its metadata.
- created_at: float
- function: callable
- function_name: str
- last_used: float
- signature_validated: bool
- use_count: int
- class nv_ingest.framework.util.flow_control.udf_intercept.UDFCache(max_size: int = 128, ttl_seconds: int | None = 3600)
Bases: object
LRU cache for compiled and validated UDF functions.
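For illustration, here is one way an LRU cache with an optional time-to-live could store entries shaped like CachedUDF. CachedUDFSketch and UDFCacheSketch are hypothetical names; the string key, measuring the TTL from created_at, and the keys of the stats() dictionary (loosely in the spirit of get_udf_cache_stats) are assumptions, not the library's behavior.

```python
import time
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class CachedUDFSketch:
    # Mirrors the documented CachedUDF fields.
    function: Callable
    function_name: str
    signature_validated: bool
    created_at: float
    last_used: float
    use_count: int


class UDFCacheSketch:
    """Hypothetical LRU cache with an optional time-to-live, keyed by a string."""

    def __init__(self, max_size: int = 128, ttl_seconds: Optional[int] = 3600):
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self._entries: "OrderedDict[str, CachedUDFSketch]" = OrderedDict()
        self.hits = 0
        self.misses = 0

    def _expired(self, entry: CachedUDFSketch, now: float) -> bool:
        # Assumption: the TTL is measured from creation time; None disables expiry.
        return self.ttl_seconds is not None and now - entry.created_at > self.ttl_seconds

    def get(self, key: str) -> Optional[CachedUDFSketch]:
        now = time.time()
        entry = self._entries.get(key)
        if entry is None or self._expired(entry, now):
            self._entries.pop(key, None)  # drop the stale entry, if any
            self.misses += 1
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        entry.last_used = now
        entry.use_count += 1
        self.hits += 1
        return entry

    def put(self, key: str, function: Callable, function_name: str) -> CachedUDFSketch:
        now = time.time()
        # Assumes the caller has already validated the function's signature.
        entry = CachedUDFSketch(function, function_name, True, now, now, 0)
        self._entries[key] = entry
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict the least recently used entry
        return entry

    def keys(self) -> List[str]:
        # Keys from least to most recently used.
        return list(self._entries)

    def stats(self) -> Dict[str, Any]:
        lookups = self.hits + self.misses
        return {
            "size": len(self._entries),
            "max_size": self.max_size,
            "hits": self.hits,
            "misses": self.misses,
            "hit_rate": self.hits / lookups if lookups else 0.0,
        }


cache = UDFCacheSketch(max_size=2)
cache.put("a", len, "len")
cache.put("b", abs, "abs")
cache.get("a")              # hit: "a" becomes the most recently used entry
cache.put("c", max, "max")  # over capacity: evicts "b"
print(cache.keys())   # ['a', 'c']
print(cache.stats())  # {'size': 2, 'max_size': 2, 'hits': 1, 'misses': 0, 'hit_rate': 1.0}
```

An OrderedDict keeps recency order for free: move_to_end on every hit and popitem(last=False) on overflow give O(1) LRU bookkeeping.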
- nv_ingest.framework.util.flow_control.udf_intercept.compile_and_validate_udf(udf_function_str: str, udf_function_name: str, task_num: int)
Compile and validate a UDF function. This step is factored out into its own function so that its result can be cached.
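A minimal sketch of what compiling and validating a UDF string can involve, assuming the UDF is plain Python source that defines a function taking a single control-message argument. compile_and_validate_udf_sketch is a hypothetical name, and the one-parameter rule is an assumption rather than documented behavior. Because exec() runs arbitrary code, this pattern is only appropriate for trusted input.

```python
import inspect
from typing import Any, Callable, Dict


def compile_and_validate_udf_sketch(udf_function_str: str, udf_function_name: str, task_num: int) -> Callable:
    namespace: Dict[str, Any] = {}
    try:
        # exec() runs arbitrary code: only acceptable for trusted UDF source.
        exec(udf_function_str, namespace)
    except SyntaxError as exc:
        raise ValueError(f"UDF task {task_num}: syntax error: {exc}") from exc

    func = namespace.get(udf_function_name)
    if not callable(func):
        raise ValueError(f"UDF task {task_num}: no function named {udf_function_name!r}")

    # Assumption: a valid UDF takes exactly one argument (the control message).
    params = inspect.signature(func).parameters
    if len(params) != 1:
        raise ValueError(f"UDF task {task_num}: expected 1 parameter, found {len(params)}")
    return func


source = "def add_tag(control_message):\n    return control_message + ['tag']\n"
udf = compile_and_validate_udf_sketch(source, "add_tag", task_num=0)
print(udf([]))  # ['tag']
```

Since the result depends only on the source string and function name, the compiled function is a natural candidate for caching, which matches the stated reason for extracting this step.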
- nv_ingest.framework.util.flow_control.udf_intercept.execute_targeted_udfs(control_message: IngestControlMessage, stage_name: str, directive: str)
Execute the UDFs that target the stage named stage_name with the given directive (run_before or run_after).
- nv_ingest.framework.util.flow_control.udf_intercept.get_udf_cache_stats() → Dict[str, Any]
Get UDF cache performance statistics
- nv_ingest.framework.util.flow_control.udf_intercept.remove_task_by_id(control_message: IngestControlMessage, task_id: str)
Remove a specific task by ID from the control message
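The interplay between execute_targeted_udfs and remove_task_by_id might look roughly like the sketch below. Everything here is hypothetical: the Task and Message classes, the "udf" task type, and the property names udf_function, udf_function_name, target_stage, run_before and run_after are invented for illustration, and removing a UDF task after it runs is an assumption about the real behavior.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class Task:
    # Hypothetical task shape: a type, free-form properties, and a unique id.
    type: str
    properties: Dict[str, Any] = field(default_factory=dict)
    id: str = field(default_factory=lambda: str(uuid.uuid4()))


@dataclass
class Message:
    # Hypothetical stand-in for IngestControlMessage.
    tasks: List[Task] = field(default_factory=list)
    payload: List[str] = field(default_factory=list)


def remove_task_by_id_sketch(msg: Message, task_id: str) -> Message:
    msg.tasks = [t for t in msg.tasks if t.id != task_id]
    return msg


def execute_targeted_udfs_sketch(msg: Message, stage_name: str, directive: str) -> Message:
    # Iterate over a snapshot so tasks can be removed during the loop.
    for task in list(msg.tasks):
        props = task.properties
        if task.type != "udf" or props.get("target_stage") != stage_name or not props.get(directive):
            continue
        namespace: Dict[str, Any] = {}
        exec(props["udf_function"], namespace)  # a real implementation would consult a cache
        udf = namespace[props["udf_function_name"]]
        msg = udf(msg)
        # Assumption: a UDF task is consumed once it has run, so it cannot fire twice.
        msg = remove_task_by_id_sketch(msg, task.id)
    return msg


SOURCE = "def tag(msg):\n    msg.payload.append('tagged')\n    return msg\n"
udf_task = Task("udf", {"udf_function": SOURCE, "udf_function_name": "tag",
                        "target_stage": "text_extract", "run_before": True})
message = Message(tasks=[udf_task, Task("extract")])

message = execute_targeted_udfs_sketch(message, "text_extract", "run_after")   # directive mismatch: no-op
message = execute_targeted_udfs_sketch(message, "text_extract", "run_before")  # runs, then removes the UDF task
print(message.payload, [t.type for t in message.tasks])  # ['tagged'] ['extract']
```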
- nv_ingest.framework.util.flow_control.udf_intercept.udf_intercept_hook(stage_name: str | None = None, enable_run_before: bool = True, enable_run_after: bool = True)
Decorator that executes the UDFs targeted at this stage.
It hooks into the existing UDF system and reuses its compilation, caching, and execution machinery. A UDF targets a stage through a run_before or run_after directive, which determines whether it runs before or after the decorated stage function.
- Parameters:
stage_name – Name of the stage (e.g., “image_dedup”, “text_extract”). If None, the decorator attempts to read self.stage_name from the instance whose method is decorated.
enable_run_before – Whether to execute UDFs with run_before=True (default: True)
enable_run_after – Whether to execute UDFs with run_after=True (default: True)
Examples
    # Automatic stage name detection (recommended)
    @traceable("image_deduplication")
    @udf_intercept_hook()  # Uses self.stage_name automatically
    @filter_by_task(required_tasks=["dedup"])
    def on_data(self, control_message: IngestControlMessage) -> IngestControlMessage:
        return control_message

    # Explicit stage name (fallback/override)
    @traceable("data_sink")
    @udf_intercept_hook("data_sink", enable_run_after=False)
    @filter_by_task(required_tasks=["store"])
    def on_data(self, control_message: IngestControlMessage) -> IngestControlMessage:
        return control_message

    # Only run_after UDFs (e.g., for source stages)
    @traceable("data_source")
    @udf_intercept_hook(enable_run_before=False)  # Uses self.stage_name automatically
    def on_data(self, control_message: IngestControlMessage) -> IngestControlMessage:
        return control_message
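The wrapping order implied by udf_intercept_hook (run_before UDFs, then the stage function, then run_after UDFs) can be sketched as follows. udf_intercept_hook_sketch and run_targeted_udfs are hypothetical; the latter only records calls where a real hook would execute UDFs, and skipping UDFs when no stage name can be resolved is an assumption. Only synchronous methods are handled.

```python
import functools
from typing import Any, Callable, List, Optional, Tuple

CALLS: List[Tuple[str, str]] = []


def run_targeted_udfs(control_message: Any, stage_name: str, directive: str) -> Any:
    # Hypothetical stand-in for execute_targeted_udfs: records the call, passes the message through.
    CALLS.append((stage_name, directive))
    return control_message


def udf_intercept_hook_sketch(
    stage_name: Optional[str] = None, enable_run_before: bool = True, enable_run_after: bool = True
) -> Callable:
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(self: Any, control_message: Any, *args: Any, **kwargs: Any) -> Any:
            # An explicit stage_name wins; otherwise fall back to the instance attribute.
            name = stage_name if stage_name is not None else getattr(self, "stage_name", None)
            if name is None:
                # Assumption: with no resolvable stage name, UDF execution is skipped.
                return func(self, control_message, *args, **kwargs)
            if enable_run_before:
                control_message = run_targeted_udfs(control_message, name, "run_before")
            result = func(self, control_message, *args, **kwargs)
            if enable_run_after:
                result = run_targeted_udfs(result, name, "run_after")
            return result

        return wrapper

    return decorator


class DedupStage:
    stage_name = "image_dedup"

    @udf_intercept_hook_sketch()  # stage name taken from self.stage_name
    def on_data(self, control_message: Any) -> Any:
        return control_message


class SinkStage:
    @udf_intercept_hook_sketch("data_sink", enable_run_after=False)
    def on_data(self, control_message: Any) -> Any:
        return control_message


DedupStage().on_data("msg")
SinkStage().on_data("msg")
print(CALLS)
# [('image_dedup', 'run_before'), ('image_dedup', 'run_after'), ('data_sink', 'run_before')]
```

Resolving the stage name inside the wrapper, rather than at decoration time, is what allows the bare @udf_intercept_hook() form: self is only available once the method is actually called.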
Module contents
- nv_ingest.framework.util.flow_control.filter_by_task(required_tasks: List[str | Tuple[Any, ...]], forward_func: Callable[[Any], Any] | None = None)
Re-export of nv_ingest.framework.util.flow_control.filter_by_task.filter_by_task; see that entry for its parameters and return type.