# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Low-level pipeline execution functions.
This module contains the core pipeline execution functions that are shared
between different execution strategies, extracted to avoid circular imports.
"""
import logging
import multiprocessing
import os
import signal
import sys
import time
from ctypes import CDLL
from datetime import datetime
from typing import Union, Tuple, Optional, TextIO, Any
import json
import ray
from ray import LoggingConfig
from nv_ingest.framework.orchestration.process.dependent_services import start_simple_message_broker
from nv_ingest.framework.orchestration.process.termination import (
kill_pipeline_process_group as _kill_pipeline_process_group,
)
from nv_ingest.pipeline.ingest_pipeline import IngestPipelineBuilder
from nv_ingest.pipeline.pipeline_schema import PipelineConfigSchema
from nv_ingest.pipeline.config.replica_resolver import resolve_static_replicas
from nv_ingest_api.util.string_processing.configuration import pretty_print_pipeline_config
logger = logging.getLogger(__name__)
def _safe_log(level: int, msg: str) -> None:
    """Emit ``msg`` at ``level`` without ever raising.

    The module logger is tried first. If that call raises (the original
    author notes this is common in atexit hooks during CI/pytest teardown,
    when handlers or streams are already closed), the message is written to
    ``sys.__stderr__`` instead. A failure of that fallback is swallowed too.

    Parameters
    ----------
    level : int
        Numeric logging level handed to ``logger.log``.
    msg : str
        Message text to emit.
    """
    try:
        logger.log(level, msg)
    except Exception:
        # Logging is unusable; continue to the raw stderr fallback below.
        pass
    else:
        return

    try:
        # Prefer the original, un-captured stderr stream when one exists.
        raw_stderr = getattr(sys, "__stderr__", None)
        if raw_stderr:
            raw_stderr.write(msg + "\n")
            raw_stderr.flush()
    except Exception:
        # Last resort: stay silent rather than make shutdown noisy.
        pass
[docs]
def str_to_bool(value: str) -> bool:
    """Interpret a string as a boolean flag.

    Surrounding whitespace is ignored and matching is case-insensitive.
    ``"1"``, ``"true"``, ``"yes"`` and ``"on"`` yield ``True``; any other
    string yields ``False``.

    Parameters
    ----------
    value : str
        Text to interpret.

    Returns
    -------
    bool
        ``True`` only for one of the recognised truthy tokens.
    """
    truthy_tokens = ("1", "true", "yes", "on")
    normalized = value.strip().lower()
    return normalized in truthy_tokens
[docs]
def redirect_os_fds(stdout: Optional[TextIO] = None, stderr: Optional[TextIO] = None):
    """
    Redirect OS-level stdout (fd=1) and stderr (fd=2) to the given file-like objects,
    or to /dev/null if not provided.

    The redirection is done with ``os.dup2``, so it affects everything that
    writes to descriptors 1 and 2 directly, not only Python's ``sys.stdout``
    and ``sys.stderr`` objects.

    Parameters
    ----------
    stdout : Optional[TextIO]
        Stream to receive OS-level stdout; its ``fileno()`` is used.
        If None (or falsy), fd 1 is redirected to /dev/null.
    stderr : Optional[TextIO]
        Stream to receive OS-level stderr; its ``fileno()`` is used.
        If None (or falsy), fd 2 is redirected to /dev/null.
    """
    # Open /dev/null at most once, and only when at least one stream is missing.
    devnull_fd: Optional[int] = None
    if not stdout or not stderr:
        devnull_fd = os.open(os.devnull, os.O_WRONLY)

    try:
        stdout_fd = stdout.fileno() if stdout else devnull_fd
        stderr_fd = stderr.fileno() if stderr else devnull_fd

        # Redirect OS-level file descriptors
        os.dup2(stdout_fd, 1)  # Redirect stdout (fd=1)
        os.dup2(stderr_fd, 2)  # Redirect stderr (fd=2)
    finally:
        # After dup2, fds 1 and 2 are independent duplicates, so the temporary
        # /dev/null descriptor would otherwise leak on every call. The ``> 2``
        # guard avoids closing a standard descriptor in the unusual case where
        # os.open() itself returned one of them.
        if devnull_fd is not None and devnull_fd > 2:
            os.close(devnull_fd)
[docs]
def set_pdeathsig(sig=signal.SIGKILL):
    """Set the parent-death signal so this process receives ``sig`` when its parent dies.

    Loads ``libc.so.6`` and calls ``prctl`` with option ``1``
    (``PR_SET_PDEATHSIG``). The return value of ``prctl`` is not inspected,
    so a failing call goes unreported; loading the library raises if
    ``libc.so.6`` cannot be found.

    Parameters
    ----------
    sig : int, optional
        Signal number to request, by default ``signal.SIGKILL``.
    """
    pr_set_pdeathsig = 1  # prctl option number for PR_SET_PDEATHSIG
    CDLL("libc.so.6").prctl(pr_set_pdeathsig, sig)
[docs]
def build_logging_config_from_env() -> LoggingConfig:
    """
    Build a Ray ``LoggingConfig`` from environment variables.

    A package-level preset is selected with ``INGEST_RAY_LOG_LEVEL``
    (``PRODUCTION``, ``DEVELOPMENT`` or ``DEBUG``; default ``DEVELOPMENT``,
    and an unknown value falls back to ``DEVELOPMENT`` with a warning). The
    preset supplies a value for every variable below that is NOT already set
    in the environment; explicitly set variables always win.

    Variables handled:

    - RAY_LOGGING_LEVEL: DEBUG, INFO, WARNING, ERROR or CRITICAL. Invalid -> INFO.
    - RAY_LOGGING_ENCODING: TEXT or JSON. Invalid -> TEXT.
    - RAY_LOGGING_ADDITIONAL_ATTRS: comma-separated logger attribute names;
      blank entries are dropped.
    - RAY_DEDUP_LOGS: fallback "1".
    - RAY_LOG_TO_DRIVER: fallback "0" (every preset also uses "0").
    - RAY_LOGGING_ROTATE_BYTES: must parse as an int, otherwise reset to 1073741824 (1GB).
    - RAY_LOGGING_ROTATE_BACKUP_COUNT: must parse as an int, otherwise reset to 19.
    - RAY_DISABLE_IMPORT_WARNING: fallback "0".
    - RAY_USAGE_STATS_ENABLED: fallback "1".

    Side effects
    ------------
    Writes the resolved values back into ``os.environ`` (presumably for Ray
    to pick up later; this function does not itself hand them to Ray). In
    ``PRODUCTION`` it also raises the ``nv_ingest`` and ``nv_ingest_api``
    loggers to ``WARNING``.

    Returns
    -------
    LoggingConfig
        Built from the validated encoding, log level and additional attributes.
    """
    presets = {
        "PRODUCTION": {
            "RAY_LOGGING_LEVEL": "ERROR",
            "RAY_LOGGING_ENCODING": "TEXT",
            "RAY_LOGGING_ADDITIONAL_ATTRS": "",
            "RAY_DEDUP_LOGS": "1",
            "RAY_LOG_TO_DRIVER": "0",  # false
            "RAY_LOGGING_ROTATE_BYTES": "1073741824",  # 1GB
            "RAY_LOGGING_ROTATE_BACKUP_COUNT": "9",  # 10GB total
            "RAY_DISABLE_IMPORT_WARNING": "1",
            "RAY_USAGE_STATS_ENABLED": "0",
        },
        "DEVELOPMENT": {
            "RAY_LOGGING_LEVEL": "INFO",
            "RAY_LOGGING_ENCODING": "TEXT",
            "RAY_LOGGING_ADDITIONAL_ATTRS": "",
            "RAY_DEDUP_LOGS": "1",
            "RAY_LOG_TO_DRIVER": "0",  # false
            "RAY_LOGGING_ROTATE_BYTES": "1073741824",  # 1GB
            "RAY_LOGGING_ROTATE_BACKUP_COUNT": "19",  # 20GB total
            "RAY_DISABLE_IMPORT_WARNING": "0",
            "RAY_USAGE_STATS_ENABLED": "1",
        },
        "DEBUG": {
            "RAY_LOGGING_LEVEL": "DEBUG",
            "RAY_LOGGING_ENCODING": "JSON",
            "RAY_LOGGING_ADDITIONAL_ATTRS": "name,funcName,lineno",
            "RAY_DEDUP_LOGS": "0",
            "RAY_LOG_TO_DRIVER": "0",  # false
            "RAY_LOGGING_ROTATE_BYTES": "536870912",  # 512MB
            "RAY_LOGGING_ROTATE_BACKUP_COUNT": "39",  # 20GB total
            "RAY_DISABLE_IMPORT_WARNING": "0",
            "RAY_USAGE_STATS_ENABLED": "1",
        },
    }

    def _pin_env(name: str, fallback: str) -> None:
        # Re-export the current value (or the fallback) so the variable is always set.
        os.environ[name] = str(os.environ.get(name, fallback))

    def _pin_int_env(name: str, fallback: str, fallback_label: str) -> None:
        # Normalise an integer-valued variable; non-integers are replaced by the fallback.
        raw = os.environ.get(name, fallback)
        try:
            os.environ[name] = str(int(raw))
        except ValueError:
            logger.warning(f"Invalid {name} '{raw}', using default ({fallback_label})")
            os.environ[name] = fallback

    # Select the preset, falling back to DEVELOPMENT for unknown names.
    preset_level = os.environ.get("INGEST_RAY_LOG_LEVEL", "DEVELOPMENT").upper()
    if preset_level not in presets:
        logger.warning(
            f"Invalid INGEST_RAY_LOG_LEVEL '{preset_level}', using DEVELOPMENT. "
            f"Valid presets: {list(presets)}"
        )
        preset_level = "DEVELOPMENT"

    # Preset values only fill gaps; anything already in the environment is kept.
    for env_name, preset_value in presets[preset_level].items():
        os.environ.setdefault(env_name, preset_value)

    # PRODUCTION additionally quiets the nv-ingest package loggers.
    if preset_level == "PRODUCTION":
        for package_logger_name in ("nv_ingest", "nv_ingest_api"):
            logging.getLogger(package_logger_name).setLevel(logging.WARNING)

    logger.info(f"Applied Ray logging preset: {preset_level}")

    # Log level, validated against the accepted names.
    valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
    log_level = os.environ.get("RAY_LOGGING_LEVEL", "INFO").upper()
    if log_level not in valid_levels:
        logger.warning(f"Invalid RAY_LOGGING_LEVEL '{log_level}', using INFO. Valid levels: {valid_levels}")
        log_level = "INFO"

    # Encoding, validated against the accepted names.
    valid_encodings = ["TEXT", "JSON"]
    encoding = os.environ.get("RAY_LOGGING_ENCODING", "TEXT").upper()
    if encoding not in valid_encodings:
        logger.warning(f"Invalid RAY_LOGGING_ENCODING '{encoding}', using TEXT. Valid encodings: {valid_encodings}")
        encoding = "TEXT"

    # Extra logger attributes: split on commas and drop blank entries.
    raw_attrs = os.environ.get("RAY_LOGGING_ADDITIONAL_ATTRS", "")
    additional_log_standard_attrs = [piece.strip() for piece in raw_attrs.split(",") if piece.strip()]

    # Pass-through and integer-valued settings, in the same order as before.
    _pin_env("RAY_DEDUP_LOGS", "1")
    _pin_env("RAY_LOG_TO_DRIVER", "0")
    _pin_int_env("RAY_LOGGING_ROTATE_BYTES", "1073741824", "1GB")
    _pin_int_env("RAY_LOGGING_ROTATE_BACKUP_COUNT", "19", "19")
    _pin_env("RAY_DISABLE_IMPORT_WARNING", "0")
    _pin_env("RAY_USAGE_STATS_ENABLED", "1")

    logging_config = LoggingConfig(
        encoding=encoding,
        log_level=log_level,
        additional_log_standard_attrs=additional_log_standard_attrs,
    )

    logger.info(
        f"Ray logging configured: preset={preset_level}, level={log_level}, encoding={encoding}, "
        f"additional_attrs={additional_log_standard_attrs}, "
        f"dedup_logs={os.environ.get('RAY_DEDUP_LOGS', '1')}, "
        f"log_to_driver={os.environ.get('RAY_LOG_TO_DRIVER', '0')}, "
        f"rotate_bytes={os.environ.get('RAY_LOGGING_ROTATE_BYTES', '1073741824')}, "
        f"rotate_backup_count={os.environ.get('RAY_LOGGING_ROTATE_BACKUP_COUNT', '19')}"
    )
    return logging_config
[docs]
def launch_pipeline(
    pipeline_config: PipelineConfigSchema,
    block: bool = True,
    disable_dynamic_scaling: Optional[bool] = None,
    dynamic_memory_threshold: Optional[float] = None,
) -> Tuple[Union[Any, None], Optional[float]]:
    """
    Launch a pipeline using the provided configuration.

    Initializes Ray when it is not already running, resolves static replicas,
    builds and starts the ingest pipeline, and then either blocks until
    interrupted or returns the running pipeline.

    Parameters
    ----------
    pipeline_config : PipelineConfigSchema
        Validated pipeline configuration to execute. When
        ``disable_dynamic_scaling`` is truthy, its
        ``pipeline.disable_dynamic_scaling`` field is set to True in place.
    block : bool, optional
        Whether to block until a KeyboardInterrupt is received, by default True.
    disable_dynamic_scaling : Optional[bool], optional
        When truthy, forces dynamic scaling off; falsy values leave the
        configuration untouched. By default None.
    dynamic_memory_threshold : Optional[float], optional
        Override for memory threshold, by default None.
        NOTE(review): this value is not read anywhere in this function body;
        confirm whether it is meant to be applied to ``pipeline_config``.

    Returns
    -------
    Tuple[Union[Any, None], Optional[float]]
        Raw pipeline object (type elided to avoid circular import) and elapsed time. For blocking execution,
        returns (None, elapsed_time). For non-blocking, returns (pipeline, None).

    Raises
    ------
    Exception
        Any error raised while building/starting the pipeline, or while
        blocking, is re-raised after the pipeline is stopped and Ray is shut
        down on a best-effort basis.
    """
    logger.info("Starting pipeline setup")

    # Initialize Ray if not already initialized
    if not ray.is_initialized():
        # Build Ray logging configuration
        logging_config = build_logging_config_from_env()

        # Clear existing handlers from root logger before Ray adds its handler.
        # This prevents duplicate logging caused by multiple handlers on the root logger.
        root_logger = logging.getLogger()
        for handler in root_logger.handlers[:]:
            root_logger.removeHandler(handler)
        logger.info("Cleared existing root logger handlers to prevent Ray logging duplicates")

        ray.init(
            namespace="nv_ingest_ray",
            ignore_reinit_error=True,
            dashboard_host="0.0.0.0",
            dashboard_port=8265,
            logging_config=logging_config,  # Ray will add its own StreamHandler
            _system_config={
                "local_fs_capacity_threshold": 0.9,
                "object_spilling_config": json.dumps(
                    {
                        "type": "filesystem",
                        "params": {
                            "directory_path": [
                                "/tmp/ray_spill_testing_0",
                                "/tmp/ray_spill_testing_1",
                                "/tmp/ray_spill_testing_2",
                                "/tmp/ray_spill_testing_3",
                            ],
                            "buffer_size": 100_000_000,
                        },
                    },
                ),
            },
        )

    # Handle disable_dynamic_scaling parameter override
    if disable_dynamic_scaling and not pipeline_config.pipeline.disable_dynamic_scaling:
        # Directly modify the pipeline config to disable dynamic scaling
        pipeline_config.pipeline.disable_dynamic_scaling = True
        logger.info("Dynamic scaling disabled via function parameter override")

    # Resolve static replicas
    pipeline_config = resolve_static_replicas(pipeline_config)

    # Pretty print the final pipeline configuration (after replica resolution)
    # INFO level so it shows in docker/helm deployments; quiet mode suppresses in library mode
    pretty_output = pretty_print_pipeline_config(pipeline_config, config_path=None)
    logger.info("\n" + pretty_output)

    # Set up the ingestion pipeline
    start_abs = datetime.now()
    ingest_pipeline = None
    try:
        ingest_pipeline = IngestPipelineBuilder(pipeline_config)
        ingest_pipeline.build()

        # Record setup time
        end_setup = start_run = datetime.now()
        setup_time = (end_setup - start_abs).total_seconds()
        logger.info(f"Pipeline setup complete in {setup_time:.2f} seconds")

        # Run the pipeline
        logger.debug("Running pipeline")
        ingest_pipeline.start()
    except Exception as e:
        # Ensure any partial startup is torn down
        logger.error(f"Pipeline startup failed, initiating cleanup: {e}", exc_info=True)
        try:
            if ingest_pipeline is not None:
                try:
                    ingest_pipeline.stop()
                except Exception as stop_err:
                    # Best-effort: the startup error is what the caller needs to see,
                    # but record the cleanup failure instead of dropping it silently.
                    logger.warning(f"Ignoring error while stopping partially started pipeline: {stop_err}")
        finally:
            if ray.is_initialized():
                ray.shutdown()
                logger.info("Ray shutdown complete after startup failure.")
        # Re-raise to surface failure to caller
        raise

    if not block:
        # Non-blocking - return the pipeline interface
        # Access the internal RayPipeline from IngestPipelineBuilder
        return ingest_pipeline._pipeline, None

    try:
        # Block indefinitely until a KeyboardInterrupt is received
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        logger.info("Interrupt received, shutting down pipeline.")
        try:
            ingest_pipeline.stop()
        finally:
            # Shut Ray down even when stopping the pipeline fails, matching the
            # generic error path below; otherwise Ray would be left running.
            ray.shutdown()
        logger.info("Ray shutdown complete.")
    except Exception as e:
        logger.error(f"Unexpected error during pipeline run: {e}", exc_info=True)
        try:
            ingest_pipeline.stop()
        finally:
            if ray.is_initialized():
                ray.shutdown()
        raise

    # Record execution times
    end_run = datetime.now()
    run_time = (end_run - start_run).total_seconds()
    total_elapsed = (end_run - start_abs).total_seconds()
    logger.info(f"Pipeline execution time: {run_time:.2f} seconds")
    logger.info(f"Total time elapsed: {total_elapsed:.2f} seconds")
    return None, total_elapsed
[docs]
def run_pipeline_process(
    pipeline_config: PipelineConfigSchema,
    stdout: Optional[TextIO] = None,
    stderr: Optional[TextIO] = None,
) -> None:
    """
    Run a pipeline to completion inside a subprocess.

    Intended as the target of a ``multiprocessing.Process``. In order, it:
    swaps ``sys.stdout``/``sys.stderr`` for the supplied streams, requests
    SIGKILL on parent death, moves itself into a new process group, installs
    SIGINT/SIGTERM handlers that shut Ray down and exit with status 0, emits
    three diagnostic messages, optionally starts the simple message broker,
    and finally calls ``launch_pipeline`` in blocking mode.

    Parameters
    ----------
    pipeline_config : PipelineConfigSchema
        Pipeline configuration object.
    stdout : Optional[TextIO], optional
        Replacement for ``sys.stdout`` in the subprocess, by default None (unchanged).
    stderr : Optional[TextIO], optional
        Replacement for ``sys.stderr`` in the subprocess, by default None (unchanged).

    Raises
    ------
    Exception
        Any exception raised by ``launch_pipeline`` is logged and re-raised.
    """
    # Python-level stream redirection; only streams that were supplied are replaced.
    for stream_attr, replacement in (("stdout", stdout), ("stderr", stderr)):
        if replacement:
            setattr(sys, stream_attr, replacement)

    # Ask to be killed if the parent dies, so the subprocess cannot linger and hang.
    try:
        set_pdeathsig(signal.SIGKILL)
    except Exception as pdeath_err:
        logger.debug(f"set_pdeathsig not available or failed: {pdeath_err}")

    # Own process group, so the whole subtree can be terminated together.
    try:
        os.setpgrp()
    except Exception as pgrp_err:
        logger.debug(f"os.setpgrp() not available or failed: {pgrp_err}")

    def _on_termination_signal(signum, frame):
        # Best-effort Ray shutdown, then leave immediately without further cleanup.
        try:
            _safe_log(logging.INFO, f"Received signal {signum}; shutting down Ray and exiting...")
            if ray.is_initialized():
                ray.shutdown()
        finally:
            os._exit(0)

    try:
        for handled_signal in (signal.SIGINT, signal.SIGTERM):
            signal.signal(handled_signal, _on_termination_signal)
    except Exception as handler_err:
        logger.debug(f"Signal handlers not set: {handler_err}")

    # Diagnostics: exercise stdout, stderr and the logger after redirection.
    print("DEBUG: Direct print to stdout - should appear in parent process")
    sys.stderr.write("DEBUG: Direct write to stderr - should appear in parent process\n")
    logger.info("DEBUG: Logger info - may not appear if logging handlers not redirected")

    # The broker, when started here, shares this subprocess's process group.
    broker = None
    try:
        broker_in_subprocess = os.environ.get("NV_INGEST_BROKER_IN_SUBPROCESS") == "1"
        if broker_in_subprocess:
            try:
                # Start it only when the configuration asks for a simple broker.
                pipeline_section = getattr(pipeline_config, "pipeline", None)
                wants_broker = pipeline_section and getattr(pipeline_section, "launch_simple_broker", False)
                if wants_broker:
                    _safe_log(logging.INFO, "Starting SimpleMessageBroker inside subprocess")
                    broker = start_simple_message_broker({})
            except Exception as broker_err:
                # Carry on without a broker; the launch below is left to fail if it needs one.
                _safe_log(logging.ERROR, f"Failed to start SimpleMessageBroker in subprocess: {broker_err}")

        # Launch the pipeline (blocking)
        launch_pipeline(pipeline_config, block=True)
    except Exception as run_err:
        logger.error(f"Subprocess pipeline execution failed: {run_err}")
        raise
    finally:
        # Best-effort terminate of a broker started above. On failure/termination
        # paths the original author relies on the process-group kill instead.
        if broker is not None:
            try:
                broker_running = hasattr(broker, "is_alive") and broker.is_alive()
                if broker_running:
                    broker.terminate()
            except Exception:
                pass
[docs]
def kill_pipeline_process_group(process: multiprocessing.Process) -> None:
    """Backward-compatible shim around the ``process.termination`` implementation.

    Logs a debug note through ``_safe_log`` and forwards ``process`` unchanged
    to ``kill_pipeline_process_group`` imported from
    ``nv_ingest.framework.orchestration.process.termination``. The delegate's
    return value is discarded.

    Parameters
    ----------
    process : multiprocessing.Process
        Process handle passed straight through to the delegate.
    """
    delegation_note = "Delegating kill_pipeline_process_group to process.termination module"
    _safe_log(logging.DEBUG, delegation_note)
    _kill_pipeline_process_group(process)