Source code for nv_ingest.util.pipeline.pipeline_runners

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import atexit
import socket
import os
import json
import logging
import signal
import subprocess
import sys
import threading
import time
from ctypes import c_int, CDLL, get_errno

from datetime import datetime
from typing import Any, Dict, Optional, TextIO

from morpheus.config import PipelineModes, CppConfig, Config, ExecutionMode
from pydantic import ConfigDict, ValidationError
from pydantic import BaseModel

from nv_ingest.schemas import PipelineConfigSchema
from nv_ingest.util.converters.containers import merge_dict
from morpheus.utils.logger import configure_logging
from nv_ingest.util.pipeline import setup_ingestion_pipeline
from morpheus.pipeline.pipeline import Pipeline

from nv_ingest.util.pipeline.stage_builders import get_default_cpu_count, validate_positive
from nv_ingest.util.schema.schema_validator import validate_schema

logger = logging.getLogger(__name__)


class PipelineCreationSchema(BaseModel):
    """
    Schema for pipeline creation configuration.

    Contains all parameters required to set up and execute a Morpheus pipeline,
    including endpoints, API keys, and processing options.
    """

    # Audio processing settings
    audio_grpc_endpoint: str = os.getenv("AUDIO_GRPC_ENDPOINT", "grpc.nvcf.nvidia.com:443")
    audio_function_id: str = os.getenv("AUDIO_FUNCTION_ID", "1598d209-5e27-4d3c-8079-4751568b1081")
    audio_infer_protocol: str = "grpc"

    # Embedding model settings
    embedding_nim_endpoint: str = os.getenv("EMBEDDING_NIM_ENDPOINT", "https://integrate.api.nvidia.com/v1")
    embedding_nim_model_name: str = os.getenv("EMBEDDING_NIM_MODEL_NAME", "nvidia/llama-3.2-nv-embedqa-1b-v2")

    # General pipeline settings
    ingest_log_level: str = os.getenv("INGEST_LOG_LEVEL", "INFO")
    max_ingest_process_workers: str = "16"

    # Messaging configuration
    message_client_host: str = "localhost"
    message_client_port: str = "7671"
    message_client_type: str = "simple"

    # Hardware configuration
    mrc_ignore_numa_check: str = "1"

    # NeMo Retriever settings
    nemoretriever_parse_http_endpoint: str = os.getenv(
        "NEMORETRIEVER_PARSE_HTTP_ENDPOINT", "https://integrate.api.nvidia.com/v1/chat/completions"
    )
    nemoretriever_parse_infer_protocol: str = "http"
    nemoretriever_parse_model_name: str = os.getenv("NEMORETRIEVER_PARSE_MODEL_NAME", "nvidia/nemoretriever-parse")

    # API keys
    ngc_api_key: str = os.getenv("NGC_API_KEY", "")
    nvidia_build_api_key: str = os.getenv("NVIDIA_BUILD_API_KEY", "")

    # Observability settings
    otel_exporter_otlp_endpoint: str = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317")

    # OCR settings
    paddle_http_endpoint: str = os.getenv("PADDLE_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/baidu/paddleocr")
    paddle_infer_protocol: str = "http"

    # Task queue settings
    redis_morpheus_task_queue: str = "morpheus_task_queue"

    # Vision language model settings
    vlm_caption_endpoint: str = os.getenv(
        "VLM_CAPTION_ENDPOINT",
        "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-11b-vision-instruct/chat/completions",
    )
    vlm_caption_model_name: str = os.getenv("VLM_CAPTION_MODEL_NAME", "meta/llama-3.2-11b-vision-instruct")

    # YOLOX model endpoints for various document processing tasks
    yolox_graphic_elements_http_endpoint: str = os.getenv(
        "YOLOX_GRAPHIC_ELEMENTS_HTTP_ENDPOINT",
        "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-graphic-elements-v1",
    )
    yolox_graphic_elements_infer_protocol: str = "http"

    yolox_http_endpoint: str = os.getenv(
        "YOLOX_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-page-elements-v2"
    )
    yolox_infer_protocol: str = "http"

    yolox_table_structure_http_endpoint: str = os.getenv(
        "YOLOX_TABLE_STRUCTURE_HTTP_ENDPOINT",
        "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-table-structure-v1",
    )
    yolox_table_structure_infer_protocol: str = "http"

    model_config = ConfigDict(extra="forbid")

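# Illustrative sketch (not part of the original module): constructing the creation
# schema with explicit field overrides instead of relying on environment variables,
# and dumping it to the flat mapping that start_pipeline_subprocess later injects
# into the child process environment. The override values are assumptions.
def _example_pipeline_creation_config() -> dict:
    example_config = PipelineCreationSchema(
        ingest_log_level="DEBUG",  # hypothetical override of the INGEST_LOG_LEVEL default
        message_client_port="7671",
    )
    # model_dump() returns field-name/value pairs; the keys are upper-cased and
    # merged into os.environ when the subprocess is launched.
    return example_config.model_dump()
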
def _launch_pipeline(morpheus_pipeline_config: Any, ingest_config: Dict[str, Any]) -> float:
    """
    Launches the pipeline setup and runs it synchronously.

    This function initializes a pipeline with the provided configurations, sets it up,
    and executes it. It measures and logs timing information at each stage.

    Parameters
    ----------
    morpheus_pipeline_config : Config
        The configuration object for the Morpheus pipeline.
    ingest_config : Dict[str, Any]
        The ingestion configuration dictionary.

    Returns
    -------
    float
        The total time elapsed for pipeline execution in seconds.
    """
    logger.info("Starting pipeline setup")

    # Initialize the pipeline with the configuration
    pipe = Pipeline(morpheus_pipeline_config)
    start_abs = datetime.now()

    # Set up the ingestion pipeline
    setup_ingestion_pipeline(pipe, morpheus_pipeline_config, ingest_config)

    # Record setup time
    end_setup = start_run = datetime.now()
    setup_elapsed = (end_setup - start_abs).total_seconds()
    logger.info(f"Pipeline setup completed in {setup_elapsed:.2f} seconds")

    # Run the pipeline
    logger.info("Running pipeline")
    pipe.run()

    # Record execution times
    end_run = datetime.now()
    run_elapsed = (end_run - start_run).total_seconds()
    total_elapsed = (end_run - start_abs).total_seconds()

    logger.info(f"Pipeline run completed in {run_elapsed:.2f} seconds")
    logger.info(f"Total time elapsed: {total_elapsed:.2f} seconds")

    return total_elapsed

def run_pipeline(morpheus_pipeline_config: Any, ingest_config: Dict[str, Any]) -> float:
    """
    Runs the pipeline synchronously in the current process.

    This is the primary entry point for executing a pipeline directly in the
    current process.

    Parameters
    ----------
    morpheus_pipeline_config : Config
        The configuration object for the Morpheus pipeline.
    ingest_config : Dict[str, Any]
        The ingestion configuration dictionary.

    Returns
    -------
    float
        The total elapsed time for running the pipeline, in seconds.

    Raises
    ------
    Exception
        Any exception raised during pipeline execution is propagated.
    """
    total_elapsed = _launch_pipeline(morpheus_pipeline_config, ingest_config)

    logger.debug(f"Pipeline execution completed successfully in {total_elapsed:.2f} seconds.")

    return total_elapsed

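# Illustrative sketch (not part of the original module): invoking run_pipeline
# directly with a minimally populated Morpheus Config. The field values shown are
# assumptions; run_ingest_pipeline below sets the full set this module relies on.
def _example_run_pipeline_in_process() -> float:
    example_morpheus_config = Config()
    example_morpheus_config.execution_mode = ExecutionMode.CPU
    example_morpheus_config.mode = PipelineModes.NLP
    # An empty ingest config leaves the stages on their built-in defaults.
    return run_pipeline(example_morpheus_config, ingest_config={})
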
def run_ingest_pipeline(
    ingest_config_path: Optional[str] = None,
    caption_batch_size: int = 8,
    use_cpp: bool = False,
    pipeline_batch_size: int = 256,
    enable_monitor: bool = False,
    feature_length: int = 512,
    num_threads: Optional[int] = None,
    model_max_batch_size: int = 256,
    mode: str = PipelineModes.NLP.value,
    log_level: str = "INFO",
) -> None:
    """
    Configures and runs the pipeline with the specified options.

    This function serves as the main entry point for configuring and executing
    a pipeline with user-defined settings.

    Parameters
    ----------
    ingest_config_path : str, optional
        Path to the JSON configuration file.
    caption_batch_size : int, optional
        Number of captions to process in a batch (default: 8).
    use_cpp : bool, optional
        Use the C++ backend (default: False).
    pipeline_batch_size : int, optional
        Batch size for the pipeline (default: 256).
    enable_monitor : bool, optional
        Enable monitoring (default: False).
    feature_length : int, optional
        Feature length for embeddings (default: 512).
    num_threads : int, optional
        Number of threads (default: determined by `get_default_cpu_count`).
    model_max_batch_size : int, optional
        Maximum model batch size (default: 256).
    mode : str, optional
        Pipeline mode (default: PipelineModes.NLP.value).
    log_level : str, optional
        Log level (default: 'INFO').

    Raises
    ------
    ValidationError
        If the configuration validation fails.
    """
    # Determine the number of threads if not specified
    if num_threads is None:
        num_threads = get_default_cpu_count()

    # Validate positive integers
    validate_positive(None, None, caption_batch_size)

    # Set up the logging level based on the environment or parameter
    log_level_mapping = {
        "DEBUG": logging.DEBUG,
        "DEFAULT": logging.INFO,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    # Check for the INGEST_LOG_LEVEL environment variable
    env_log_level = os.getenv("INGEST_LOG_LEVEL")
    if env_log_level:
        log_level = env_log_level
        if log_level in ("DEFAULT",):
            log_level = "INFO"

    log_level_value = log_level_mapping.get(log_level.upper(), logging.INFO)
    logging.basicConfig(level=log_level_value, format="%(asctime)s - %(levelname)s - %(message)s")
    configure_logging(log_level=log_level_value)

    # Configure the C++ backend if requested
    CppConfig.set_should_use_cpp(use_cpp)

    # Create and configure the Morpheus pipeline
    morpheus_pipeline_config = Config()
    morpheus_pipeline_config.debug = True if log_level_value == logging.DEBUG else False
    morpheus_pipeline_config.log_level = log_level_value
    morpheus_pipeline_config.pipeline_batch_size = pipeline_batch_size
    morpheus_pipeline_config.enable_monitor = enable_monitor
    morpheus_pipeline_config.feature_length = feature_length
    morpheus_pipeline_config.num_threads = num_threads
    morpheus_pipeline_config.model_max_batch_size = model_max_batch_size
    morpheus_pipeline_config.edge_buffer_size = 32
    morpheus_pipeline_config.execution_mode = ExecutionMode.CPU
    morpheus_pipeline_config.mode = PipelineModes[mode.upper()]

    # Start with an empty CLI configuration (future enhancement)
    cli_ingest_config = {}  # TODO: Create a config for overrides -- not necessary yet.

    # Load the configuration from file if provided
    if ingest_config_path:
        ingest_config = validate_schema(ingest_config_path)
    else:
        ingest_config = {}

    # Merge options with the file configuration
    final_ingest_config = merge_dict(ingest_config, cli_ingest_config)

    # Validate the final configuration using Pydantic
    try:
        validated_config = PipelineConfigSchema(**final_ingest_config)
        logger.info(f"Configuration loaded and validated: {validated_config}")
    except ValidationError as e:
        logger.error(f"Validation error: {e}")
        raise

    # Log configurations at debug level
    logger.debug(f"Ingest Configuration:\n{json.dumps(final_ingest_config, indent=2)}")
    logger.debug(f"Morpheus configuration:\n{morpheus_pipeline_config}")

    # Execute the pipeline
    run_pipeline(morpheus_pipeline_config, final_ingest_config)

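# Illustrative sketch (not part of the original module): launching the ingest
# pipeline in the current process with a few explicit overrides. The argument
# values are hypothetical; any omitted parameter keeps its documented default.
def _example_run_ingest_pipeline() -> None:
    run_ingest_pipeline(
        ingest_config_path=None,  # or a path to a JSON config file, e.g. "ingest_config.json"
        caption_batch_size=8,
        pipeline_batch_size=256,
        num_threads=None,  # None -> get_default_cpu_count()
        log_level="INFO",
    )
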
def _set_pdeathsig(sig: int = signal.SIGTERM) -> None:
    """
    Sets the parent death signal so that if the parent process dies, the child
    receives `sig`. This is Linux-specific.

    This mechanism ensures that child processes are terminated when their parent
    process is killed, preventing orphaned processes.

    Parameters
    ----------
    sig : int
        The signal to be sent to the child process upon parent termination
        (default: SIGTERM).
    """
    try:
        libc = CDLL("libc.so.6", use_errno=True)
        PR_SET_PDEATHSIG = 1
        res = libc.prctl(PR_SET_PDEATHSIG, c_int(sig), 0, 0, 0)
        if res != 0:
            # errno is captured via ctypes because CDLL was opened with use_errno=True
            err = os.strerror(get_errno())
            logger.error(f"Failed to set PDEATHSIG: {err}")
    except Exception as e:
        logger.error(f"Exception in setting PDEATHSIG: {e}")


def _terminate_subprocess(process: Optional["subprocess.Popen"] = None) -> None:
    """
    Terminates the pipeline subprocess and its entire process group. Sends
    SIGTERM followed by SIGKILL if necessary.

    This function provides a reliable way to clean up all related processes
    when terminating the main process.

    Parameters
    ----------
    process : subprocess.Popen or None
        The subprocess object to terminate. If None, no action is taken.
    """
    if process and process.poll() is None:
        logger.info("Terminating pipeline subprocess group...")
        try:
            # Send SIGTERM to the entire process group
            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
            logger.info("Sent SIGTERM to pipeline subprocess group.")

            # Wait for a short duration to allow graceful termination
            time.sleep(5)

            if process.poll() is None:
                # If still alive, send SIGKILL
                os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                logger.info("Sent SIGKILL to pipeline subprocess group.")
        except Exception as e:
            logger.error(f"Failed to terminate process group: {e}")

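# Illustrative sketch (not part of the original module): how the two helpers above
# are intended to compose around an arbitrary child process. "sleep 60" is a
# placeholder command; start_pipeline_subprocess below applies the same pattern
# to the real pipeline subprocess.
def _example_pdeathsig_wiring() -> "subprocess.Popen":
    def _child_setup() -> None:
        os.setsid()  # new session/process group so killpg reaches all descendants
        _set_pdeathsig(signal.SIGTERM)  # Linux: terminate the child if the parent dies

    child = subprocess.Popen(["sleep", "60"], preexec_fn=_child_setup)
    # Ensure the whole process group is torn down when this process exits.
    atexit.register(_terminate_subprocess, child)
    return child
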
def is_port_in_use(port, host="127.0.0.1"):
    """
    Checks whether a given port is in use on the specified host, using socket
    reuse settings.

    Parameters:
        port (int): The port number to check.
        host (str): The host to check on. Default is '127.0.0.1'.

    Returns:
        bool: True if the port is in use, False otherwise.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind((host, port))
            return False
        except socket.error:
            return True

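# Illustrative sketch (not part of the original module): guarding a launch on the
# default message-client port used elsewhere in this module (7671).
def _example_port_guard() -> bool:
    if is_port_in_use(7671, host="127.0.0.1"):
        logger.warning("Port 7671 is busy; another ingest service may already be running.")
        return False
    return True
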
def start_pipeline_subprocess(
    config: PipelineCreationSchema, stdout: Optional[TextIO] = None, stderr: Optional[TextIO] = None
) -> "subprocess.Popen":
    """
    Launches the pipeline in a subprocess and ensures that it terminates if the
    parent process dies.

    This function encapsulates all subprocess-related setup, including signal
    handling and `atexit` registration.

    Parameters
    ----------
    config : PipelineCreationSchema
        Validated pipeline configuration.
    stdout : file-like object or None, optional
        File-like object for capturing stdout. If None, output is ignored.
    stderr : file-like object or None, optional
        File-like object for capturing stderr. If None, output is ignored.

    Returns
    -------
    subprocess.Popen
        The subprocess object for the launched pipeline.
    """
    if is_port_in_use(7671):
        err_msg = "Port 7671 is already in use. Please stop the service running on this port and try again."
        logger.error(err_msg)
        raise Exception(err_msg)

    # Define the command to invoke the subprocess_entrypoint API function
    subprocess_command = [
        sys.executable,
        "-c",
        "from nv_ingest.util.pipeline.pipeline_runners import subprocess_entrypoint; subprocess_entrypoint()",
    ]

    # Prepare environment variables from the config
    env = os.environ.copy()
    env.update({key.upper(): val for key, val in config.model_dump().items()})

    logger.info("Starting pipeline subprocess...")

    try:
        # Get current CPU affinity information to respect container limits,
        # but apply it only to the child process.
        try:
            # Get the current process's CPU affinity - we only need this to know what's available
            current_affinity = os.sched_getaffinity(0)  # 0 means current process

            # Limit to min(available CPUs from affinity, 8)
            max_cpus = min(len(current_affinity), 8)

            # Take the first max_cpus from the current affinity set
            cpu_set = set(sorted(list(current_affinity))[:max_cpus])

            logger.info(f"Current process has access to CPU cores: {current_affinity}")
            logger.info(f"Child process will be limited to {max_cpus} cores: {cpu_set}")
        except AttributeError:
            # sched_getaffinity is not available on all platforms
            logger.warning("os.sched_getaffinity not available, falling back to cpu_count")
            try:
                import multiprocessing

                total_cpus = multiprocessing.cpu_count()
                max_cpus = min(total_cpus, 8)
                cpu_set = set(range(max_cpus))
                logger.info(f"Child process will be limited to cores 0-{max_cpus - 1}")
            except Exception as e:
                logger.warning(f"Failed to determine CPU count: {e}. Will not set CPU affinity.")
                cpu_set = None
        except Exception as e:
            logger.warning(f"Failed to get current CPU affinity: {e}. Will not set CPU affinity.")
            cpu_set = None

        def combined_preexec_fn():
            """Setup function to run in the child process before exec()."""
            # Start a new session to create a new process group
            os.setsid()

            # Set the parent death signal to SIGTERM
            _set_pdeathsig(signal.SIGTERM)

            # Set CPU affinity ONLY for the child process
            if cpu_set is not None:
                try:
                    # Apply to the current process (which becomes the child).
                    # This does not affect the parent because we are in the
                    # fork+exec stage and changes here only affect the child.
                    os.sched_setaffinity(0, cpu_set)
                    logger.debug(f"Set CPU affinity for subprocess to {cpu_set}")
                except AttributeError:
                    logger.warning("os.sched_setaffinity not available, using taskset as fallback")
                    # Note: We can't use taskset here as it would need to be applied before Popen
                except Exception as e:
                    logger.warning(f"Failed to set CPU affinity: {e}")

        # Configure output redirection
        stdout_stream = subprocess.DEVNULL if stdout is None else subprocess.PIPE
        stderr_stream = subprocess.DEVNULL if stderr is None else subprocess.PIPE

        # Apply taskset as a fallback if sched_setaffinity is not available but taskset exists
        has_taskset = False
        if cpu_set is not None and not hasattr(os, "sched_setaffinity"):
            try:
                # Check if taskset is available on the system
                taskset_check = subprocess.run(
                    ["which", "taskset"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
                )
                has_taskset = taskset_check.returncode == 0

                if has_taskset:
                    # Create a CPU mask from the specific cores in cpu_set
                    cpu_mask = ",".join(str(cpu) for cpu in sorted(cpu_set))
                    subprocess_command = ["taskset", "-c", cpu_mask] + subprocess_command
                    logger.info(f"Using taskset to limit to CPU cores {cpu_mask}")
                else:
                    logger.warning(
                        "Neither sched_setaffinity nor taskset are available. CPU affinity will not be set."
                    )
            except Exception as e:
                logger.warning(f"Failed to check for taskset: {e}. CPU affinity will not be set.")
                has_taskset = False

        # Start the subprocess
        process = subprocess.Popen(
            subprocess_command,
            stdout=stdout_stream,
            stderr=stderr_stream,
            text=True,
            preexec_fn=combined_preexec_fn,
            env=env,
        )
        logger.debug(f"Pipeline subprocess started with PID: {process.pid}")

        # Register the atexit handler to terminate the subprocess group on exit
        atexit.register(_terminate_subprocess, process)

        # Define and register signal handlers for graceful shutdown
        def signal_handler(signum, frame):
            """Handle signals to ensure clean subprocess termination."""
            logger.info(f"Received signal {signum}. Terminating pipeline subprocess group...")
            _terminate_subprocess(process)
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Start threads to read stdout and stderr only if the caller provided handlers
        if stdout is not None:
            stdout_thread = threading.Thread(
                target=_read_stream,
                args=(process.stdout, "Pipeline STDOUT", stdout),
                name="StdoutReader",
                daemon=True,
            )
            stdout_thread.start()

        if stderr is not None:
            stderr_thread = threading.Thread(
                target=_read_stream,
                args=(process.stderr, "Pipeline STDERR", stderr),
                name="StderrReader",
                daemon=True,
            )
            stderr_thread.start()

        logger.info("Pipeline subprocess started successfully.")
        return process

    except Exception as e:
        logger.error(f"Failed to start pipeline subprocess: {e}")
        raise

def _read_stream(stream: TextIO, prefix: str, output_stream: TextIO) -> None:
    """
    Reads lines from a subprocess stream (stdout or stderr) and writes them to
    the provided output stream with a prefix.

    This function runs in a separate daemon thread to handle output in a
    non-blocking way.

    Parameters
    ----------
    stream : TextIO
        The stream object to read from (subprocess stdout or stderr).
    prefix : str
        The prefix to prepend to each line of output.
    output_stream : TextIO
        The file-like object where the output should be written.
    """
    try:
        for line in iter(stream.readline, ""):
            if line:
                output_stream.write(f"[{prefix}] {line}")
                output_stream.flush()
    except Exception as e:
        logger.error(f"Error reading {prefix}: {e}")
    finally:
        stream.close()

def subprocess_entrypoint() -> None:
    """
    Entry point for the pipeline subprocess.

    This function is called when a pipeline subprocess is started. It configures
    logging and runs the ingest pipeline.

    Raises
    ------
    Exception
        Any exception raised during pipeline execution will cause the subprocess
        to exit with a non-zero status code.
    """
    logger.info("Starting pipeline subprocess...")

    try:
        # Run the pipeline - this function blocks until the pipeline is done
        run_ingest_pipeline()
        logger.info("Pipeline completed successfully.")
    except Exception as e:
        logger.error(f"Pipeline failed: {e}")
        sys.exit(1)  # Exit with a non-zero status code to indicate failure

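# Usage note: this entry point is what start_pipeline_subprocess invokes in the
# child process, i.e. the equivalent of running (with the pipeline settings
# supplied via environment variables):
#
#   python -c "from nv_ingest.util.pipeline.pipeline_runners import subprocess_entrypoint; subprocess_entrypoint()"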