Source code for nv_ingest.framework.orchestration.ray.util.pipeline.pid_controller

# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import logging
import math
from dataclasses import dataclass

import numpy as np
from collections import deque
from typing import Dict, Any, Deque, List, Tuple, Optional

from nv_ingest_api.util.system.hardware_info import SystemResourceProbe

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Constants ---
DEFAULT_STAGE_COST_MB = 5000.0  # Fallback memory cost


@dataclass
class StagePIDProposal:
    """Holds the initial proposal from the PID controller for a single stage."""

    name: str
    current_replicas: int
    proposed_replicas: int  # Initial proposal based on PID / stage rate limit
    # Conservative cost estimate (max(dynamic_avg, static)) used for projections
    conservative_cost_estimate: float
    metrics: Dict[str, Any]  # Original metrics for context
class PIDController:
    """
    Calculates initial replica adjustment proposals based on PID control logic.

    This controller focuses on the core PID algorithm, reacting to the error
    between the current state (queue depth) and the desired state (target depth),
    adjusted by an idle penalty. It tracks memory usage per replica to provide a
    dynamic cost estimate for the ResourceConstraintManager.
    """

    def __init__(
        self,
        kp: float,
        ki: float,
        kd: float,  # Currently unused in delta calculation
        stage_cost_estimates: Dict[str, int],  # Static estimates (MB)
        target_queue_depth: int = 0,
        window_size: int = 10,
        penalty_factor: float = 0.0005,
        error_boost_factor: float = 1.5,
    ):
        """
        Initializes the PID controller.

        Parameters
        ----------
        kp : float
            Proportional gain. Reacts to the current error magnitude.
        ki : float
            Integral gain. Accumulates past errors to eliminate steady-state offsets.
        kd : float
            Derivative gain. Reacts to the rate of change of the error.
            (Currently set to 0 in internal calculations).
        stage_cost_estimates : Dict[str, int]
            Static estimated memory cost (in MB) per replica for each stage.
            Used as a fallback and minimum for dynamic estimates.
        target_queue_depth : int, optional
            Default target queue depth for stages if not specified in metrics,
            by default 0. The PID loop tries to drive the queue depth towards this value.
        window_size : int, optional
            Number of recent samples used for dynamic memory cost estimation
            per replica, by default 10.
        penalty_factor : float, optional
            Multiplier applied to the square of the number of consecutive idle
            cycles for a stage. The resulting penalty effectively lowers the
            target queue depth for idle stages, encouraging scale-down,
            by default 0.0005.
        error_boost_factor : float, optional
            Factor to multiply the raw PID delta when the error is positive
            (queue > target), potentially speeding up scale-up response, by default 1.5.
        """
        self.kp = kp
        self.ki = ki
        self.kd = 0.0  # Explicitly disable derivative term for now
        self.target_queue_depth = target_queue_depth
        self.error_boost_factor = error_boost_factor

        # Per-Stage State
        self.stage_cost_estimates = {
            name: float(max(cost, 1.0)) for name, cost in stage_cost_estimates.items()  # Ensure float and min 1MB
        }
        self.integral_error: Dict[str, float] = {}
        self.prev_error: Dict[str, float] = {}
        self.memory_history: Dict[str, Deque[float]] = {}  # Per-replica memory history (MB)
        self.idle_cycles: Dict[str, int] = {}

        # Per-Stage Config
        self.window_size = window_size
        self.penalty_factor = penalty_factor

    # --- Private Methods ---

    def _initialize_stage_state(self, stage: str) -> None:
        """Initializes controller state variables for a newly seen stage."""
        if stage not in self.integral_error:
            logger.debug(f"[PID-{stage}] Initializing state.")
            self.integral_error[stage] = 0.0
            self.prev_error[stage] = 0.0
            self.memory_history[stage] = deque(maxlen=self.window_size)
            self.idle_cycles[stage] = 0
        # Ensure static cost estimate exists, provide default if missing
        if stage not in self.stage_cost_estimates:
            logger.warning(f"[PID-{stage}] Missing static cost estimate. Using default {DEFAULT_STAGE_COST_MB}MB.")
            self.stage_cost_estimates[stage] = DEFAULT_STAGE_COST_MB

    def _get_conservative_cost_estimate(self, stage: str) -> float:
        """
        Estimates dynamic memory cost, using the static estimate as a floor.

        Returns the maximum of the recent average dynamic cost per replica and
        the static estimate provided during initialization. This provides a
        conservative value for resource projection.

        Parameters
        ----------
        stage : str
            The name of the stage.

        Returns
        -------
        float
            The conservative memory cost estimate in MB per replica.
        """
        static_cost = self.stage_cost_estimates.get(stage, DEFAULT_STAGE_COST_MB)
        memory_samples = self.memory_history.get(stage)

        # Use numpy.mean if samples exist, otherwise fall back to static
        if memory_samples and len(memory_samples) > 0:
            try:
                dynamic_avg = float(np.mean(memory_samples))
                # Use max(dynamic, static) for projection, enforce min 1MB
                cost = max(dynamic_avg, static_cost, 1.0)
                return cost
            except Exception as e:
                logger.error(
                    f"[PID-{stage}] Error calculating mean of memory samples: {e}. Falling back to static cost.",
                    exc_info=False,
                )
                return max(static_cost, 1.0)  # Fall back safely
        return max(static_cost, 1.0)  # Fall back to static estimate if no history

    # --- Public Method ---
    def calculate_initial_proposals(self, stage_metrics: Dict[str, Dict[str, Any]]) -> Dict[str, StagePIDProposal]:
        """
        Calculates initial, unconstrained replica proposals for each stage.

        Iterates through each stage, calculates its PID error and delta based on
        queue depth and target, and returns the initial proposals without
        considering global constraints. Includes dynamic cost estimates.

        Parameters
        ----------
        stage_metrics : Dict[str, Dict[str, Any]]
            Dictionary mapping stage names to their current metrics.
            Expected keys per stage: 'replicas', 'queue_depth'.
            Optional: 'memory_usage', 'target_queue_depth', 'processing',
            'min_replicas', 'max_replicas'.

        Returns
        -------
        Dict[str, StagePIDProposal]
            Dictionary mapping stage names to their initial proposals, including
            current/proposed replicas, cost estimates, and original metrics.
        """
        logger.debug("--- PID Controller: Calculating Initial Proposals ---")
        proposals: Dict[str, StagePIDProposal] = {}

        for stage, metrics in stage_metrics.items():
            # Ensure state exists and initialize if necessary
            self._initialize_stage_state(stage)

            # --- Extract data and calculate current memory state ---
            replicas = metrics.get("replicas", 0)
            # Start with static cost as initial guess if no memory_usage provided
            initial_cost_guess = self.stage_cost_estimates.get(stage, DEFAULT_STAGE_COST_MB)
            memory_usage = metrics.get("memory_usage", initial_cost_guess * max(replicas, 1))
            # Calculate memory per replica safely (avoid division by zero)
            current_memory_per_replica = memory_usage / max(replicas, 1.0)

            # Update memory history *before* calculating the conservative cost for *this* cycle's proposal
            self.memory_history[stage].append(current_memory_per_replica)
            # Recalculate conservative cost *after* updating history for the proposal
            conservative_cost = self._get_conservative_cost_estimate(stage)

            # --- PID Calculation ---
            queue_depth = metrics.get("queue_depth", 0)
            # Allow target override per stage, else use controller default
            target_queue_depth = metrics.get("target_queue_depth", self.target_queue_depth)
            min_replicas_metric = metrics.get("min_replicas", 0)
            max_replicas_metric = metrics.get("max_replicas", 1)  # Default max should likely be higher

            # Idle penalty calculation
            if queue_depth == 0 and metrics.get("processing", 0) == 0:
                self.idle_cycles[stage] += 1
            else:
                self.idle_cycles[stage] = 0
            # Limit how much penalty can reduce the effective target below zero
            penalty = min(8, self.penalty_factor * (self.idle_cycles[stage] ** 2.0))

            # Error calculation (Queue deviation from target, adjusted by idle penalty)
            error = (queue_depth - target_queue_depth) - penalty

            # Integral term update with basic anti-windup
            # Don't accumulate integral if already at boundary AND error pushes further past boundary
            should_accumulate_integral = True
            if replicas >= max_replicas_metric and error > 0:  # At max replicas, still have backlog
                should_accumulate_integral = False
                logger.debug(
                    f"[PID-{stage}] At max replicas ({replicas}) with positive error ({error:.2f}), pausing integral."
                )
            elif (
                replicas <= min_replicas_metric and error < 0
            ):  # At min replicas, queue is below target (or penalty active)
                should_accumulate_integral = False
                logger.debug(
                    f"[PID-{stage}] At min replicas ({replicas}) with negative error ({error:.2f}), pausing integral."
                )
            if should_accumulate_integral:
                self.integral_error[stage] += error

            # Update previous error state for potential future derivative use
            self.prev_error[stage] = error

            # --- Delta Calculation ---
            proportional_term = self.kp * error
            integral_term = self.ki * self.integral_error[stage]
            # derivative_term = self.kd * derivative  # Still disabled

            # Combine terms
            raw_delta = proportional_term + integral_term  # + derivative_term

            # Boost scale-up signals (positive error means queue > target)
            if error > 0:
                boosted_delta = raw_delta * self.error_boost_factor
                logger.debug(f"[PID-{stage}] Boosting positive error delta: {raw_delta:.3f} -> {boosted_delta:.3f}")
                raw_delta = boosted_delta

            # Round to get integer replica change
            delta_replicas = int(round(raw_delta))
            proposed_replicas = replicas + delta_replicas

            logger.debug(
                f"[PID-{stage}] R={replicas}, Q={queue_depth}, Tgt={target_queue_depth},"
                f" Idle={self.idle_cycles[stage]}, Pen={penalty:.2f} -> "
                f"Err={error:.2f}, P={proportional_term:.2f}, I={integral_term:.2f}"
                f" (Acc={self.integral_error[stage]:.2f}) -> "
                f"DeltaR={delta_replicas}, RawProp={proposed_replicas}"
            )

            # --- Create Final Proposal Object for this Stage ---
            proposal = StagePIDProposal(
                name=stage,
                current_replicas=replicas,
                proposed_replicas=proposed_replicas,
                conservative_cost_estimate=conservative_cost,  # Use updated cost
                metrics=metrics,  # Pass along original metrics
            )
            proposals[stage] = proposal

        logger.debug("--- PID Controller: Initial Proposals Calculated ---")
        return proposals
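
# Illustrative usage sketch for the PID half of the control loop, kept as a comment so the
# module's import-time behavior is unchanged. The stage name "extract", the cost figure, and
# the metric values below are hypothetical examples, not defaults shipped with the module.
#
#   controller = PIDController(kp=0.5, ki=0.1, kd=0.0, stage_cost_estimates={"extract": 4000})
#   proposals = controller.calculate_initial_proposals(
#       {"extract": {"replicas": 2, "queue_depth": 40, "processing": 1, "min_replicas": 1, "max_replicas": 8}}
#   )
#   # proposals["extract"] is a StagePIDProposal whose proposed_replicas field holds the
#   # unconstrained PID suggestion, to be handed to ResourceConstraintManager.apply_constraints.
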
class ResourceConstraintManager:
    """
    Applies global resource constraints and safety checks to initial proposals.

    Takes the initial replica proposals generated by the PIDController and
    adjusts them based on global limits (max replicas, available CPU cores
    based on affinity, memory budget with safety buffer), and ensures pipeline
    consistency (zero-replica safety). It allocates limited resources
    proportionally if multiple stages request scale-ups simultaneously.
    If current global memory usage exceeds the effective limit, it aggressively
    scales down all stages running more than one replica.
    """

    def __init__(
        self,
        max_replicas: int,
        memory_threshold: int,
        estimated_edge_cost_mb: int,
        memory_safety_buffer_fraction: float,
    ):
        """
        Initializes the Resource Constraint Manager.

        Available CPU cores are detected via SystemResourceProbe and used to
        derive an optional core-based replica limit.

        Parameters
        ----------
        max_replicas : int
            Global cap on the total number of stage replicas.
        memory_threshold : int
            Global memory budget in MB; used as the effective memory limit.
        estimated_edge_cost_mb : int
            Estimated memory cost per pipeline edge in MB (tracked but currently unused).
        memory_safety_buffer_fraction : float
            Safety buffer fraction; must be in [0.0, 1.0) (currently unused).
        """
        if not (0.0 <= memory_safety_buffer_fraction < 1.0):
            raise ValueError("memory_safety_buffer_fraction must be between 0.0 and 1.0")

        self.max_replicas = max_replicas
        self.memory_threshold_mb = memory_threshold
        self.estimated_edge_cost_mb = estimated_edge_cost_mb  # Keep track, though unused
        self.memory_safety_buffer_fraction = memory_safety_buffer_fraction  # Unused
        self.effective_memory_limit_mb = self.memory_threshold_mb

        core_detector = SystemResourceProbe()  # Instantiate the detector
        self.available_cores: Optional[float] = core_detector.get_effective_cores()
        self.core_detection_details: Dict[str, Any] = core_detector.get_details()

        # Determine a practical replica limit based on cores (optional, but often useful)
        self.core_based_replica_limit: Optional[int] = None
        if self.available_cores is not None and self.available_cores > 0:
            self.core_based_replica_limit = math.floor(self.available_cores)
        else:
            self.core_based_replica_limit = None  # Treat as unlimited if detection failed

        logger.info(
            f"[ConstraintMgr] Initialized. MaxReplicas={max_replicas}, "
            f"EffectiveCoreLimit={self.available_cores:.2f} "  # Log the potentially fractional value
            f"(Method: {self.core_detection_details.get('detection_method')}), "
            f"CoreBasedReplicaLimit={self.core_based_replica_limit}, "  # Log the derived integer limit
            f"MemThreshold={memory_threshold}MB, "
            f"EffectiveLimit={self.effective_memory_limit_mb:.1f}MB "
        )
        logger.debug(f"[ConstraintMgr] Core detection details: {self.core_detection_details}")

    # --- Private Methods ---

    @staticmethod
    def _get_effective_min_replicas(stage_name: str, metrics: Dict[str, Any], pipeline_in_flight: int) -> int:
        """Helper to calculate the effective minimum replicas for a stage."""
        min_replicas_metric = metrics.get("min_replicas", 0)
        # If the pipeline is active globally, enforce a minimum of 1 replica,
        # unless min_replicas dictates higher.
        if pipeline_in_flight > 0:
            return max(1, min_replicas_metric)
        else:
            # Pipeline is globally idle.
            # Allow scaling down to zero ONLY if the pipeline is idle AND min_replicas allows it.
            return min_replicas_metric

    def _apply_aggressive_memory_scale_down(
        self,
        current_proposals: Dict[str, int],
        initial_proposals_meta: Dict[str, "StagePIDProposal"],  # Assuming StagePIDProposal type hint
        current_global_memory_usage: int,
        pipeline_in_flight_global: int,
    ) -> Dict[str, int]:
        """
        If current memory exceeds the effective limit, force scale-downs.

        Reduces replicas for all stages with > 1 replica by 25% (rounded down),
        ensuring they don't go below their effective minimum or 1 replica.
        This is done in a single pass.

        Returns:
            Dict[str, int]: Updated replica proposals after aggressive scale-down.
        """
        if current_global_memory_usage <= self.effective_memory_limit_mb:
            return current_proposals

        memory_overrun = current_global_memory_usage - self.effective_memory_limit_mb
        logger.warning(
            f"[ConstraintMgr] Aggressive Scale-Down Triggered: "
            f"Current Mem ({current_global_memory_usage:.1f}MB) > Effective Limit"
            f" ({self.effective_memory_limit_mb:.1f}MB). "
            f"Need to reduce by {memory_overrun:.1f}MB."
        )

        adjusted_proposals = current_proposals.copy()
        total_memory_reduced = 0.0
        stages_affected_details = {}  # To store details of changes

        # Iterate through all proposals to apply the 25% reduction if applicable
        for name, current_replicas in current_proposals.items():
            proposal_meta = initial_proposals_meta.get(name)
            if not proposal_meta:
                logger.error(f"[ConstraintMgr] Missing metadata for stage {name} during aggressive scale-down.")
                continue

            # Determine the effective minimum for this stage (ensuring at least 1)
            effective_min = self._get_effective_min_replicas(name, proposal_meta.metrics, pipeline_in_flight_global)

            # Cost per replica (assuming proposal_meta.conservative_cost_estimate is for ONE replica).
            # If it's for all current_replicas, you'd divide by current_replicas here.
            cost_per_replica = float(
                proposal_meta.conservative_cost_estimate
                if proposal_meta.conservative_cost_estimate and proposal_meta.conservative_cost_estimate > 0
                else 1e-6
            )

            if current_replicas > 1:  # Only consider stages with more than 1 replica
                # Calculate 25% reduction
                reduction_amount = math.floor(current_replicas * 0.25)
                # Ensure reduction_amount is at least 1 if current_replicas > 1 and 25% is < 1
                # (e.g., for 2 or 3 replicas, 25% is 0, but we want to reduce by 1 if possible)
                if reduction_amount == 0 and current_replicas > 1:
                    reduction_amount = 1

                if reduction_amount > 0:
                    proposed_new_replicas = current_replicas - reduction_amount
                    # Ensure new count doesn't go below the effective minimum (which is at least 1)
                    final_new_replicas = max(effective_min, proposed_new_replicas)

                    # Only apply if this actually results in a reduction
                    if final_new_replicas < current_replicas:
                        replicas_actually_reduced = current_replicas - final_new_replicas
                        memory_saved_for_stage = replicas_actually_reduced * cost_per_replica
                        logger.info(
                            f"[ConstraintMgr-{name}] Aggressive Scale-Down: Reducing from "
                            f"{current_replicas} -> {final_new_replicas} "
                            f"(by {replicas_actually_reduced} replicas, target 25% of "
                            f"{current_replicas} was {reduction_amount}). "
                            f"Est. memory saved: {memory_saved_for_stage:.2f}MB."
                        )
                        adjusted_proposals[name] = final_new_replicas
                        total_memory_reduced += memory_saved_for_stage
                        stages_affected_details[name] = {
                            "from": current_replicas,
                            "to": final_new_replicas,
                            "saved_mem": memory_saved_for_stage,
                        }
                    else:
                        logger.debug(
                            f"[ConstraintMgr-{name}] Aggressive Scale-Down: No reduction applied. "
                            f"Current: {current_replicas}, Target 25% reduction: {reduction_amount}, "
                            f"Proposed: {proposed_new_replicas}, Effective Min: {effective_min}."
                        )
                else:
                    logger.debug(
                        f"[ConstraintMgr-{name}] Aggressive Scale-Down: Calculated 25% reduction is 0 for "
                        f"{current_replicas} replicas. No change."
                    )
            else:
                logger.debug(
                    f"[ConstraintMgr-{name}] Aggressive Scale-Down: Stage has {current_replicas} "
                    f"replica(s), not eligible for 25% reduction."
                )

        # After applying reductions, check the new memory overrun.
        # This is a projection based on our cost estimates.
        projected_new_global_memory_usage = current_global_memory_usage - total_memory_reduced
        new_memory_overrun = projected_new_global_memory_usage - self.effective_memory_limit_mb

        if not stages_affected_details:
            logger.warning("[ConstraintMgr] Aggressive Scale-Down: No stages were eligible or changed replicas.")
        elif new_memory_overrun > 0:
            logger.warning(
                f"[ConstraintMgr] Aggressive Scale-Down: Completed. Reduced total {total_memory_reduced:.1f}MB. "
                f"Stages affected: {len(stages_affected_details)}. "
                f"Projected memory still over limit by {new_memory_overrun:.1f}MB."
                # f"Details: {stages_affected_details}"  # Potentially too verbose for warning
            )
        else:
            logger.info(
                f"[ConstraintMgr] Aggressive Scale-Down: Completed. Reduced total {total_memory_reduced:.1f}MB. "
                f"Stages affected: {len(stages_affected_details)}. "
                f"Projected memory now below limit (overrun {new_memory_overrun:.1f}MB)."
                # f"Details: {stages_affected_details}"  # Potentially too verbose for info
            )

        if stages_affected_details:
            logger.debug(f"[ConstraintMgr] Aggressive Scale-Down Details: {stages_affected_details}")

        return adjusted_proposals

    def _apply_global_constraints_proportional(
        self,
        proposals_after_aggressive_sd: Dict[str, int],  # Values from PID or after AggressiveMemSD
        initial_proposals_meta: Dict[str, "StagePIDProposal"],  # Contains original .current_replicas
        current_global_memory_usage_mb: int,
        current_effective_mins: Dict[str, int],  # Effective minimum for each stage
        room_to_scale_up_to_global_caps: bool,
    ) -> Dict[str, int]:
        """
        Applies global replica, core, and memory limits to scale-up intentions.

        When scaling beyond the summed effective minimums is restricted, any
        proposed increase over a stage's current replica count is nullified.
        Otherwise, requested increases above each stage's effective minimum are
        scaled proportionally so that the max-replica, core-based, and memory
        limits are respected.
        """
        final_proposals_this_step = {}

        if not room_to_scale_up_to_global_caps:
            logger.debug(
                "[ConstraintMgr-Proportional] Global scaling beyond effective minimums is RESTRICTED "
                "as SumOfEffectiveMins likely meets/exceeds a global Core/MaxReplica cap. "
                "Proposed increases from initial current values will be nullified."
            )
            for name, prop_meta in initial_proposals_meta.items():
                val_from_prior_phases = proposals_after_aggressive_sd.get(name, prop_meta.current_replicas)
                original_current_replicas = prop_meta.current_replicas
                if val_from_prior_phases > original_current_replicas:
                    final_proposals_this_step[name] = original_current_replicas
                    if val_from_prior_phases != original_current_replicas:
                        logger.debug(
                            f"[ConstraintMgr-{name}] Proportional: Scaling restricted. "
                            f"Nullified proposed increase from {original_current_replicas} to {val_from_prior_phases}. "
                            f"Setting to {original_current_replicas}."
                        )
                else:
                    final_proposals_this_step[name] = val_from_prior_phases
            return final_proposals_this_step

        # --- ELSE: room_to_scale_up_to_global_caps is TRUE ---
        # We can proportionally scale *increases above each stage's effective minimum*,
        # up to the global caps. The baseline sum for headroom is sum_of_effective_mins.

        # Stores (stage_name, proposed_increase_above_eff_min, cost_per_replica)
        upscale_deltas_above_eff_min: List[Tuple[str, int, float]] = []
        total_requested_increase_replicas_above_eff_mins = 0
        total_projected_mem_increase_for_deltas_mb = 0.0

        # Initialize final_proposals_this_step: each stage starts at its effective minimum,
        # but not less than what aggressive_sd might have proposed (e.g., if agg_sd proposed 0 and eff_min is 0),
        # and not more than what PID/agg_sd proposed if that was already below effective_min.
        # Essentially, the base is max(eff_min, value_from_agg_sd_if_value_is_for_scale_down_or_no_change).
        # More simply: start each stage at its effective_min. The "delta" is how much PID wants *above* that.
        sum_of_effective_mins_for_baseline = 0
        for name, prop_meta in initial_proposals_meta.items():
            eff_min_for_stage = current_effective_mins[name]
            final_proposals_this_step[name] = eff_min_for_stage  # Initialize with effective min
            sum_of_effective_mins_for_baseline += eff_min_for_stage

            # What did PID (after aggressive_sd) propose for this stage?
            pid_proposed_val = proposals_after_aggressive_sd.get(name, prop_meta.current_replicas)

            if pid_proposed_val > eff_min_for_stage:
                # This stage wants to scale up beyond its effective minimum.
                increase_delta = pid_proposed_val - eff_min_for_stage
                cost = prop_meta.conservative_cost_estimate
                upscale_deltas_above_eff_min.append((name, increase_delta, cost))
                total_requested_increase_replicas_above_eff_mins += increase_delta
                total_projected_mem_increase_for_deltas_mb += increase_delta * cost

        logger.debug(
            f"[ConstraintMgr-Proportional] Room to scale. BaselineSum "
            f"(SumOfEffMins)={sum_of_effective_mins_for_baseline}. "
            f"NumStagesRequestingUpscaleAboveEffMin={len(upscale_deltas_above_eff_min)}. "
            f"TotalReplicaIncreaseReqAboveEffMin={total_requested_increase_replicas_above_eff_mins}. "
            f"TotalMemIncreaseForTheseDeltas={total_projected_mem_increase_for_deltas_mb:.2f}MB."
        )

        reduction_factor = 1.0
        limiting_reasons = []

        if total_requested_increase_replicas_above_eff_mins <= 0:
            logger.debug(
                "[ConstraintMgr-Proportional] No upscale request beyond effective minimums. "
                "Proposals remain at effective minimums (or prior phase values if lower and valid)."
            )
            # final_proposals_this_step already contains effective minimums.
            # We need to ensure if PID proposed *lower* than effective_min (and eff_min was 0), that's respected.
            # This should be: max(pid_proposed_value, eff_min_for_stage) for each stage.
            for name_check in final_proposals_this_step.keys():
                pid_val = proposals_after_aggressive_sd.get(
                    name_check, initial_proposals_meta[name_check].current_replicas
                )
                eff_min_val = current_effective_mins[name_check]
                final_proposals_this_step[name_check] = (
                    max(pid_val, eff_min_val) if eff_min_val > 0 else pid_val
                )  # if eff_min is 0, allow PID to go to 0
            return final_proposals_this_step

        projected_total_replicas_with_deltas = (
            sum_of_effective_mins_for_baseline + total_requested_increase_replicas_above_eff_mins
        )

        # 1. Max Replicas Config
        if projected_total_replicas_with_deltas > self.max_replicas:
            # Headroom is how many *additional* replicas (beyond sum_of_eff_mins) we can add
            permissible_increase_headroom = max(0, self.max_replicas - sum_of_effective_mins_for_baseline)
            factor = permissible_increase_headroom / total_requested_increase_replicas_above_eff_mins
            reduction_factor = min(reduction_factor, factor)
            limiting_reasons.append(
                f"MaxReplicas (Limit={self.max_replicas}, HeadroomAboveEffMins={permissible_increase_headroom}, "
                f"Factor={factor:.3f})"
            )

        # 2. Core Based Replica Limit
        if (
            self.core_based_replica_limit is not None
            and projected_total_replicas_with_deltas > self.core_based_replica_limit
        ):
            permissible_increase_headroom = max(0, self.core_based_replica_limit - sum_of_effective_mins_for_baseline)
            factor = permissible_increase_headroom / total_requested_increase_replicas_above_eff_mins
            reduction_factor = min(reduction_factor, factor)
            limiting_reasons.append(
                f"CoreLimit (Limit={self.core_based_replica_limit}, "
                f"HeadroomAboveEffMins={permissible_increase_headroom}, Factor={factor:.3f})"
            )

        # 3. Memory Limit
        # Memory check is based on current_global_memory_usage_mb + memory_for_the_increase_deltas
        projected_total_global_memory_mb = current_global_memory_usage_mb + total_projected_mem_increase_for_deltas_mb
        if projected_total_global_memory_mb > self.effective_memory_limit_mb:
            # How much memory can we actually add without breaching the effective limit?
            permissible_mem_increase_mb = max(0.0, self.effective_memory_limit_mb - current_global_memory_usage_mb)
            factor_mem = (
                permissible_mem_increase_mb / total_projected_mem_increase_for_deltas_mb
                if total_projected_mem_increase_for_deltas_mb > 1e-9
                else 0.0
            )
            reduction_factor = min(reduction_factor, factor_mem)
            limiting_reasons.append(
                f"MemoryLimit (Factor={factor_mem:.3f}, AvailableMemForIncrease={permissible_mem_increase_mb:.1f}MB)"
            )

        # Apply reduction to the deltas
        if reduction_factor <= 0.001:  # Epsilon for float
            logger.debug(
                f"[ConstraintMgr-Proportional] Scale-up beyond effective minimums fully constrained by global limits. "
                f"Reasons: {'; '.join(limiting_reasons) if limiting_reasons else 'None'}. "
                f"Final ReductionFactor={reduction_factor:.3f}."
                " Stages will remain at their effective minimums (or prior phase values if lower and eff_min is 0)."
            )
            # final_proposals_this_step already contains effective minimums.
            # Need to ensure if PID wanted lower than eff_min (and eff_min was 0), that is respected.
            for name_final_check in final_proposals_this_step.keys():
                pid_val_final = proposals_after_aggressive_sd.get(
                    name_final_check, initial_proposals_meta[name_final_check].current_replicas
                )
                eff_min_final = current_effective_mins[name_final_check]
                # If effective min is 0, allow PID's value (which could be 0). Otherwise, floor is effective min.
                final_proposals_this_step[name_final_check] = (
                    pid_val_final if eff_min_final == 0 else max(pid_val_final, eff_min_final)
                )
        elif reduction_factor < 1.0:
            logger.debug(
                f"[ConstraintMgr-Proportional] Reducing requested scale-up (beyond effective_mins) by "
                f"factor {reduction_factor:.3f}. "
                f"Limiting Factors: {'; '.join(limiting_reasons)}."
            )
            for name, increase_delta_above_eff_min, _ in upscale_deltas_above_eff_min:
                allowed_increase = math.floor(increase_delta_above_eff_min * reduction_factor)
                # Add this allowed increase to the stage's effective minimum
                final_value_for_stage = current_effective_mins[name] + allowed_increase
                final_proposals_this_step[name] = final_value_for_stage
                if allowed_increase != increase_delta_above_eff_min:
                    logger.debug(
                        f"[ConstraintMgr-{name}] Proportional Adj: EffMin={current_effective_mins[name]}, "
                        f"ReqIncreaseAboveEffMin={increase_delta_above_eff_min}, AllowedIncrease={allowed_increase} "
                        f"-> FinalVal={final_value_for_stage}"
                    )
        else:
            # reduction_factor is ~1.0, meaning the full requested increase (above effective_mins) is allowed
            logger.debug(
                "[ConstraintMgr-Proportional] Full requested scale-up (beyond effective_mins) "
                "is permissible by global limits."
            )
            for name, increase_delta_above_eff_min, _ in upscale_deltas_above_eff_min:
                # The full PID-intended value (which came in as proposals_after_aggressive_sd) is applied.
                # Since final_proposals_this_step was initialized with effective_mins,
                # and increase_delta_above_eff_min = pid_proposed_val - eff_min_for_stage,
                # then eff_min_for_stage + increase_delta_above_eff_min = pid_proposed_val.
                pid_intended_val = proposals_after_aggressive_sd.get(
                    name, initial_proposals_meta[name].current_replicas
                )
                final_proposals_this_step[name] = (
                    pid_intended_val  # This effectively applies the PID's full wish for this stage
                )

        return final_proposals_this_step

    def _enforce_replica_bounds(
        self, stage_name: str, tentative_replicas: int, metrics: Dict[str, Any], pipeline_in_flight: int
    ) -> int:
        """Enforces per-stage min/max replica bounds and zero-replica safety logic."""
        max_replicas_metric = metrics.get("max_replicas", 1)
        lower_bound = self._get_effective_min_replicas(stage_name, metrics, pipeline_in_flight)

        bounded_replicas = max(lower_bound, tentative_replicas)
        final_replicas = min(bounded_replicas, max_replicas_metric)

        if final_replicas != tentative_replicas:
            min_replicas_metric = metrics.get("min_replicas", 0)
            logger.debug(
                f"[ConstraintMgr-{stage_name}] Bounds Applied: Tentative={tentative_replicas} ->"
                f" Final={final_replicas} "
                f"(MinConfig={min_replicas_metric}, MaxConfig={max_replicas_metric}, "
                f"EffectiveLowerBound={lower_bound}, PipeInFlight={pipeline_in_flight})"
            )
        elif final_replicas == 0 and lower_bound == 0:
            logger.debug(f"[ConstraintMgr-{stage_name}] Allowing scale to 0: Pipeline Idle and MinReplicas=0.")

        return final_replicas

    @staticmethod
    def _apply_global_consistency(
        final_adjustments: Dict[str, int], initial_proposals: Dict[str, StagePIDProposal]
    ) -> Dict:
        """Ensures the pipeline doesn't get stuck if one stage scales up from zero."""
        scale_up_from_zero_triggered = any(
            (prop.current_replicas == 0 and final_adjustments.get(name, 0) > 0)
            for name, prop in initial_proposals.items()
        )
        if scale_up_from_zero_triggered:
            logger.debug("[ConstraintMgr] Wake-up consistency: Ensuring no stages stuck at zero.")
            for name, prop in initial_proposals.items():
                if prop.current_replicas == 0 and final_adjustments.get(name, 0) == 0:
                    min_r = prop.metrics.get("min_replicas", 0)
                    max_r = prop.metrics.get("max_replicas", 1)
                    target = max(1, min_r)
                    final_target = min(target, max_r)
                    if final_target > 0:
                        logger.debug(
                            f"[ConstraintMgr-{name}] Forcing minimum {final_target} replica due to global wake-up."
                        )
                        final_adjustments[name] = final_target
        return final_adjustments

    def _log_final_constraint_summary(
        self,
        final_adjustments: Dict[str, int],
        initial_proposals: Dict[str, "StagePIDProposal"],  # Forward reference
        global_in_flight: int,
        current_global_memory_usage_mb: int,
        num_edges: int,
        sum_of_effective_mins: int,
        can_globally_scale_beyond_effective_mins: bool,
    ) -> None:
        """Logs a structured and readable summary of the final state and limit checks."""
        final_stage_replicas_total = sum(final_adjustments.values())
        projected_final_memory_mb = sum(
            final_adjustments.get(name, 0) * initial_proposals[name].conservative_cost_estimate
            for name in final_adjustments
        )
        num_queue_actors = num_edges
        total_ray_components_for_info = final_stage_replicas_total + num_queue_actors

        logger.debug("[ConstraintMgr] --- Final Decision & Constraint Summary ---")

        # --- I. Overall Pipeline State ---
        logger.debug(f"[ConstraintMgr] Pipeline Activity: {global_in_flight} tasks in-flight.")
        logger.debug(f"[ConstraintMgr] Effective Min Replicas (Sum): {sum_of_effective_mins}")
        logger.debug(
            f"[ConstraintMgr] └─ Global Scaling Beyond Mins Permitted? {can_globally_scale_beyond_effective_mins}"
        )

        # --- II. Final Component Counts ---
        logger.debug(f"[ConstraintMgr] Final Stage Replicas: {final_stage_replicas_total} (Target for caps)")
        logger.debug(f"[ConstraintMgr] Queue/Edge Actors : {num_queue_actors} (Informational)")
        logger.debug(f"[ConstraintMgr] Total Ray Components: {total_ray_components_for_info} (Informational)")

        # --- III. Resource Limits & Projected Usage (for Stages) ---
        # Configured Limits
        max_r_cfg_str = str(self.max_replicas)
        core_based_limit_str = (
            str(self.core_based_replica_limit) if self.core_based_replica_limit is not None else "N/A"
        )
        eff_mem_limit_str = f"{self.effective_memory_limit_mb:.1f}MB"

        logger.debug("[ConstraintMgr] Global Limits (Stages):")
        logger.debug(f"[ConstraintMgr] ├─ MaxTotalReplicas : {max_r_cfg_str}")
        logger.debug(
            f"[ConstraintMgr] ├─ CoreBasedRepLimit : {core_based_limit_str} "
            f"(System EffCores: {self.available_cores if self.available_cores is not None else 'N/A'})"
        )
        logger.debug(f"[ConstraintMgr] └─ EffectiveMemLimit : {eff_mem_limit_str} ")

        # Usage vs Limits
        logger.debug("[ConstraintMgr] Projected Usage (Stages):")
        logger.debug(f"[ConstraintMgr] ├─ Replicas : {final_stage_replicas_total}")
        logger.debug(
            f"[ConstraintMgr] └─ Memory : {projected_final_memory_mb:.1f}MB "
            f"(Current: {current_global_memory_usage_mb:.1f}MB)"
        )

        # --- IV. Limit Adherence Analysis (for Stages) ---
        unexpected_breaches_details = []

        # 1. Max Stage Replicas
        status_max_r = "OK"
        if final_stage_replicas_total > self.max_replicas:
            if not (
                sum_of_effective_mins >= self.max_replicas and final_stage_replicas_total <= sum_of_effective_mins
            ):
                status_max_r = f"BREACHED (Final={final_stage_replicas_total} > Limit={self.max_replicas})"
                unexpected_breaches_details.append(f"MaxReplicas: {status_max_r}")
            else:
                status_max_r = f"NOTE: Limit met/exceeded by SumOfMins ({sum_of_effective_mins})"

        # 2. Core-Based Stage Replica Limit
        status_core_r = "N/A"
        if self.core_based_replica_limit is not None:
            status_core_r = "OK"
            if final_stage_replicas_total > self.core_based_replica_limit:
                if not (
                    sum_of_effective_mins >= self.core_based_replica_limit
                    and final_stage_replicas_total <= sum_of_effective_mins
                ):
                    status_core_r = (
                        f"BREACHED (Final={final_stage_replicas_total} > Limit={self.core_based_replica_limit})"
                    )
                    unexpected_breaches_details.append(f"CoreBasedLimit: {status_core_r}")
                else:
                    status_core_r = f"NOTE: Limit met/exceeded by SumOfMins ({sum_of_effective_mins})"

        # 3. Memory Limit
        tolerance = 0.01  # MB
        status_mem = "OK"
        if projected_final_memory_mb > (self.effective_memory_limit_mb + tolerance):
            status_mem = (
                f"BREACHED (Projected={projected_final_memory_mb:.1f}MB > "
                f"Limit={self.effective_memory_limit_mb:.1f}MB)"
            )
            unexpected_breaches_details.append(f"MemoryLimit: {status_mem}")

        logger.debug("[ConstraintMgr] Limit Adherence (Stages):")
        logger.debug(f"[ConstraintMgr] ├─ MaxTotalReplicas : {status_max_r}")
        logger.debug(f"[ConstraintMgr] ├─ CoreBasedRepLimit : {status_core_r}")
        logger.debug(f"[ConstraintMgr] └─ EffectiveMemLimit : {status_mem}")

        if unexpected_breaches_details:
            logger.debug(f"[ConstraintMgr] └─ UNEXPECTED BREACHES: {'; '.join(unexpected_breaches_details)}")
        else:
            logger.debug("[ConstraintMgr] └─ All hard caps (beyond tolerated minimums/wake-up) appear respected.")

        # --- V. Final Decisions Per Stage ---
        logger.debug("[ConstraintMgr] Final Decisions (Per Stage):")
        if not final_adjustments:
            logger.debug("[ConstraintMgr] └─ No stages to adjust.")
        else:
            # Determine max stage name length for alignment
            max_name_len = 0
            if final_adjustments:  # Check if not empty
                max_name_len = max(len(name) for name in final_adjustments.keys())

            for stage_name, count in sorted(final_adjustments.items()):
                orig_prop = initial_proposals.get(stage_name)
                pid_proposed_str = f"(PID: {orig_prop.proposed_replicas if orig_prop else 'N/A'})"
                current_str = f"(Current: {orig_prop.current_replicas if orig_prop else 'N/A'})"
                # Guard against a missing proposal before touching its metrics
                min_replicas = (
                    self._get_effective_min_replicas(stage_name, orig_prop.metrics, global_in_flight)
                    if orig_prop
                    else "N/A"
                )
                eff_min_str = f"(EffMin: {min_replicas})"
                # Basic alignment, can be improved with more sophisticated padding
                logger.debug(
                    f"[ConstraintMgr] └─ {stage_name:<{max_name_len}} : "
                    f"{count:<3} {pid_proposed_str} {current_str} {eff_min_str}"
                )

        logger.debug("[ConstraintMgr] --- Constraint Summary END ---")

    # --- Public Method ---
    def apply_constraints(
        self,
        initial_proposals: Dict[str, "StagePIDProposal"],
        global_in_flight: int,
        current_global_memory_usage_mb: int,
        num_edges: int,
    ) -> Dict[str, int]:
        """
        Applies all configured constraints to initial replica proposals.

        Runs the full constraint pipeline: initialization from the PID proposals,
        aggressive memory scale-down, proportional allocation against global
        limits, per-stage min/max bounds, and the wake-up consistency check.
        """
        logger.debug(
            f"[ConstraintMgr] --- Applying Constraints START --- "
            f"GlobalInFlight={global_in_flight}, "
            f"CurrentGlobalMemMB={current_global_memory_usage_mb}, "
            f"NumEdges={num_edges}."
        )
        logger.debug("[ConstraintMgr] Initial Proposals:")
        for name, prop in initial_proposals.items():
            logger.debug(
                f"[ConstraintMgr] Stage '{name}': Current={prop.current_replicas}, "
                f"PIDProposed={prop.proposed_replicas}, CostMB={prop.conservative_cost_estimate:.2f}, "
                f"MinCfg={prop.metrics.get('min_replicas', 'N/A')}, MaxCfg={prop.metrics.get('max_replicas', 'N/A')}"
            )

        # --- Phase 1: Initialize adjustments from PID proposals ---
        intermediate_adjustments: Dict[str, int] = {
            name: prop.proposed_replicas for name, prop in initial_proposals.items()
        }
        logger.debug(f"[ConstraintMgr] Intermediate Adjustments (Phase 1 - From PID): {intermediate_adjustments}")

        # --- Phase 2: Aggressive Memory Scale-Down (Optional) ---
        try:
            intermediate_adjustments = self._apply_aggressive_memory_scale_down(
                intermediate_adjustments, initial_proposals, current_global_memory_usage_mb, global_in_flight
            )
            logger.debug(
                "[ConstraintMgr] Intermediate Adjustments (Phase 2 - After Aggressive Mem Scale-Down): "
                f"{intermediate_adjustments}"
            )
        except Exception as e_agg:
            logger.error(f"[ConstraintMgr] Error during aggressive memory scale-down: {e_agg}", exc_info=True)
            intermediate_adjustments = {name: prop.current_replicas for name, prop in initial_proposals.items()}

        # --- Calculate Current Effective Minimums and Their Sum ---
        current_effective_mins: Dict[str, int] = {}
        sum_of_effective_mins = 0
        for name, prop in initial_proposals.items():
            eff_min = self._get_effective_min_replicas(name, prop.metrics, global_in_flight)
            current_effective_mins[name] = eff_min
            sum_of_effective_mins += eff_min
        logger.debug(
            f"[ConstraintMgr] Calculated Effective Minimums: TotalSum={sum_of_effective_mins}. "
            # f"IndividualMins: {current_effective_mins}"  # Can be verbose
        )

        # --- Determine if Baseline (Sum of Mins) Breaches Global Caps ---
        # This logic determines if we are *allowed* to scale any stage *beyond its own effective minimum*
        # if doing so would contribute to breaching a global cap that's *already threatened by the sum of minimums*.
        can_globally_scale_beyond_effective_mins_due_to_cores = True
        if self.core_based_replica_limit is not None and sum_of_effective_mins >= self.core_based_replica_limit:
            can_globally_scale_beyond_effective_mins_due_to_cores = False

        can_globally_scale_beyond_effective_mins_due_to_max_r = True
        if sum_of_effective_mins >= self.max_replicas:
            can_globally_scale_beyond_effective_mins_due_to_max_r = False

        # Combined gatekeeper for proportional scaling logic.
        # If either the cores or max_replicas cap is hit by the sum of mins, we can't scale up further.
        # (Memory is handled slightly differently in the proportional scaler - it looks at available headroom
        # for the increase.)
        can_globally_scale_up_stages = (
            can_globally_scale_beyond_effective_mins_due_to_cores
            and can_globally_scale_beyond_effective_mins_due_to_max_r
        )

        # --- Phase 3: Apply Global Constraints & Proportional Allocation ---
        try:
            tentative_adjustments_from_prop = self._apply_global_constraints_proportional(
                intermediate_adjustments,
                initial_proposals,
                current_global_memory_usage_mb,
                current_effective_mins,
                can_globally_scale_up_stages,  # Use the combined flag
            )
            logger.debug(
                f"[ConstraintMgr] Tentative Adjustments (Phase 3 - After Proportional Allocation): "
                f"{tentative_adjustments_from_prop}"
            )
        except Exception as e_prop:
            logger.error(f"[ConstraintMgr] Error during global proportional allocation: {e_prop}", exc_info=True)
            tentative_adjustments_from_prop = {}
            for name, count in intermediate_adjustments.items():  # Fallback logic
                tentative_adjustments_from_prop[name] = max(count, current_effective_mins.get(name, 0))

        # --- Phase 4: Enforce Per-Stage Min/Max Replica Bounds ---
        final_adjustments: Dict[str, int] = {}
        for stage_name, proposal_meta in initial_proposals.items():
            replicas_after_proportional = tentative_adjustments_from_prop.get(
                stage_name, proposal_meta.current_replicas
            )
            try:
                bounded_replicas = self._enforce_replica_bounds(
                    stage_name, replicas_after_proportional, proposal_meta.metrics, global_in_flight
                )
                final_adjustments[stage_name] = bounded_replicas
            except Exception as e_bounds:
                logger.error(
                    f"[ConstraintMgr-{stage_name}] Error during per-stage bound enforcement: {e_bounds}", exc_info=True
                )
                final_adjustments[stage_name] = max(
                    proposal_meta.current_replicas, current_effective_mins.get(stage_name, 0)
                )
        logger.debug(f"[ConstraintMgr] Final Adjustments (Phase 4 - After Per-Stage Bounds): {final_adjustments}")

        # --- Phase 5: Apply Global Consistency (e.g., Wake-up Safety) ---
        try:
            final_adjustments = self._apply_global_consistency(final_adjustments, initial_proposals)
            logger.debug(f"[ConstraintMgr] Final Adjustments (Phase 5 - After Global Consistency): {final_adjustments}")
        except Exception as e_gc:
            logger.error(f"[ConstraintMgr] Error during global consistency application: {e_gc}", exc_info=True)

        # --- Log Final Summary ---
        self._log_final_constraint_summary(
            final_adjustments,
            initial_proposals,
            global_in_flight,
            current_global_memory_usage_mb,
            num_edges,
            sum_of_effective_mins,  # Pass this calculated value
            can_globally_scale_up_stages,  # Pass this for context in logging
        )

        logger.debug("[ConstraintMgr] --- Applying Constraints END ---")
        return final_adjustments
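

# Minimal end-to-end sketch of one autoscaling control cycle, combining the two classes above.
# This is illustrative only: the gains, cost estimates, limits, and metric values are hypothetical,
# and a real deployment would gather stage_metrics from the running pipeline rather than hard-code them.
if __name__ == "__main__":
    controller = PIDController(
        kp=0.5,
        ki=0.1,
        kd=0.0,
        stage_cost_estimates={"extract": 4000, "embed": 8000},  # MB per replica (hypothetical)
    )
    manager = ResourceConstraintManager(
        max_replicas=16,
        memory_threshold=64000,  # MB (hypothetical budget)
        estimated_edge_cost_mb=100,
        memory_safety_buffer_fraction=0.1,
    )

    # Snapshot of per-stage metrics for a single control cycle (hypothetical values).
    stage_metrics = {
        "extract": {"replicas": 2, "queue_depth": 40, "processing": 2, "min_replicas": 1, "max_replicas": 8},
        "embed": {"replicas": 1, "queue_depth": 5, "processing": 1, "min_replicas": 0, "max_replicas": 4},
    }

    initial_proposals = controller.calculate_initial_proposals(stage_metrics)
    final_replicas = manager.apply_constraints(
        initial_proposals=initial_proposals,
        global_in_flight=48,
        current_global_memory_usage_mb=20000,
        num_edges=2,
    )
    logger.info(f"Final replica targets: {final_replicas}")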