Source code for nv_ingest.pipeline.pipeline_schema

# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from enum import Enum
from typing import Dict, Any, List, Optional, Set, Union
from pydantic import BaseModel, Field, field_validator, model_validator, ConfigDict

from nv_ingest_api.internal.enums.common import PipelinePhase


class StageType(str, Enum):
    """
    Kind of node a stage represents in the pipeline graph.

    Members are ``str`` subclasses, so they compare equal to their raw string
    values (e.g. ``StageType.SINK == "sink"``).
    """

    SOURCE = "source"  # stage that produces items
    STAGE = "stage"  # intermediate processing stage
    SINK = "sink"  # terminal stage
class ReplicaCalculationStrategy(str, Enum):
    """
    How a stage's replica count is derived at runtime.

    Values are plain strings so they can be written directly in configuration files.
    """

    # A fixed, explicitly configured number of replicas.
    STATIC = "static"
    # Replicas sized as a fraction of the available CPU cores.
    CPU_PERCENTAGE = "cpu_percentage"
    # Replicas sized from the expected memory footprint of each replica.
    MEMORY_THRESHOLDING = "memory_thresholding"
    # Memory-aware sizing bounded by a (global or per-stage) memory budget.
    MEMORY_STATIC_GLOBAL_PERCENT = "memory_static_global_percent"
class ReplicaStrategyConfig(BaseModel):
    """
    Configuration for a specific replica calculation strategy.

    Attributes
    ----------
    strategy : ReplicaCalculationStrategy
        The calculation strategy to use.
    value : Optional[Union[int, float]]
        The primary value for the strategy (e.g., static count, CPU percentage).
        Required (as an int) for STATIC. For CPU_PERCENTAGE it is used as the
        CPU fraction when ``cpu_percent`` is not given, and must then lie in 0.0-1.0.
    limit : Optional[int]
        Optional upper limit for calculated replicas (>= 1).
    cpu_percent : Optional[float]
        CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).
    memory_per_replica_mb : Optional[int]
        Expected memory usage per replica in MB (> 0). Required for
        MEMORY_THRESHOLDING and MEMORY_STATIC_GLOBAL_PERCENT.
    memory_threshold_percent : Optional[float]
        Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).
    max_memory_budget_mb : Optional[int]
        Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB (> 0).
        Optional; when omitted a global static memory threshold is expected to apply.
    """

    strategy: ReplicaCalculationStrategy = Field(..., description="The calculation strategy to use.")
    value: Optional[Union[int, float]] = Field(None, description="Primary value for the strategy.")
    limit: Optional[int] = Field(None, description="Optional upper limit for calculated replicas.", ge=1)
    cpu_percent: Optional[float] = Field(
        None, description="CPU percentage for CPU_PERCENTAGE strategy.", ge=0.0, le=1.0
    )
    memory_per_replica_mb: Optional[int] = Field(None, description="Expected memory usage per replica in MB.", gt=0)
    memory_threshold_percent: Optional[float] = Field(
        None, description="Memory threshold percentage for MEMORY_THRESHOLDING strategy.", ge=0.0, le=1.0
    )
    max_memory_budget_mb: Optional[int] = Field(
        None, description="Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", gt=0
    )

    @model_validator(mode="after")
    def validate_strategy_config(self):
        """
        Validate that required fields are present for each strategy.

        For CPU_PERCENTAGE, ``cpu_percent`` is filled in from ``value`` when it was
        not supplied directly.

        Raises
        ------
        ValueError
            If a field required by the selected strategy is missing, has the wrong
            type, or (for a ``value``-derived CPU fraction) is outside 0.0-1.0.
        """
        if self.strategy == ReplicaCalculationStrategy.STATIC:
            if self.value is None or not isinstance(self.value, int):
                raise ValueError("STATIC strategy requires 'value' as an integer")
        elif self.strategy == ReplicaCalculationStrategy.CPU_PERCENTAGE:
            if self.cpu_percent is None:
                if self.value is None or not isinstance(self.value, (int, float)):
                    raise ValueError("CPU_PERCENTAGE strategy requires 'cpu_percent' or 'value' as a float")
                derived_percent = float(self.value)
                # Plain attribute assignment below does not re-run the field's ge/le
                # constraints (this model does not enable validate_assignment), so the
                # 0.0-1.0 range declared on ``cpu_percent`` is enforced explicitly here.
                if not 0.0 <= derived_percent <= 1.0:
                    raise ValueError(
                        "CPU_PERCENTAGE strategy requires 'value' between 0.0 and 1.0 when used as 'cpu_percent'"
                    )
                self.cpu_percent = derived_percent
        elif self.strategy == ReplicaCalculationStrategy.MEMORY_THRESHOLDING:
            if self.memory_per_replica_mb is None:
                raise ValueError("MEMORY_THRESHOLDING strategy requires 'memory_per_replica_mb'")
        elif self.strategy == ReplicaCalculationStrategy.MEMORY_STATIC_GLOBAL_PERCENT:
            if self.memory_per_replica_mb is None:
                raise ValueError("MEMORY_STATIC_GLOBAL_PERCENT strategy requires 'memory_per_replica_mb'")
            # max_memory_budget_mb is optional - uses global static_memory_threshold if not provided
        return self
class ReplicaConfig(BaseModel):
    """
    Replica sizing for a single pipeline stage, for dynamic and static scaling modes.

    Two mutually exclusive configuration styles are accepted:

    * Legacy (kept for backward compatibility): ``cpu_count_min`` / ``cpu_count_max``
      as absolute counts, or ``cpu_percent_min`` / ``cpu_percent_max`` as fractions
      (0.0 to 1.0) of total cores.
    * Current: ``min_replicas``, plus ``max_replicas`` for dynamic scaling mode and/or
      ``static_replicas`` for static scaling mode. Both of the latter accept either a
      plain integer or a ``ReplicaStrategyConfig``.

    If no field from either style is set, the validator fills in
    ``min_replicas=0`` and ``max_replicas=1``. Unknown keys are rejected.

    Attributes
    ----------
    cpu_count_min : Optional[int]
        Legacy absolute minimum number of replicas (>= 0).
    cpu_count_max : Optional[int]
        Legacy absolute maximum number of replicas (>= 1).
    cpu_percent_min : Optional[float]
        Legacy minimum replicas as a fraction (0.0 to 1.0) of total cores.
    cpu_percent_max : Optional[float]
        Legacy maximum replicas as a fraction (0.0 to 1.0) of total cores.
    min_replicas : Optional[int]
        Minimum number of replicas for both scaling modes (>= 0).
    max_replicas : Optional[Union[int, ReplicaStrategyConfig]]
        Maximum replicas for dynamic scaling mode.
    static_replicas : Optional[Union[int, ReplicaStrategyConfig]]
        Replica configuration for static scaling mode.
    """

    model_config = ConfigDict(extra="forbid")

    # ---- Legacy fields (backward compatibility) ----
    cpu_count_min: Optional[int] = Field(default=None, description="Absolute minimum number of replicas.", ge=0)
    cpu_count_max: Optional[int] = Field(default=None, description="Absolute maximum number of replicas.", ge=1)
    cpu_percent_min: Optional[float] = Field(
        default=None,
        description="Minimum number of replicas as a percentage of total cores.",
        ge=0.0,
        le=1.0,
    )
    cpu_percent_max: Optional[float] = Field(
        default=None,
        description="Maximum number of replicas as a percentage of total cores.",
        ge=0.0,
        le=1.0,
    )

    # ---- Current, flexible fields ----
    min_replicas: Optional[int] = Field(default=None, description="Minimum number of replicas.", ge=0)
    max_replicas: Optional[Union[int, ReplicaStrategyConfig]] = Field(
        default=None,
        description="Maximum replicas for dynamic scaling mode.",
    )
    static_replicas: Optional[Union[int, ReplicaStrategyConfig]] = Field(
        default=None,
        description="Replica configuration for static scaling mode.",
    )

    @model_validator(mode="after")
    def check_exclusive_min_max(self) -> "ReplicaConfig":
        """
        Check that the replica configuration is consistent, defaulting it when empty.

        Rules, applied in order:

        1. Legacy and current fields may not be combined.
        2. With no fields set at all, ``min_replicas=0`` / ``max_replicas=1`` are used.
        3. Legacy: a count and a percentage may not both bound the same side, and
           each min must not exceed its matching max.
        4. Current: ``min_replicas`` must not exceed ``max_replicas`` or
           ``static_replicas`` when those are plain integers (strategy objects are
           resolved at runtime and are not compared here).

        Raises
        ------
        ValueError
            When any rule above is violated.
        """
        legacy_in_use = any(
            setting is not None
            for setting in (self.cpu_count_min, self.cpu_count_max, self.cpu_percent_min, self.cpu_percent_max)
        )
        current_in_use = any(
            setting is not None for setting in (self.min_replicas, self.max_replicas, self.static_replicas)
        )

        if legacy_in_use and current_in_use:
            raise ValueError(
                "Cannot mix legacy replica fields (cpu_count_*, cpu_percent_*) with new fields "
                "(min_replicas, max_replicas, static_replicas). Use one approach or the other."
            )

        if not (legacy_in_use or current_in_use):
            # Nothing configured: fall back to a single optional replica.
            self.min_replicas = 0
            self.max_replicas = 1

        if legacy_in_use:
            if self.cpu_count_min is not None and self.cpu_percent_min is not None:
                raise ValueError("Cannot specify both cpu_count_min and cpu_percent_min")
            if self.cpu_count_max is not None and self.cpu_percent_max is not None:
                raise ValueError("Cannot specify both cpu_count_max and cpu_percent_max")

            count_bounds_set = self.cpu_count_min is not None and self.cpu_count_max is not None
            if count_bounds_set and self.cpu_count_min > self.cpu_count_max:
                raise ValueError("cpu_count_min cannot be greater than cpu_count_max")

            percent_bounds_set = self.cpu_percent_min is not None and self.cpu_percent_max is not None
            if percent_bounds_set and self.cpu_percent_min > self.cpu_percent_max:
                raise ValueError("cpu_percent_min cannot be greater than cpu_percent_max")

        if current_in_use:
            floor = self.min_replicas
            if floor is not None:
                # Only plain integers can be compared; strategy configs are computed later.
                if isinstance(self.max_replicas, int) and floor > self.max_replicas:
                    raise ValueError("min_replicas cannot be greater than max_replicas")
                if isinstance(self.static_replicas, int) and floor > self.static_replicas:
                    raise ValueError("min_replicas cannot be greater than static_replicas")

        return self
class StageConfig(BaseModel):
    """
    Configuration for a single pipeline stage.

    Describes a single component in the ingestion pipeline: its name, type, phase,
    the actor or callable implementing it, its stage-specific configuration, and
    its replica settings. Unknown keys are rejected.

    Attributes
    ----------
    name : str
        A unique name to identify the stage within the pipeline.
    type : StageType
        The type of the stage, which determines how it's added to the RayPipeline
        (default: ``StageType.STAGE``).
    phase : PipelinePhase
        The logical phase of the stage in the pipeline.
    actor : Optional[str]
        The fully qualified import path to the actor class or function that
        implements the stage's logic. Mutually exclusive with ``callable``.
    callable : Optional[str]
        The fully qualified import path to a callable function that implements the
        stage's logic. Mutually exclusive with ``actor``.
    task_filters : Optional[List[Any]]
        List of task types this callable stage should filter for. Only applies to
        callable stages. Supports both simple strings (e.g., "udf") and complex
        filters (e.g., ["udf", {"phase": 5}]).
    enabled : bool
        Whether the stage should be included in the pipeline (default: True).
    config : Dict[str, Any]
        Configuration parameters passed to the stage's actor. Defaults to a new,
        empty dict per instance.
    replicas : ReplicaConfig
        The replica configuration for the stage (default: ``ReplicaConfig()``).
    runs_after : List[str]
        Names of stages that this stage must be downstream of.
    """

    name: str = Field(..., description="Unique name for the stage.")
    type: StageType = Field(StageType.STAGE, description="Type of the stage.")
    phase: PipelinePhase = Field(..., description="The logical phase of the stage.")
    actor: Optional[str] = Field(None, description="Full import path to the stage's actor class or function.")
    callable: Optional[str] = Field(None, description="Full import path to a callable function for the stage.")
    task_filters: Optional[List[Any]] = Field(
        None, description="List of task types this callable stage should filter for. Only applies to callable stages."
    )
    enabled: bool = Field(True, description="Whether the stage is enabled.")
    # default_factory instead of a ``{}`` literal: no shared mutable default object,
    # consistent with ``replicas`` / ``runs_after`` below.
    config: Dict[str, Any] = Field(default_factory=dict, description="Configuration dictionary for the stage.")
    replicas: ReplicaConfig = Field(default_factory=ReplicaConfig, description="Replica configuration.")
    runs_after: List[str] = Field(default_factory=list, description="List of stages this stage must run after.")

    model_config = ConfigDict(extra="forbid")

    @model_validator(mode="after")
    def check_actor_or_callable(self) -> "StageConfig":
        """
        Validates that exactly one of 'actor' or 'callable' is specified.

        Raises
        ------
        ValueError
            If neither or both of ``actor`` and ``callable`` are set.
        """
        if self.actor is None and self.callable is None:
            raise ValueError("Either 'actor' or 'callable' must be specified")
        if self.actor is not None and self.callable is not None:
            raise ValueError("Cannot specify both 'actor' and 'callable' - they are mutually exclusive")
        return self
class EdgeConfig(BaseModel):
    """
    A directed connection from one pipeline stage to another.

    Input data names the endpoints with the keys ``from`` and ``to`` (field
    aliases). On the model they are available as ``from_stage`` and ``to_stage``.
    Unknown keys are rejected.

    Attributes
    ----------
    from_stage : str
        Name of the upstream (source) stage; input key ``from``.
    to_stage : str
        Name of the downstream (destination) stage; input key ``to``.
    queue_size : int
        Maximum number of items in the queue between the two stages
        (must be > 0, default 100).
    """

    model_config = ConfigDict(extra="forbid")

    from_stage: str = Field(..., alias="from", description="The name of the source stage.")
    to_stage: str = Field(..., alias="to", description="The name of the destination stage.")
    queue_size: int = Field(default=100, gt=0, description="The size of the queue between stages.")
class PIDControllerConfig(BaseModel):
    """
    Tuning parameters for the PID controller that drives dynamic scaling.

    Unknown keys are rejected. Defaults and allowed ranges:

    Attributes
    ----------
    kp : float
        Proportional gain; must be > 0 (default 0.2).
    ki : float
        Integral gain; must be >= 0 (default 0.01).
    ema_alpha : float
        Exponential moving average alpha, within [0.0, 1.0] (default 0.1).
    target_queue_depth : int
        Target queue depth; must be >= 0 (default 0).
    penalty_factor : float
        Penalty factor; must be >= 0 (default 0.1).
    error_boost_factor : float
        Error boost factor; must be > 0 (default 1.5).
    rcm_memory_safety_buffer_fraction : float
        Resource constraint manager memory safety buffer fraction, within
        [0.0, 1.0] (default 0.15).
    """

    model_config = ConfigDict(extra="forbid")

    kp: float = Field(default=0.2, gt=0.0, description="Proportional gain for the PID controller.")
    ki: float = Field(default=0.01, ge=0.0, description="Integral gain for the PID controller.")
    ema_alpha: float = Field(
        default=0.1,
        ge=0.0,
        le=1.0,
        description="Exponential moving average alpha for the PID controller.",
    )
    target_queue_depth: int = Field(default=0, ge=0, description="Target queue depth for the PID controller.")
    penalty_factor: float = Field(default=0.1, ge=0.0, description="Penalty factor for the PID controller.")
    error_boost_factor: float = Field(default=1.5, gt=0.0, description="Error boost factor for the PID controller.")
    rcm_memory_safety_buffer_fraction: float = Field(
        default=0.15,
        ge=0.0,
        le=1.0,
        description="Resource constraint manager memory safety buffer fraction.",
    )
class PipelineRuntimeConfig(BaseModel):
    """
    Runtime behavior settings for a pipeline.

    Unknown keys are rejected.

    Attributes
    ----------
    disable_dynamic_scaling : bool
        When True, dynamic scaling of stage replicas is disabled (default False).
    dynamic_memory_threshold : float
        Memory utilization threshold for dynamic scaling decisions; must lie in
        [0.0, 0.95] (default 0.75).
    static_memory_threshold : float
        Global memory threshold for static scaling mode; must lie in [0.0, 1.0]
        (default 0.75).
    pid_controller : PIDControllerConfig
        PID controller settings for dynamic scaling (default ``PIDControllerConfig()``).
    launch_simple_broker : bool
        When True, a simple message broker is launched for the pipeline (default False).
    """

    model_config = ConfigDict(extra="forbid")

    disable_dynamic_scaling: bool = Field(default=False, description="Disable dynamic scaling of stage replicas.")
    dynamic_memory_threshold: float = Field(
        default=0.75,
        ge=0.0,
        le=0.95,
        description="Memory utilization threshold for dynamic scaling.",
    )
    static_memory_threshold: float = Field(
        default=0.75,
        ge=0.0,
        le=1.0,
        description="Global memory threshold for static scaling mode.",
    )
    pid_controller: PIDControllerConfig = Field(
        default_factory=PIDControllerConfig,
        description="PID controller configuration for dynamic scaling.",
    )
    launch_simple_broker: bool = Field(default=False, description="Launch a simple message broker for the pipeline.")
class PipelineConfigSchema(BaseModel):
    """
    Root configuration model for an ingestion pipeline.

    This model represents the entire declarative configuration for an ingestion
    pipeline, including all stages and the edges that connect them. Unknown keys
    are rejected.

    Attributes
    ----------
    name : str
        The name of the pipeline.
    description : str
        A description of the pipeline.
    stages : List[StageConfig]
        All stage configurations in the pipeline. Must be non-empty, and stage
        names must be unique.
    edges : List[EdgeConfig]
        All edge configurations that define the pipeline's topology. Must be non-empty.
    pipeline : Optional[PipelineRuntimeConfig]
        Runtime configuration for the pipeline. Defaults to ``PipelineRuntimeConfig()``
        when omitted.
    """

    name: str = Field(..., description="The name of the pipeline.")
    description: str = Field(..., description="A description of the pipeline.")
    stages: List[StageConfig] = Field(..., description="List of all stages in the pipeline.")
    edges: List[EdgeConfig] = Field(..., description="List of all edges connecting the stages.")
    pipeline: Optional[PipelineRuntimeConfig] = Field(
        default_factory=PipelineRuntimeConfig, description="Runtime configuration for the pipeline."
    )

    model_config = ConfigDict(extra="forbid")

    @field_validator("stages", "edges")
    @classmethod
    def check_not_empty(cls, v: list) -> list:
        """Validates that the list is not empty."""
        if not v:
            raise ValueError("must not be empty")
        return v

    @model_validator(mode="after")
    def check_unique_stage_names(self) -> "PipelineConfigSchema":
        """
        Validates that no two stages share the same name.

        Stages are referenced by name (edges' ``from``/``to``, ``runs_after``), so a
        duplicate name would make those references ambiguous.

        Raises
        ------
        ValueError
            If any stage name appears more than once.
        """
        seen: Set[str] = set()
        duplicates: Set[str] = set()
        for stage in self.stages:
            if stage.name in seen:
                duplicates.add(stage.name)
            seen.add(stage.name)
        if duplicates:
            raise ValueError(f"Stage names must be unique; duplicates found: {sorted(duplicates)}")
        return self

    def get_phases(self) -> Set[PipelinePhase]:
        """Returns a set of all unique phases in the pipeline."""
        return {stage.phase for stage in self.stages}