Source code for nv_ingest.pipeline.config.loaders
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Configuration loading and management functions for pipeline execution.
This module provides declarative functions for loading, validating, and applying
runtime overrides to pipeline configurations, replacing imperative inline logic.
"""
import logging
import yaml
from typing import Optional
from nv_ingest.pipeline.pipeline_schema import PipelineConfigSchema
from nv_ingest.pipeline.default_libmode_pipeline_impl import DEFAULT_LIBMODE_PIPELINE_YAML
from nv_ingest.pipeline.default_pipeline_impl import DEFAULT_PIPELINE_YAML
from nv_ingest.framework.orchestration.execution.options import PipelineRuntimeOverrides
from nv_ingest_api.util.string_processing.yaml import substitute_env_vars_in_yaml_content
logger = logging.getLogger(__name__)
[docs]
def load_pipeline_config(config_path: str) -> PipelineConfigSchema:
"""
Load a pipeline configuration file, substituting environment variables.
Parameters
----------
config_path : str
The path to the YAML configuration file.
Returns
-------
PipelineConfigSchema
A validated PipelineConfigSchema object.
Raises
------
ValueError
If the YAML file cannot be parsed after environment variable substitution.
"""
logger.info(f"Loading pipeline configuration from: {config_path}")
# Read the raw YAML file content
with open(config_path, "r") as f:
raw_content = f.read()
# Substitute all environment variable placeholders using the utility function
substituted_content = substitute_env_vars_in_yaml_content(raw_content)
# Parse the substituted content with PyYAML, with error handling
try:
processed_config = yaml.safe_load(substituted_content)
except yaml.YAMLError as e:
error_message = (
f"Failed to parse YAML after environment variable substitution. "
f"Error: {e}\n\n"
f"--- Substituted Content ---\n{substituted_content}\n---------------------------"
)
raise ValueError(error_message) from e
# Pydantic validates the clean, substituted data against the schema
return PipelineConfigSchema(**processed_config)
[docs]
def load_default_pipeline_config() -> PipelineConfigSchema:
"""
Load and validate the embedded default (non-libmode) pipeline configuration.
Returns
-------
PipelineConfigSchema
Validated default pipeline configuration.
Raises
------
ValueError
If the default YAML cannot be parsed or validated.
"""
logger.info("Loading embedded default pipeline configuration")
substituted_content = substitute_env_vars_in_yaml_content(DEFAULT_PIPELINE_YAML)
try:
processed_config = yaml.safe_load(substituted_content)
except yaml.YAMLError as e:
error_message = (
f"Failed to parse embedded default pipeline YAML after environment variable substitution. Error: {e}"
)
raise ValueError(error_message) from e
return PipelineConfigSchema(**processed_config)
[docs]
def load_default_libmode_config() -> PipelineConfigSchema:
"""
Load and validate the default libmode pipeline configuration.
This function loads the embedded default libmode pipeline YAML,
performs environment variable substitution, and returns a validated
configuration object.
Returns
-------
PipelineConfigSchema
Validated default libmode pipeline configuration.
Raises
------
ValueError
If the default YAML cannot be parsed or validated.
"""
logger.info("Loading default libmode pipeline configuration")
# Substitute environment variables in the YAML content
substituted_content = substitute_env_vars_in_yaml_content(DEFAULT_LIBMODE_PIPELINE_YAML)
# Parse the substituted content with PyYAML
try:
processed_config = yaml.safe_load(substituted_content)
except yaml.YAMLError as e:
error_message = (
f"Failed to parse default libmode pipeline YAML after environment variable substitution. " f"Error: {e}"
)
raise ValueError(error_message) from e
# Create and return validated PipelineConfigSchema
return PipelineConfigSchema(**processed_config)
[docs]
def apply_runtime_overrides(config: PipelineConfigSchema, overrides: PipelineRuntimeOverrides) -> PipelineConfigSchema:
"""
Apply runtime parameter overrides to a pipeline configuration.
This function creates a copy of the provided configuration and applies
any non-None override values to the pipeline runtime settings.
Parameters
----------
config : PipelineConfigSchema
Base pipeline configuration to modify.
overrides : PipelineRuntimeOverrides
Runtime overrides to apply. Only non-None values are applied.
Returns
-------
PipelineConfigSchema
Modified configuration with overrides applied.
"""
# Create a copy to avoid modifying the original
modified_config = config.model_copy(deep=True)
# Apply overrides if provided
if overrides.disable_dynamic_scaling is not None:
modified_config.pipeline.disable_dynamic_scaling = overrides.disable_dynamic_scaling
logger.debug(f"Applied dynamic scaling override: {overrides.disable_dynamic_scaling}")
if overrides.dynamic_memory_threshold is not None:
modified_config.pipeline.dynamic_memory_threshold = overrides.dynamic_memory_threshold
logger.debug(f"Applied memory threshold override: {overrides.dynamic_memory_threshold}")
return modified_config
[docs]
def validate_pipeline_config(config: Optional[PipelineConfigSchema]) -> PipelineConfigSchema:
"""
Validate and ensure a pipeline configuration is available.
This function ensures that a valid pipeline configuration is available,
either from the provided config or by loading the default libmode config.
Parameters
----------
config : Optional[PipelineConfigSchema]
Pipeline configuration to validate, or None to load default.
Returns
-------
PipelineConfigSchema
Validated pipeline configuration.
Raises
------
ValueError
If config is None and default config cannot be loaded.
"""
if config is None:
return load_default_libmode_config()
# Config is already validated by Pydantic, just return it
return config
[docs]
def resolve_pipeline_config(provided_config: Optional[PipelineConfigSchema], libmode: bool) -> PipelineConfigSchema:
"""
Resolve the final pipeline configuration from inputs.
This function implements the configuration resolution logic:
- If config provided: use it
- If libmode=True and no config: load default libmode config
- If libmode=False and no config: raise error
Parameters
----------
provided_config : Optional[PipelineConfigSchema]
User-provided pipeline configuration, or None.
libmode : bool
Whether to allow loading default libmode configuration.
Returns
-------
PipelineConfigSchema
Resolved and validated pipeline configuration.
Raises
------
ValueError
If no config provided and libmode=False.
"""
if provided_config is not None:
return provided_config
if libmode:
return load_default_libmode_config()
else:
# For non-libmode, fall back to embedded default pipeline implementation
return load_default_pipeline_config()