nv_ingest.framework.orchestration.ray.util.pipeline package#
Submodules#
nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller module#
- class nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller.PIDController(
- kp: float,
- ki: float,
- kd: float,
- target_queue_depth: int = 0,
- penalty_factor: float = 0.0005,
- error_boost_factor: float = 1.5,
- )
Bases: object
Calculates initial replica adjustment proposals based on PID control logic.
This controller implements the core PID algorithm: it reacts to the error between the current state (queue depth) and the desired state (target depth), adjusted by an idle penalty. It also tracks memory usage per replica to provide a dynamic cost estimate for the ResourceConstraintManager.
- calculate_initial_proposals(
- stage_metrics: Dict[str, Dict[str, Any]],
- )
Calculates initial, unconstrained replica proposals for each stage.
Iterates through each stage, calculates its PID error and delta based on queue depth and target, and returns the initial proposals without considering global constraints. Includes dynamic cost estimates.
- Parameters:
stage_metrics (Dict[str, Dict[str, Any]]) – Dictionary mapping stage names to their current metrics. Expected keys per stage: ‘replicas’, ‘queue_depth’, ‘ema_memory_per_replica’. Optional: ‘target_queue_depth’, ‘processing’, ‘min_replicas’, ‘max_replicas’.
- Returns:
Dictionary mapping stage names to their initial proposals, including current/proposed replicas, cost estimates, and original metrics.
- Return type:
Dict[str, StagePIDProposal]
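The following sketch shows how the controller might be driven; the gains, stage name, and metric values are illustrative only and do not represent recommended settings.

```python
from nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller import PIDController

# Illustrative gains; real deployments would tune these.
controller = PIDController(kp=0.5, ki=0.1, kd=0.05, target_queue_depth=0)

stage_metrics = {
    "pdf_extractor": {                    # hypothetical stage name
        "replicas": 2,
        "queue_depth": 40,
        "ema_memory_per_replica": 1024,   # MB, feeds the dynamic cost estimate
        "min_replicas": 1,                # optional keys
        "max_replicas": 8,
    },
}

# Returns Dict[str, StagePIDProposal]: unconstrained proposals per stage,
# including current/proposed replicas, cost estimates, and the original metrics.
proposals = controller.calculate_initial_proposals(stage_metrics)
```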
- class nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller.ResourceConstraintManager(
- max_replicas: int,
- memory_threshold: int,
- memory_safety_buffer_fraction: float,
- )
Bases: object
Applies global resource constraints and safety checks to initial proposals.
Takes the initial replica proposals generated by the PIDController and adjusts them based on global limits (max replicas, available CPU cores based on affinity, memory budget with safety buffer), and ensures pipeline consistency (zero-replica safety). It allocates limited resources proportionally if multiple stages request scale-ups simultaneously.
If current global memory usage exceeds the effective limit, it aggressively scales down stages starting with the highest replica counts.
- apply_constraints(
- initial_proposals: Dict[str, StagePIDProposal],
- global_in_flight: int,
- current_global_memory_usage_mb: int,
- num_edges: int,
- )
Applies all configured global constraints (replica limits, available CPU cores, memory budget with safety buffer, proportional scale-up allocation, and zero-replica safety checks) to the initial replica proposals.
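A hedged sketch of feeding those proposals through the constraint manager; the resource limits and global figures below are illustrative, and the structure of the returned adjustments is determined by the implementation.

```python
from nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller import ResourceConstraintManager

# Illustrative limits; memory figures are assumed to be in MB, matching
# the current_global_memory_usage_mb argument below.
manager = ResourceConstraintManager(
    max_replicas=32,
    memory_threshold=64_000,
    memory_safety_buffer_fraction=0.1,
)

# `proposals` is the Dict[str, StagePIDProposal] produced by
# PIDController.calculate_initial_proposals (see the sketch above).
adjustments = manager.apply_constraints(
    initial_proposals=proposals,
    global_in_flight=120,                  # illustrative in-flight item count
    current_global_memory_usage_mb=48_000,
    num_edges=6,                           # illustrative number of pipeline edges
)
```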
- class nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller.StagePIDProposal(
- name: str,
- current_replicas: int,
- proposed_replicas: int,
- conservative_cost_estimate: float,
- metrics: Dict[str, Any],
- )
Bases: object
Holds the initial proposal from the PID controller for a single stage.
- conservative_cost_estimate: float#
- current_replicas: int#
- metrics: Dict[str, Any]#
- name: str#
- proposed_replicas: int#
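For reference, a proposal can be constructed directly from the documented fields; the values here are made up.

```python
from nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller import StagePIDProposal

proposal = StagePIDProposal(
    name="pdf_extractor",                # hypothetical stage name
    current_replicas=2,
    proposed_replicas=3,
    conservative_cost_estimate=1024.0,   # estimated cost of scaling up, e.g. MB per replica
    metrics={"replicas": 2, "queue_depth": 40},
)
```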
nv_ingest.framework.orchestration.ray.util.pipeline.pipeline_runners module#
- nv_ingest.framework.orchestration.ray.util.pipeline.pipeline_runners.run_pipeline(
- pipeline_config: PipelineConfigSchema | None = None,
- block: bool = True,
- disable_dynamic_scaling: bool | None = None,
- dynamic_memory_threshold: float | None = None,
- run_in_subprocess: bool = False,
- stdout: TextIO | None = None,
- stderr: TextIO | None = None,
- libmode: bool = True,
- quiet: bool | None = None,
- )
Launch and manage a pipeline using configuration.
This function is the primary entry point for executing a Ray pipeline, either within the current process or in a separate Python subprocess. It supports synchronous blocking execution or non-blocking lifecycle management, and allows redirection of output to specified file-like objects.
- Parameters:
pipeline_config (Optional[PipelineConfigSchema], default=None) – The validated configuration object used to construct and launch the pipeline. If None and libmode is True, loads the default libmode pipeline.
block (bool, default=True) – If True, blocks until the pipeline completes. If False, returns an interface to control the pipeline externally.
disable_dynamic_scaling (Optional[bool], default=None) – If provided, overrides the disable_dynamic_scaling setting from the pipeline config.
dynamic_memory_threshold (Optional[float], default=None) – If provided, overrides the dynamic_memory_threshold setting from the pipeline config.
run_in_subprocess (bool, default=False) – If True, launches the pipeline in a separate Python subprocess using multiprocessing.Process. If False, runs the pipeline in the current process.
stdout (Optional[TextIO], default=None) – Optional file-like stream to which subprocess stdout should be redirected. If None, stdout is redirected to /dev/null.
stderr (Optional[TextIO], default=None) – Optional file-like stream to which subprocess stderr should be redirected. If None, stderr is redirected to /dev/null.
libmode (bool, default=True) – If True and pipeline_config is None, loads the default libmode pipeline configuration. If False, requires pipeline_config to be provided.
quiet (Optional[bool], default=None) – If True, configures logging for minimal output (PRODUCTION preset, suppresses INFO-level startup messages). If None, defaults to True when libmode=True. Set to False to see verbose startup logs even in library mode.
- Returns:
If run in-process with block=True: returns elapsed time in seconds (float).
If run in-process with block=False: returns a RayPipelineInterface.
If run in subprocess with block=False: returns a RayPipelineSubprocessInterface.
If run in subprocess with block=True: returns 0.0.
- Return type:
Union[RayPipelineInterface, float, RayPipelineSubprocessInterface]
- Raises:
ValueError – If pipeline_config is None and libmode is False.
RuntimeError – If the subprocess fails to start or exits with an error.
Exception – Any other exceptions raised during pipeline launch or configuration.
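Two illustrative invocation patterns, assuming the package is installed and the default libmode configuration is acceptable:

```python
import sys

from nv_ingest.framework.orchestration.ray.util.pipeline.pipeline_runners import run_pipeline

# Blocking, in-process run using the default libmode configuration
# (pipeline_config=None, libmode=True); returns elapsed seconds as a float.
elapsed = run_pipeline(block=True)
print(f"Pipeline finished in {elapsed:.1f} s")

# Non-blocking subprocess run; returns a RayPipelineSubprocessInterface
# for managing the pipeline's lifecycle externally.
pipeline = run_pipeline(
    block=False,
    run_in_subprocess=True,
    stdout=sys.stdout,   # without this, subprocess stdout goes to /dev/null
    stderr=sys.stderr,
    quiet=False,         # show verbose startup logs even in library mode
)
```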
nv_ingest.framework.orchestration.ray.util.pipeline.tools module#
- nv_ingest.framework.orchestration.ray.util.pipeline.tools.wrap_callable_as_stage(
- fn: Callable[[object, BaseModel], object],
- schema_type: Type[BaseModel],
- *,
- required_tasks: List[str] | None = None,
- trace_id: str | None = None,
- )
Factory that wraps a user-supplied function into a Ray actor stage, returning a proxy that creates unique, isolated actor types dynamically.
- Parameters:
fn (Callable[[IngestControlMessage, BaseModel], IngestControlMessage]) – The processing function to be wrapped in the Ray actor.
schema_type (Type[BaseModel]) – Pydantic schema used to validate and pass the stage config.
required_tasks (Optional[List[str]], optional) – Task names this stage should filter on. If None, no filtering is applied.
trace_id (Optional[str], optional) – Optional name for trace annotation; defaults to the function name.
- Returns:
StageProxy – A factory-like proxy exposing .remote() and .options() for Ray-idiomatic actor creation. Direct instantiation or class method use is not supported.
- Return type:
object
Notes
Each call to .remote() or .options() generates a new, dynamically created class (using type()), ensuring Ray treats each as a unique actor type and preventing class/actor name collisions or registry issues. This is essential when running dynamic or parallel pipelines and tests.
Only .remote(config) and .options(…) (chained with .remote(config)) are supported. All other class/actor patterns will raise NotImplementedError.
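A sketch of wrapping a simple processing function; the schema class, its field, and the stage function are hypothetical, while the wrap_callable_as_stage call and the .remote()/.options() usage follow the patterns described above.

```python
import ray
from pydantic import BaseModel

from nv_ingest.framework.orchestration.ray.util.pipeline.tools import wrap_callable_as_stage


class MyStageConfig(BaseModel):
    # Hypothetical per-stage configuration schema.
    batch_size: int = 8


def my_stage(message, config: MyStageConfig):
    # Hypothetical processing function: receives the control message and the
    # validated config, and must return the (possibly modified) message.
    return message


MyStage = wrap_callable_as_stage(
    my_stage,
    MyStageConfig,
    required_tasks=["extract"],   # illustrative task filter
    trace_id="my_stage",
)

ray.init(ignore_reinit_error=True)

# Only .remote(config) and .options(...).remote(config) are supported.
actor = MyStage.remote(MyStageConfig(batch_size=4))
pinned = MyStage.options(num_cpus=1).remote(MyStageConfig())  # illustrative Ray actor options
```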