nv_ingest.framework.orchestration.ray.util.pipeline package

Submodules

nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller module

class nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller.PIDController(
kp: float,
ki: float,
kd: float,
stage_cost_estimates: Dict[str, int],
target_queue_depth: int = 0,
window_size: int = 10,
penalty_factor: float = 0.0005,
error_boost_factor: float = 1.5,
)

Bases: object

Calculates initial replica adjustment proposals based on PID control logic.

This controller focuses on the core PID algorithm reacting to the error between the current state (queue depth) and the desired state (target depth), adjusted by an idle penalty. It tracks memory usage per replica to provide a dynamic cost estimate for the ResourceConstraintManager.
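As a rough illustration of the control idea described above, the following minimal sketch computes a signed replica adjustment from the queue-depth error. It is not the nv_ingest implementation: the class name `SketchPID`, the use of a bounded error window as the integral term, and the rounding step are all assumptions, and the idle penalty and error boost are omitted.

```python
from collections import deque


class SketchPID:
    """Illustrative PID loop on queue depth; NOT the nv_ingest implementation."""

    def __init__(self, kp, ki, kd, target_queue_depth=0, window_size=10):
        self.kp, self.ki, self.kd = kp, ki, kd
        self.target = target_queue_depth
        # Assumption: a bounded window of recent errors stands in for the integral.
        self.errors = deque(maxlen=window_size)
        self.prev_error = 0.0

    def delta(self, queue_depth):
        """Return a signed replica adjustment for one observation."""
        error = queue_depth - self.target
        self.errors.append(error)
        integral = sum(self.errors)
        derivative = error - self.prev_error
        self.prev_error = error
        raw = self.kp * error + self.ki * integral + self.kd * derivative
        return round(raw)


pid = SketchPID(kp=0.1, ki=0.0, kd=0.0, target_queue_depth=0)
first = pid.delta(queue_depth=20)   # backlog above target: positive adjustment
second = pid.delta(queue_depth=0)   # queue at target: no adjustment
```

With only the proportional gain set, the adjustment tracks the current backlog directly; non-zero `ki` and `kd` would add memory of past errors and sensitivity to their rate of change.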

calculate_initial_proposals(
stage_metrics: Dict[str, Dict[str, Any]],
) -> Dict[str, StagePIDProposal]

Calculates initial, unconstrained replica proposals for each stage.

Iterates through each stage, calculates its PID error and delta based on queue depth and target, and returns the initial proposals without considering global constraints. Includes dynamic cost estimates.

Parameters:

stage_metrics (Dict[str, Dict[str, Any]]) – Dictionary mapping stage names to their current metrics. Expected keys per stage: ‘replicas’, ‘queue_depth’. Optional: ‘memory_usage’, ‘target_queue_depth’, ‘processing’, ‘min_replicas’, ‘max_replicas’.

Returns:

Dictionary mapping stage names to their initial proposals, including current/proposed replicas, cost estimates, and original metrics.

Return type:

Dict[str, StagePIDProposal]
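The expected shape of `stage_metrics` can be sketched as follows. The stage names and values are invented for illustration, and `missing_required` is a hypothetical helper, not part of nv_ingest; it only checks for the two keys the parameter description lists as expected.

```python
from typing import Any, Dict, Set

# Keys listed as expected per stage in the parameter description.
REQUIRED_KEYS = {"replicas", "queue_depth"}


def missing_required(stage_metrics: Dict[str, Dict[str, Any]]) -> Dict[str, Set[str]]:
    """Hypothetical helper: report stages that lack a required metric key."""
    problems = {}
    for name, metrics in stage_metrics.items():
        missing = REQUIRED_KEYS - set(metrics)
        if missing:
            problems[name] = missing
    return problems


# Illustrative input; stage names and numbers are made up.
stage_metrics = {
    "pdf_extractor": {"replicas": 2, "queue_depth": 15, "max_replicas": 8},
    "embedder": {"replicas": 1},  # 'queue_depth' is missing
}
problems = missing_required(stage_metrics)
```

A check like this before calling the controller makes a malformed metrics dictionary fail early with a clear message instead of surfacing as a `KeyError` inside the control loop.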

class nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller.ResourceConstraintManager(
max_replicas: int,
memory_threshold: int,
estimated_edge_cost_mb: int,
memory_safety_buffer_fraction: float,
)

Bases: object

Applies global resource constraints and safety checks to initial proposals.

Takes the initial replica proposals generated by the PIDController and adjusts them based on global limits (max replicas, available CPU cores based on affinity, memory budget with safety buffer), and ensures pipeline consistency (zero-replica safety). It allocates limited resources proportionally if multiple stages request scale-ups simultaneously.

If current global memory usage exceeds the effective limit, it aggressively scales down stages starting with the highest replica counts.
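One way to picture the proportional allocation mentioned above is the sketch below. It is an assumption about how such sharing could work, not the library's algorithm: `allocate_proportionally` is a hypothetical helper, the stage names are invented, and integer flooring is a simplification that can leave some headroom unused.

```python
from typing import Dict


def allocate_proportionally(requests: Dict[str, int], headroom: int) -> Dict[str, int]:
    """Share `headroom` new replicas across stages in proportion to their requests.

    Hypothetical helper; flooring means part of the headroom may go unused.
    """
    total = sum(requests.values())
    if total <= headroom:
        return dict(requests)  # everything fits, grant each request in full
    if headroom <= 0:
        return {name: 0 for name in requests}  # nothing left to hand out
    return {name: (want * headroom) // total for name, want in requests.items()}


# Three stages ask for 10 extra replicas in total, but only 5 are available.
granted = allocate_proportionally({"extract": 6, "embed": 3, "store": 1}, headroom=5)
```

Here the stage asking for the most receives the largest share, and the total granted never exceeds the available headroom.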

apply_constraints(
initial_proposals: Dict[str, StagePIDProposal],
global_in_flight: int,
current_global_memory_usage_mb: int,
num_edges: int,
) -> Dict[str, int]

Applies all configured constraints to the initial replica proposals and returns a dictionary mapping each stage name to its final replica count.

class nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller.StagePIDProposal(
name: str,
current_replicas: int,
proposed_replicas: int,
conservative_cost_estimate: float,
metrics: Dict[str, Any],
)

Bases: object

Holds the initial proposal from the PID controller for a single stage.

conservative_cost_estimate: float
current_replicas: int
metrics: Dict[str, Any]
name: str
proposed_replicas: int
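To show how a proposal record with these fields might be consumed, here is a minimal stand-in. `ProposalSketch` is a hypothetical class that mirrors the documented field names and order; whether the real `StagePIDProposal` is a dataclass is an assumption, and the values are invented.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class ProposalSketch:
    """Stand-in with the same fields as StagePIDProposal (dataclass is assumed)."""

    name: str
    current_replicas: int
    proposed_replicas: int
    conservative_cost_estimate: float
    metrics: Dict[str, Any]


# Illustrative values only.
proposal = ProposalSketch(
    name="embedder",
    current_replicas=2,
    proposed_replicas=4,
    conservative_cost_estimate=1500.0,
    metrics={"replicas": 2, "queue_depth": 30},
)
# Positive means the controller wants to scale this stage up.
scale_delta = proposal.proposed_replicas - proposal.current_replicas
```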

nv_ingest.framework.orchestration.ray.util.pipeline.pipeline_builders module

nv_ingest.framework.orchestration.ray.util.pipeline.pipeline_runners module

nv_ingest.framework.orchestration.ray.util.pipeline.stage_builders module

Module contents