# Source code for nv_ingest.pipeline.config.replica_resolver

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Runtime replica resolution for static scaling mode.

This module provides functionality to resolve replica counts for stages using
non-static strategies when dynamic scaling is disabled, ensuring total memory
consumption stays within the static_memory_threshold.
"""

import logging
import os
from typing import List
from copy import deepcopy

from nv_ingest.pipeline.pipeline_schema import (
    PipelineConfigSchema,
    StageConfig,
    ReplicaCalculationStrategy,
    ReplicaStrategyConfig,
)
from nv_ingest_api.util.system.hardware_info import SystemResourceProbe

logger = logging.getLogger(__name__)


def resolve_static_replicas(pipeline_config: PipelineConfigSchema) -> PipelineConfigSchema:
    """
    Resolve static replica counts for all stages when dynamic scaling is disabled.

    Every stage whose ``replicas.static_replicas`` is a ``ReplicaStrategyConfig``
    gets a baseline replica count from ``_calculate_baseline_static_replicas``.
    The memory demand of a stage is ``baseline_replicas * memory_per_replica_mb``
    (a missing ``memory_per_replica_mb`` counts as 0 MB). If the summed demand
    exceeds ``total system memory * static_memory_threshold``, every such stage
    is scaled by the same factor, truncated to an integer and floored at one
    replica. Because of that floor the final allocation can still exceed the
    threshold.

    Setting the environment variable ``NV_INGEST_BYPASS_STATIC_MEMORY_SCALE_DOWN``
    to ``1``, ``true``, ``yes`` or ``on`` (case-insensitive) skips the scale-down
    and applies the baseline counts unchanged.

    Parameters
    ----------
    pipeline_config : PipelineConfigSchema
        The pipeline configuration with potentially unresolved replica strategies.

    Returns
    -------
    PipelineConfigSchema
        The input object itself when dynamic scaling is enabled; otherwise a deep
        copy in which each strategy-based ``static_replicas`` entry has been
        replaced by a concrete integer replica count.
    """
    # Only resolve if dynamic scaling is disabled
    if not pipeline_config.pipeline.disable_dynamic_scaling:
        logger.debug("Dynamic scaling enabled, skipping static replica resolution")
        return pipeline_config

    logger.info("Resolving static replica counts for disabled dynamic scaling mode")

    # Work on a deep copy so the caller's configuration is never mutated.
    resolved_config = deepcopy(pipeline_config)
    memory_threshold = resolved_config.pipeline.static_memory_threshold

    # Get system resource information
    system_probe = SystemResourceProbe()
    total_memory_mb = system_probe.total_memory_mb
    available_memory_mb = int(total_memory_mb * memory_threshold)

    logger.info(
        f"System memory: {total_memory_mb}MB, available for static replicas: {available_memory_mb}MB "
        f"(threshold: {memory_threshold:.1%})"
    )

    # Collect (stage, baseline_replicas, memory_per_replica_mb) for every stage
    # that is configured through a replica strategy rather than a plain count.
    non_static_stages = []
    total_memory_demand_mb = 0

    for stage in resolved_config.stages:
        strategy_config = stage.replicas.static_replicas if stage.replicas else None
        if not strategy_config or not isinstance(strategy_config, ReplicaStrategyConfig):
            continue

        baseline_replicas = _calculate_baseline_static_replicas(
            stage, strategy_config, system_probe, memory_threshold
        )
        memory_per_replica_mb = strategy_config.memory_per_replica_mb or 0
        stage_memory_demand = baseline_replicas * memory_per_replica_mb

        non_static_stages.append((stage, baseline_replicas, memory_per_replica_mb))
        total_memory_demand_mb += stage_memory_demand

        logger.debug(
            f"Stage '{stage.name}': {baseline_replicas} replicas × "
            f"{memory_per_replica_mb}MB = {stage_memory_demand}MB"
        )

    if not non_static_stages:
        logger.info("No stages with non-static strategies found")
        return resolved_config

    logger.info(f"Total baseline memory demand: {total_memory_demand_mb}MB from {len(non_static_stages)} stages")

    # Optional bypass of global memory-based scale down via environment variable
    bypass_env = os.getenv("NV_INGEST_BYPASS_STATIC_MEMORY_SCALE_DOWN", "").strip().lower()
    bypass_scale_down = bypass_env in ("1", "true", "yes", "on")

    # Check if we need to scale down (unless bypassed)
    if bypass_scale_down:
        logger.warning(
            "Bypassing static memory-based replica scale-down due to NV_INGEST_BYPASS_STATIC_MEMORY_SCALE_DOWN"
        )
        scaling_factor = 1.0
    elif total_memory_demand_mb <= available_memory_mb:
        logger.info("Memory demand within threshold, applying baseline replica counts")
        scaling_factor = 1.0
    else:
        # Calculate scaling factor to fit within memory threshold
        scaling_factor = available_memory_mb / total_memory_demand_mb
        if available_memory_mb > 0:
            excess_percent = ((total_memory_demand_mb / available_memory_mb) - 1) * 100
            logger.warning(
                f"Memory demand exceeds threshold by {excess_percent:.1f}%, "
                f"scaling down by factor of {scaling_factor:.3f}"
            )
        else:
            # A zero budget (threshold of 0, or truncation to 0 MB) makes the
            # relative overshoot undefined; report absolute numbers instead of
            # dividing by zero inside the log call.
            logger.warning(
                f"Memory demand of {total_memory_demand_mb}MB exceeds the available {available_memory_mb}MB, "
                f"scaling down by factor of {scaling_factor:.3f}"
            )

    # Apply the resolved replica counts
    total_actual_memory_mb = 0

    for stage, baseline_replicas, memory_per_replica_mb in non_static_stages:
        # int() truncates toward zero; the floor of 1 keeps every stage runnable.
        scaled_replicas = max(1, int(baseline_replicas * scaling_factor))
        actual_memory_mb = scaled_replicas * memory_per_replica_mb
        total_actual_memory_mb += actual_memory_mb

        # Replace the strategy config with a static replica count
        stage.replicas.static_replicas = scaled_replicas

        logger.info(
            f"Stage '{stage.name}': {baseline_replicas} → {scaled_replicas} replicas "
            f"({actual_memory_mb}MB)"
        )

    if total_memory_mb:
        memory_share = f"{(total_actual_memory_mb / total_memory_mb) * 100:.1f}% of system memory"
    else:
        # Avoid a ZeroDivisionError when the probe reports no memory.
        memory_share = "system memory size unavailable"
    logger.info(f"Total actual memory allocation: {total_actual_memory_mb}MB ({memory_share})")

    return resolved_config
def _calculate_baseline_static_replicas(
    stage: StageConfig,
    strategy_config: ReplicaStrategyConfig,
    system_probe: SystemResourceProbe,
    static_memory_threshold: float = 0.75,
) -> int:
    """
    Derive the unscaled ("baseline") replica count requested by a stage's strategy.

    Behaviour per strategy:

    * ``STATIC`` - ``strategy_config.value``, or 1 when that is unset/zero.
    * ``CPU_PERCENTAGE`` - ``cpu_count * cpu_percent`` (``cpu_percent`` falls back
      to 0.5), truncated, at least 1, capped by ``limit`` (falls back to
      ``cpu_count``).
    * ``MEMORY_THRESHOLDING`` - as many replicas of ``memory_per_replica_mb``
      (falls back to 1000) as fit into a fixed 70% of total memory, at least 1,
      capped by ``limit`` when one is set.
    * ``MEMORY_STATIC_GLOBAL_PERCENT`` - same as above, but the memory budget is
      ``static_memory_threshold`` of total memory.
    * anything else - a warning is logged and 1 is returned.

    NOTE(review): the 1000 MB fallback used here differs from the 0 MB fallback
    that ``resolve_static_replicas`` applies when it totals memory demand -
    confirm that this is intentional.

    Parameters
    ----------
    stage : StageConfig
        The stage configuration; only its ``name`` is read, for the warning message.
    strategy_config : ReplicaStrategyConfig
        The replica strategy configuration.
    system_probe : SystemResourceProbe
        System resource information (``cpu_count`` and ``total_memory_mb`` are read).
    static_memory_threshold : float, optional
        The global static memory threshold (default: 0.75).

    Returns
    -------
    int
        The calculated baseline replica count.
    """
    kind = strategy_config.strategy

    if kind == ReplicaCalculationStrategy.STATIC:
        return strategy_config.value or 1

    if kind == ReplicaCalculationStrategy.CPU_PERCENTAGE:
        cpu_fraction = strategy_config.cpu_percent or 0.5
        cap = strategy_config.limit or system_probe.cpu_count
        wanted = max(1, int(system_probe.cpu_count * cpu_fraction))
        return min(wanted, cap)

    # Both memory-based strategies share one formula and differ only in the
    # fraction of total memory they are allowed to budget against.
    if kind == ReplicaCalculationStrategy.MEMORY_THRESHOLDING:
        memory_fraction = 0.7  # Conservative 70% for static mode
    elif kind == ReplicaCalculationStrategy.MEMORY_STATIC_GLOBAL_PERCENT:
        memory_fraction = static_memory_threshold
    else:
        logger.warning(f"Unknown replica strategy '{strategy}' for stage '{stage.name}', defaulting to 1 replica".replace("{strategy}", "{strategy}")) if False else logger.warning(
            f"Unknown replica strategy '{kind}' for stage '{stage.name}', defaulting to 1 replica"
        )
        return 1

    per_replica_mb = strategy_config.memory_per_replica_mb or 1000
    budget_mb = int(system_probe.total_memory_mb * memory_fraction)
    wanted = max(1, budget_mb // per_replica_mb)
    cap = strategy_config.limit or wanted
    return min(wanted, cap)
def get_memory_intensive_stages(
    pipeline_config: PipelineConfigSchema, *, threshold_mb: int = 5000
) -> List[str]:
    """
    Identify stages that are memory-intensive and may need special handling.

    A stage qualifies when its ``replicas.static_replicas`` is a
    ``ReplicaStrategyConfig`` whose ``memory_per_replica_mb`` is strictly greater
    than ``threshold_mb``. Stages without a replica strategy, or without a
    ``memory_per_replica_mb`` value, are never reported.

    Parameters
    ----------
    pipeline_config : PipelineConfigSchema
        The pipeline configuration.
    threshold_mb : int, optional
        Per-replica memory, in MB, above which a stage is considered
        memory-intensive (keyword-only, default: 5000, i.e. roughly 5GB).

    Returns
    -------
    List[str]
        Names of the memory-intensive stages, in pipeline order.
    """
    memory_intensive_stages = []

    for stage in pipeline_config.stages:
        strategy_config = stage.replicas.static_replicas if stage.replicas else None
        # Plain integer counts and unset values carry no memory information.
        if not strategy_config or not isinstance(strategy_config, ReplicaStrategyConfig):
            continue

        # An unset memory_per_replica_mb is treated as 0 MB.
        if (strategy_config.memory_per_replica_mb or 0) > threshold_mb:
            memory_intensive_stages.append(stage.name)

    return memory_intensive_stages