Source code for nv_ingest.pipeline.ingest_pipeline

# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import logging
import math
from typing import Dict, Optional, Type, List, Set
import os

from nv_ingest.framework.orchestration.ray.primitives.ray_pipeline import RayPipeline, ScalingConfig
from nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_sink_stage_base import RayActorSinkStage
from nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_source_stage_base import RayActorSourceStage
from nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_stage_base import RayActorStage
from nv_ingest.framework.orchestration.ray.util.pipeline.tools import wrap_callable_as_stage
from nv_ingest.pipeline.pipeline_schema import (
    PipelineConfigSchema,
    StageConfig,
    StageType,
    ReplicaStrategyConfig,
)
from nv_ingest_api.util.imports.callable_signatures import ingest_stage_callable_signature
from nv_ingest_api.util.imports.dynamic_resolvers import resolve_actor_class_from_path, resolve_callable_from_path
from nv_ingest_api.util.introspection.class_inspect import (
    find_pydantic_config_schema,
    find_pydantic_config_schema_unified,
)
from nv_ingest_api.util.system.hardware_info import SystemResourceProbe

logger = logging.getLogger(__name__)


class IngestPipelineBuilder:
    """
    A high-level builder for creating and configuring an ingestion pipeline.

    This class translates a `PipelineConfigSchema` object into a runnable
    `RayPipeline`, handling class resolution, configuration validation,
    replica calculation, and stage/edge construction.

    Attributes
    ----------
    _config : PipelineConfigSchema
        The declarative configuration for the pipeline.
    _pipeline : RayPipeline
        The underlying RayPipeline instance being constructed.
    _system_resource_probe : SystemResourceProbe
        A utility to probe for available system resources like CPU cores.
    """

    def __init__(self, config: PipelineConfigSchema, system_resource_probe: Optional[SystemResourceProbe] = None):
        """
        Initializes the IngestPipelineBuilder.

        Parameters
        ----------
        config : PipelineConfigSchema
            The pipeline configuration object.
        system_resource_probe : Optional[SystemResourceProbe], optional
            A probe for system resources. If not provided, a default instance
            will be created. Defaults to None.
        """
        logger.debug(f"Initializing IngestPipeline for '{config.name}'.")
        self._config: PipelineConfigSchema = config

        scaling_config = ScalingConfig(
            dynamic_memory_scaling=not config.pipeline.disable_dynamic_scaling,
            dynamic_memory_threshold=config.pipeline.dynamic_memory_threshold,
            pid_kp=config.pipeline.pid_controller.kp,
            pid_ki=config.pipeline.pid_controller.ki,
            pid_ema_alpha=config.pipeline.pid_controller.ema_alpha,
            pid_target_queue_depth=config.pipeline.pid_controller.target_queue_depth,
            pid_penalty_factor=config.pipeline.pid_controller.penalty_factor,
            pid_error_boost_factor=config.pipeline.pid_controller.error_boost_factor,
            rcm_memory_safety_buffer_fraction=config.pipeline.pid_controller.rcm_memory_safety_buffer_fraction,
        )
        self._pipeline: RayPipeline = RayPipeline(scaling_config=scaling_config)
        self._system_resource_probe: SystemResourceProbe = system_resource_probe or SystemResourceProbe()
        self._is_built: bool = False
        self._built_stages: Set[str] = set()
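    # Illustrative pipeline-level config fragment of the kind the ScalingConfig
    # mapping above consumes. The field names are taken from the attribute
    # accesses in __init__; the values here are hypothetical:
    #
    #   pipeline:
    #     disable_dynamic_scaling: false
    #     dynamic_memory_threshold: 0.75
    #     pid_controller:
    #       kp: 0.2
    #       ki: 0.01
    #       ema_alpha: 0.1
    #       target_queue_depth: 0
    #       penalty_factor: 0.1
    #       error_boost_factor: 1.5
    #       rcm_memory_safety_buffer_fraction: 0.15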
    def build(self) -> None:
        """
        Builds the ingestion pipeline from the configuration.

        This method constructs the RayPipeline by adding stages and edges as
        defined in the pipeline configuration. It also validates dependencies
        and ensures the pipeline is ready to be started.

        Raises
        ------
        ValueError
            If the pipeline configuration is invalid, such as containing
            circular dependencies or references to non-existent stages.
        """
        if self._is_built:
            logger.warning("Pipeline is already built. Skipping build.")
            return

        logger.info(f"Building pipeline '{self._config.name}'...")

        # First, validate the overall structure and dependencies
        self._validate_dependencies()

        # Then, build the stages
        total_cpus = os.cpu_count() or 1
        for stage_config in self._config.stages:
            if not stage_config.enabled:
                logger.info(f"Stage '{stage_config.name}' is disabled. Skipping.")
                continue
            self._build_stage(stage_config, total_cpus)

        # Finally, add the edges
        for edge_config in self._config.edges:
            if not (edge_config.from_stage in self._built_stages and edge_config.to_stage in self._built_stages):
                logger.warning(
                    f"Skipping edge from '{edge_config.from_stage}' to '{edge_config.to_stage}' "
                    f"because one or both stages are disabled or failed to build."
                )
                continue
            self._pipeline.make_edge(
                from_stage=edge_config.from_stage,
                to_stage=edge_config.to_stage,
                queue_size=edge_config.queue_size,
            )

        self._pipeline.build()
        self._is_built = True
        logger.info(f"Pipeline '{self._config.name}' built successfully.")
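    # Illustrative stage/edge fragment of the kind build() iterates over. The
    # field names come from StageConfig and the edge handling above; the stage
    # names and dotted actor paths are hypothetical:
    #
    #   stages:
    #     - name: source
    #       type: source
    #       actor: my_pkg.stages.MySourceStage
    #       enabled: true
    #     - name: extractor
    #       type: stage
    #       actor: my_pkg.stages.MyExtractorStage
    #       runs_after: [source]
    #   edges:
    #     - from_stage: source
    #       to_stage: extractor
    #       queue_size: 32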
    def _build_stage(self, stage_config: StageConfig, total_cpus: int) -> None:
        """Builds and adds a single stage to the pipeline."""
        logger.debug(f"Building stage '{stage_config.name}'...")
        stage_type_enum = StageType(stage_config.type)
        expected_base_class: Optional[Type] = {
            StageType.SOURCE: RayActorSourceStage,
            StageType.SINK: RayActorSinkStage,
            StageType.STAGE: RayActorStage,
        }.get(stage_type_enum)
        if not expected_base_class:
            raise ValueError(f"Invalid stage type '{stage_config.type}' for stage '{stage_config.name}'")

        # Handle callable vs actor stage configurations
        if stage_config.callable:
            # Handle callable stage
            callable_fn = resolve_callable_from_path(
                stage_config.callable, signature_schema=ingest_stage_callable_signature
            )
            config_schema = find_pydantic_config_schema_unified(callable_fn, param_name="stage_config")

            # For callable stages, we need a schema to wrap the callable
            if not config_schema:
                raise ValueError(
                    f"Callable stage '{stage_config.name}' must have a Pydantic schema in its stage_config parameter"
                )

            # Wrap callable as a stage using wrap_callable_as_stage
            actor_class = wrap_callable_as_stage(callable_fn, config_schema, required_tasks=stage_config.task_filters)

            # For callable stages, the config instance is handled by wrap_callable_as_stage
            config_instance = config_schema(**stage_config.config) if config_schema else None
        else:
            # Handle actor stage (existing logic)
            actor_class = resolve_actor_class_from_path(stage_config.actor, expected_base_class)
            config_schema = find_pydantic_config_schema(actor_class, expected_base_class)
            config_instance = config_schema(**stage_config.config) if config_schema else None

        add_method = getattr(self._pipeline, f"add_{stage_config.type.value}", None)
        if not add_method:
            raise AttributeError(f"Pipeline has no method 'add_{stage_config.type.value}'")

        replicas = stage_config.replicas
        min_replicas, max_replicas = 1, 1

        # Check if dynamic scaling is disabled by checking pipeline config
        dynamic_scaling_disabled = getattr(self._config.pipeline, "disable_dynamic_scaling", False)

        if replicas and total_cpus:
            # Handle new replica configuration format
            if hasattr(replicas, "min_replicas") and replicas.min_replicas is not None:
                min_replicas = replicas.min_replicas
            elif replicas.cpu_count_min is not None:
                # Legacy support
                min_replicas = replicas.cpu_count_min
            elif replicas.cpu_percent_min is not None:
                # Legacy support
                min_replicas = math.floor(replicas.cpu_percent_min * total_cpus)

            # For max_replicas, prioritize based on scaling mode
            if dynamic_scaling_disabled:
                # Static scaling mode - use static_replicas if available
                if hasattr(replicas, "static_replicas") and replicas.static_replicas is not None:
                    if isinstance(replicas.static_replicas, int):
                        # Use resolved static replica count
                        max_replicas = replicas.static_replicas
                        min_replicas = replicas.static_replicas  # In static mode, min == max
                        logger.debug(f"Stage '{stage_config.name}': Using resolved static replicas = {max_replicas}")
                    else:
                        # Should not happen after resolve_static_replicas, but fall back to legacy
                        logger.warning(
                            f"Stage '{stage_config.name}': "
                            "static_replicas not resolved to int, using legacy calculation"
                        )
                        max_replicas = self._calculate_legacy_max_replicas(replicas, total_cpus)
                else:
                    # No static_replicas defined, use legacy calculation
                    max_replicas = self._calculate_legacy_max_replicas(replicas, total_cpus)
            else:
                # Dynamic scaling mode - use max_replicas
                if hasattr(replicas, "max_replicas") and replicas.max_replicas is not None:
                    if isinstance(replicas.max_replicas, int):
                        max_replicas = replicas.max_replicas
                    else:
                        # ReplicaStrategyConfig - calculate based on strategy and system resources
                        max_replicas = self._calculate_strategy_based_replicas(
                            stage_config.name, replicas.max_replicas, total_cpus
                        )
                        logger.debug(
                            f"Stage '{stage_config.name}': max_replicas calculated from strategy = {max_replicas}"
                        )
                else:
                    # Legacy calculation
                    max_replicas = self._calculate_legacy_max_replicas(replicas, total_cpus)

        # Ensure max_replicas is not less than min_replicas
        max_replicas = max(min_replicas, max_replicas)

        actor_kwarg = f"{stage_config.type.value}_actor"
        add_method(
            name=stage_config.name,
            **{actor_kwarg: actor_class},
            config=config_instance,
            min_replicas=min_replicas,
            max_replicas=max_replicas,
        )
        logger.debug(f"Added stage '{stage_config.name}' ({min_replicas}-{max_replicas} replicas) to the pipeline.")
        self._built_stages.add(stage_config.name)

    def _calculate_legacy_max_replicas(self, replicas, total_cpus):
        if replicas.cpu_count_max is not None:
            return replicas.cpu_count_max
        elif replicas.cpu_percent_max is not None:
            return math.ceil(replicas.cpu_percent_max * total_cpus)
        else:
            return 1

    def _calculate_strategy_based_replicas(
        self, stage_name: str, strategy_config: ReplicaStrategyConfig, total_cpus: int
    ) -> int:
        """
        Calculate replica count based on ReplicaStrategyConfig for dynamic scaling.

        Parameters
        ----------
        stage_name : str
            Name of the stage for logging purposes.
        strategy_config : ReplicaStrategyConfig
            The replica strategy configuration.
        total_cpus : int
            Total available CPU cores.

        Returns
        -------
        int
            Calculated replica count.
        """
        from nv_ingest.pipeline.pipeline_schema import ReplicaCalculationStrategy

        strategy = strategy_config.strategy

        if strategy == ReplicaCalculationStrategy.STATIC:
            return strategy_config.value or 1
        elif strategy == ReplicaCalculationStrategy.CPU_PERCENTAGE:
            cpu_percent = strategy_config.cpu_percent or 0.5
            limit = strategy_config.limit or total_cpus
            calculated = max(1, int(total_cpus * cpu_percent))
            result = min(calculated, limit)
            logger.debug(
                f"Stage '{stage_name}': CPU_PERCENTAGE strategy: {cpu_percent:.1%} of {total_cpus} "
                f"CPUs = {calculated}, limited to {result}"
            )
            return result
        elif strategy == ReplicaCalculationStrategy.MEMORY_THRESHOLDING:
            # For dynamic scaling, use a more aggressive memory allocation (80% vs 70% for static)
            memory_per_replica_mb = strategy_config.memory_per_replica_mb or 1000
            available_memory_mb = int(self._system_resource_probe.total_memory_mb * 0.8)
            calculated = max(1, available_memory_mb // memory_per_replica_mb)
            limit = strategy_config.limit or calculated
            result = min(calculated, limit)
            logger.debug(
                f"Stage '{stage_name}': MEMORY_THRESHOLDING strategy: {available_memory_mb}"
                f"MB / {memory_per_replica_mb}MB = {calculated}, limited to {result}"
            )
            return result
        elif strategy == ReplicaCalculationStrategy.MEMORY_STATIC_GLOBAL_PERCENT:
            # For dynamic scaling, this strategy behaves like memory_thresholding but with global threshold
            memory_per_replica_mb = strategy_config.memory_per_replica_mb or 1000
            # Use dynamic memory threshold from pipeline config, falling back to 0.8
            dynamic_threshold = getattr(self._config.pipeline, "dynamic_memory_threshold", 0.8)
            available_memory_mb = int(self._system_resource_probe.total_memory_mb * dynamic_threshold)
            calculated = max(1, available_memory_mb // memory_per_replica_mb)
            limit = strategy_config.limit or calculated
            result = min(calculated, limit)
            logger.debug(
                f"Stage '{stage_name}': MEMORY_STATIC_GLOBAL_PERCENT strategy (dynamic): "
                f"{available_memory_mb}MB / {memory_per_replica_mb}MB = {calculated}, limited to {result}"
            )
            return result
        else:
            logger.warning(f"Unknown replica strategy '{strategy}' for stage '{stage_name}', defaulting to 1 replica")
            return 1

    def _validate_dependencies(self) -> None:
        """
        Validates stage dependencies, checking for undefined stages and circular dependencies.

        Raises
        ------
        ValueError
            If a stage has an invalid dependency (points to a non-existent stage)
            or if a circular dependency is detected among the stages.
        """
        all_stage_names = {s.name for s in self._config.stages}
        dependency_graph = {s.name: s.runs_after for s in self._config.stages}

        # First, check for dependencies on non-existent stages
        for stage_name, deps in dependency_graph.items():
            for dep_name in deps:
                if dep_name not in all_stage_names:
                    raise ValueError(
                        f"Stage '{stage_name}' has an invalid dependency: '{dep_name}' is not a defined stage."
                    )

        # Second, check for circular dependencies using DFS
        visiting = set()  # For nodes currently in the recursion stack for DFS
        visited = set()  # For nodes that have been completely visited
        for stage_name in all_stage_names:
            if stage_name not in visited:
                self._detect_cycle_util(stage_name, dependency_graph, visiting, visited)

    def _detect_cycle_util(self, stage_name: str, graph: Dict[str, List[str]], visiting: set, visited: set) -> None:
        """Utility function to detect cycles using DFS."""
        visiting.add(stage_name)
        for dependency in graph.get(stage_name, []):
            if dependency in visiting:
                raise ValueError(f"Circular dependency detected involving stage '{stage_name}' and '{dependency}'.")
            if dependency not in visited:
                self._detect_cycle_util(dependency, graph, visiting, visited)
        visiting.remove(stage_name)
        visited.add(stage_name)
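    # Worked examples for the logic above (illustrative values):
    #   - CPU_PERCENTAGE with total_cpus=16, cpu_percent=0.5, limit=6:
    #     calculated = max(1, int(16 * 0.5)) = 8, result = min(8, 6) = 6.
    #   - A runs_after graph such as {"a": ["b"], "b": ["a"]} makes the DFS in
    #     _detect_cycle_util raise ValueError before any stage is built.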
    def start(self) -> None:
        """
        Starts the underlying RayPipeline, making it ready to process data.

        Raises
        ------
        RuntimeError
            If the pipeline has not been built by calling `build()` first.
        """
        if not self._is_built:
            raise RuntimeError("Pipeline has not been built yet. Call build() before start().")
        logger.info("Starting the ingestion pipeline...")
        self._pipeline.start()
    def stop(self) -> None:
        """
        Stops the underlying RayPipeline gracefully.
        """
        if self._pipeline:
            logger.info("Stopping the ingestion pipeline...")
            self._pipeline.stop()
    def get_pipeline(self) -> RayPipeline:
        """
        Returns the underlying RayPipeline instance.

        Returns
        -------
        RayPipeline
            The raw RayPipeline object.
        """
        return self._pipeline
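# A minimal end-to-end usage sketch (illustrative only; assumes `config` is an
# already-validated PipelineConfigSchema instance, e.g. parsed from a pipeline
# YAML document):
#
#     builder = IngestPipelineBuilder(config)
#     builder.build()
#     builder.start()
#     try:
#         ...  # submit work to the running RayPipeline
#     finally:
#         builder.stop()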