# Source code for nv_ingest.framework.orchestration.execution.options

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Data classes for pipeline execution configuration and options.

This module defines declarative data structures for configuring pipeline execution,
replacing imperative parameter passing with structured configuration objects.
"""

from dataclasses import dataclass
from typing import Optional, TextIO, Union

from nv_ingest.framework.orchestration.ray.primitives.ray_pipeline import (
    RayPipelineInterface,
    RayPipelineSubprocessInterface,
)


@dataclass
class PipelineRuntimeOverrides:
    """
    Optional runtime tweaks layered over a base pipeline configuration.

    Each field left as ``None`` means "keep whatever the base configuration
    says"; a non-``None`` value replaces the corresponding setting for this
    run only, without touching the source config.

    Attributes
    ----------
    disable_dynamic_scaling : Optional[bool]
        When set, replaces the pipeline config's ``disable_dynamic_scaling``
        value.
    dynamic_memory_threshold : Optional[float]
        When set, replaces the memory threshold used for dynamic scaling
        decisions. Must lie in the closed interval [0.0, 1.0].

    Raises
    ------
    ValueError
        If ``dynamic_memory_threshold`` is provided and falls outside
        [0.0, 1.0] (a NaN value is rejected as well).
    """

    disable_dynamic_scaling: Optional[bool] = None
    dynamic_memory_threshold: Optional[float] = None

    def __post_init__(self):
        """Reject a ``dynamic_memory_threshold`` outside [0.0, 1.0]."""
        threshold = self.dynamic_memory_threshold
        if threshold is None:
            # Nothing to validate: the base configuration's value stays in effect.
            return

        # Chained comparison is False for NaN, so NaN is rejected too.
        within_bounds = 0.0 <= threshold <= 1.0
        if not within_bounds:
            raise ValueError(f"dynamic_memory_threshold must be between 0.0 and 1.0, got {threshold}")
@dataclass
class ExecutionOptions:
    """
    Settings that govern how a pipeline run is carried out.

    These decide whether the caller waits for the pipeline to finish and,
    for subprocess execution, where the child process's output is sent.

    Attributes
    ----------
    block : bool
        ``True`` (the default) waits for the pipeline to complete; ``False``
        returns right away with a control interface.
    stdout : Optional[TextIO]
        Target stream for the subprocess's standard output. Consulted only
        when run_in_subprocess=True; ``None`` sends output to /dev/null.
    stderr : Optional[TextIO]
        Target stream for the subprocess's standard error. Consulted only
        when run_in_subprocess=True; ``None`` sends output to /dev/null.
    """

    # Blocking execution is the default mode.
    block: bool = True
    # Stream targets for subprocess output; None selects the default redirection.
    stdout: Optional[TextIO] = None
    stderr: Optional[TextIO] = None
@dataclass
class ExecutionResult:
    """
    Outcome of a pipeline run: the control interface and/or its duration.

    Wraps what an execution produced and can collapse itself into the older
    single-value return shape via :meth:`get_return_value`.

    Attributes
    ----------
    interface : Union[RayPipelineInterface, RayPipelineSubprocessInterface, None]
        Handle for controlling the pipeline; ``None`` for blocking subprocess
        execution.
    elapsed_time : Optional[float]
        Wall time of the run in seconds; populated only for blocking execution.
    """

    interface: Union[RayPipelineInterface, RayPipelineSubprocessInterface, None]
    elapsed_time: Optional[float] = None

    def get_return_value(self) -> Union[RayPipelineInterface, float, RayPipelineSubprocessInterface]:
        """
        Produce the legacy single-value return for backward compatibility.

        Returns
        -------
        Union[RayPipelineInterface, float, RayPipelineSubprocessInterface]
            ``elapsed_time`` whenever it is set (blocking execution);
            otherwise the pipeline ``interface`` (non-blocking execution).

        Raises
        ------
        RuntimeError
            If neither ``elapsed_time`` nor ``interface`` is set.
        """
        # A recorded duration takes precedence, even if an interface is also present.
        if self.elapsed_time is not None:
            return self.elapsed_time

        if self.interface is None:
            # Neither field populated; this is not expected from normal execution.
            raise RuntimeError("ExecutionResult has neither interface nor elapsed_time")

        return self.interface