Source code for nv_ingest.framework.orchestration.morpheus.util.pipeline.pipeline_runners

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import atexit
import socket
import os
import json
import logging
import signal
import subprocess
import sys
import threading
import time
from ctypes import c_int, CDLL

from datetime import datetime
from typing import Any, Dict, Optional, TextIO

from morpheus.config import PipelineModes, CppConfig, Config, ExecutionMode
from pydantic import ConfigDict, ValidationError
from pydantic import BaseModel

from morpheus.utils.logger import configure_logging
from morpheus.pipeline.pipeline import Pipeline

from nv_ingest.framework.orchestration.morpheus.util.pipeline import setup_ingestion_pipeline
from nv_ingest.framework.orchestration.morpheus.util.pipeline.stage_builders import (
    validate_positive,
    get_default_cpu_count,
)
from nv_ingest.framework.schemas.framework_ingest_config_schema import PipelineConfigSchema
from nv_ingest_api.util.converters.containers import merge_dict
from nv_ingest_api.util.schema.schema_validator import validate_schema

logger = logging.getLogger(__name__)


class PipelineCreationSchema(BaseModel):
    """
    Schema for pipeline creation configuration.

    Contains all parameters required to set up and execute a Morpheus pipeline,
    including endpoints, API keys, and processing options.

    Notes
    -----
    * Every field is declared as ``str``, including numeric-looking settings
      such as ports and worker counts.
    * Defaults that call ``os.getenv`` are evaluated once, when this class is
      defined (i.e. at module import), not each time an instance is created.
    * ``start_pipeline_subprocess`` in this module upper-cases each field name
      and exports the field value as an environment variable for the child
      process.
    * ``extra="forbid"`` makes validation fail on unknown field names.
    """

    # Audio processing settings
    audio_grpc_endpoint: str = os.getenv("AUDIO_GRPC_ENDPOINT", "grpc.nvcf.nvidia.com:443")
    audio_function_id: str = os.getenv("AUDIO_FUNCTION_ID", "1598d209-5e27-4d3c-8079-4751568b1081")
    audio_infer_protocol: str = "grpc"

    # Embedding model settings
    embedding_nim_endpoint: str = os.getenv("EMBEDDING_NIM_ENDPOINT", "https://integrate.api.nvidia.com/v1")
    embedding_nim_model_name: str = os.getenv("EMBEDDING_NIM_MODEL_NAME", "nvidia/llama-3.2-nv-embedqa-1b-v2")

    # General pipeline settings
    ingest_log_level: str = os.getenv("INGEST_LOG_LEVEL", "INFO")
    max_ingest_process_workers: str = "16"

    # Messaging configuration
    message_client_host: str = "localhost"
    # NOTE(review): start_pipeline_subprocess probes the literal port 7671 rather than
    # reading this field -- keep the two in sync or confirm that is intended.
    message_client_port: str = "7671"
    message_client_type: str = "simple"

    # Hardware configuration
    mrc_ignore_numa_check: str = "1"

    # NeMo Retriever settings
    nemoretriever_parse_http_endpoint: str = os.getenv(
        "NEMORETRIEVER_PARSE_HTTP_ENDPOINT", "https://integrate.api.nvidia.com/v1/chat/completions"
    )
    nemoretriever_parse_infer_protocol: str = "http"
    nemoretriever_parse_model_name: str = os.getenv("NEMORETRIEVER_PARSE_MODEL_NAME", "nvidia/nemoretriever-parse")

    # API keys (default to the empty string when the environment variable is unset)
    ngc_api_key: str = os.getenv("NGC_API_KEY", "")
    nvidia_build_api_key: str = os.getenv("NVIDIA_BUILD_API_KEY", "")

    # Observability settings
    otel_exporter_otlp_endpoint: str = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317")

    # OCR settings
    paddle_http_endpoint: str = os.getenv("PADDLE_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/baidu/paddleocr")
    paddle_infer_protocol: str = "http"

    # Task queue settings
    redis_morpheus_task_queue: str = "morpheus_task_queue"

    # Vision language model settings
    vlm_caption_endpoint: str = os.getenv(
        "VLM_CAPTION_ENDPOINT", "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-11b-vision-instruct/chat/completions"
    )
    vlm_caption_model_name: str = os.getenv("VLM_CAPTION_MODEL_NAME", "meta/llama-3.2-11b-vision-instruct")

    # YOLOX model endpoints for various document processing tasks
    yolox_graphic_elements_http_endpoint: str = os.getenv(
        "YOLOX_GRAPHIC_ELEMENTS_HTTP_ENDPOINT",
        "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-graphic-elements-v1",
    )
    yolox_graphic_elements_infer_protocol: str = "http"

    yolox_http_endpoint: str = os.getenv(
        "YOLOX_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-page-elements-v2"
    )
    yolox_infer_protocol: str = "http"

    yolox_table_structure_http_endpoint: str = os.getenv(
        "YOLOX_TABLE_STRUCTURE_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-table-structure-v1"
    )
    yolox_table_structure_infer_protocol: str = "http"

    # Reject any field name that is not declared above.
    model_config = ConfigDict(extra="forbid")
def _launch_pipeline(morpheus_pipeline_config: Any, ingest_config: Dict[str, Any]) -> float:
    """
    Launches the pipeline setup and runs it synchronously.

    Builds a ``Pipeline`` from the Morpheus configuration, wires it up with
    ``setup_ingestion_pipeline`` and then blocks in ``pipe.run()``. Setup time,
    run time and total time are logged at INFO level.

    Durations are taken from ``time.perf_counter()``, a monotonic clock, so a
    wall-clock adjustment (NTP step, DST change) while the pipeline is running
    cannot skew the reported numbers or make them negative.

    Parameters
    ----------
    morpheus_pipeline_config : Config
        The configuration object for the Morpheus pipeline.
    ingest_config : Dict[str, Any]
        The ingestion configuration dictionary.

    Returns
    -------
    float
        The total time elapsed (setup plus run) in seconds. Construction of the
        ``Pipeline`` object itself is not included in the measurement.
    """
    logger.info("Starting pipeline setup")

    # Initialize the pipeline with the configuration
    pipe = Pipeline(morpheus_pipeline_config)

    # The clock starts after the Pipeline object exists, as it always has.
    start_abs = time.perf_counter()

    # Set up the ingestion pipeline
    setup_ingestion_pipeline(pipe, morpheus_pipeline_config, ingest_config)

    # The end of setup doubles as the start of the run phase.
    end_setup = start_run = time.perf_counter()
    setup_elapsed = end_setup - start_abs
    logger.info(f"Pipeline setup completed in {setup_elapsed:.2f} seconds")

    # Run the pipeline (blocks until it finishes)
    logger.info("Running pipeline")
    pipe.run()

    # Record execution times
    end_run = time.perf_counter()
    run_elapsed = end_run - start_run
    total_elapsed = end_run - start_abs

    logger.info(f"Pipeline run completed in {run_elapsed:.2f} seconds")
    logger.info(f"Total time elapsed: {total_elapsed:.2f} seconds")

    return total_elapsed
def run_pipeline(morpheus_pipeline_config: Any, ingest_config: Dict[str, Any]) -> float:
    """
    Execute the pipeline in the calling process and report how long it took.

    This is a thin public wrapper around ``_launch_pipeline``; it adds a
    DEBUG-level completion message and hands the timing back to the caller.

    Parameters
    ----------
    morpheus_pipeline_config : Config
        The configuration object for the Morpheus pipeline.
    ingest_config : Dict[str, Any]
        The ingestion configuration dictionary.

    Returns
    -------
    float
        Seconds elapsed as reported by ``_launch_pipeline``.

    Raises
    ------
    Exception
        Nothing is caught here; whatever the pipeline raises reaches the caller.
    """
    elapsed_seconds = _launch_pipeline(morpheus_pipeline_config, ingest_config)

    logger.debug(f"Pipeline execution completed successfully in {elapsed_seconds:.2f} seconds.")

    return elapsed_seconds
def run_ingest_pipeline(
    ingest_config_path: Optional[str] = None,
    caption_batch_size: int = 8,
    use_cpp: bool = False,
    pipeline_batch_size: int = 256,
    enable_monitor: bool = False,
    feature_length: int = 512,
    num_threads: Optional[int] = None,
    model_max_batch_size: int = 256,
    mode: str = PipelineModes.NLP.value,
    log_level: str = "INFO",
) -> None:
    """
    Build the Morpheus and ingest configurations, then run the pipeline.

    The steps, in order: resolve the thread count, check ``caption_batch_size``,
    configure logging, populate a Morpheus ``Config``, load and validate the
    ingest configuration, and finally call ``run_pipeline`` (which blocks).

    Parameters
    ----------
    ingest_config_path : str, optional
        Path to the JSON configuration file. When omitted (or empty), an empty
        ingest configuration is used.
    caption_batch_size : int, optional
        Number of captions to process in a batch (default: 8). This is the only
        argument passed through ``validate_positive``.
    use_cpp : bool, optional
        Use C++ backend (default: False).
    pipeline_batch_size : int, optional
        Batch size for the pipeline (default: 256).
    enable_monitor : bool, optional
        Enable monitoring (default: False).
    feature_length : int, optional
        Feature length for embeddings (default: 512).
    num_threads : int, optional
        Number of threads (default: determined by `get_default_cpu_count`).
    model_max_batch_size : int, optional
        Model max batch size (default: 256).
    mode : str, optional
        Pipeline mode (default: PipelineModes.NLP.value). Upper-cased and looked
        up by member name in ``PipelineModes``.
    log_level : str, optional
        Log level (default: 'INFO'). A non-empty ``INGEST_LOG_LEVEL`` environment
        variable takes precedence over this argument. Unrecognised names fall
        back to ``logging.INFO``.

    Raises
    ------
    ValidationError
        If the merged ingest configuration fails ``PipelineConfigSchema``
        validation (the error is logged, then re-raised).
    KeyError
        If ``mode`` does not name a ``PipelineModes`` member.

    Notes
    -----
    The validated schema object is only logged; the merged plain dictionary is
    what gets passed on to ``run_pipeline``.
    """
    # Fall back to the detected CPU count when the caller did not choose one.
    if num_threads is None:
        num_threads = get_default_cpu_count()

    validate_positive(None, None, caption_batch_size)

    # Symbolic level names accepted from the argument or the environment.
    level_by_name = {
        "DEBUG": logging.DEBUG,
        "DEFAULT": logging.INFO,
        "INFO": logging.INFO,
        "WARNING": logging.WARNING,
        "ERROR": logging.ERROR,
        "CRITICAL": logging.CRITICAL,
    }

    # A non-empty INGEST_LOG_LEVEL wins over the log_level argument.
    log_level = os.getenv("INGEST_LOG_LEVEL") or log_level
    if log_level == "DEFAULT":
        log_level = "INFO"

    log_level_value = level_by_name.get(log_level.upper(), logging.INFO)
    logging.basicConfig(level=log_level_value, format="%(asctime)s - %(levelname)s - %(message)s")
    configure_logging(log_level=log_level_value)

    # Select the C++ or Python backend before the Config object is created.
    CppConfig.set_should_use_cpp(use_cpp)

    # Populate the Morpheus configuration attribute by attribute.
    morpheus_pipeline_config = Config()
    morpheus_settings = {
        "debug": log_level_value == logging.DEBUG,
        "log_level": log_level_value,
        "pipeline_batch_size": pipeline_batch_size,
        "enable_monitor": enable_monitor,
        "feature_length": feature_length,
        "num_threads": num_threads,
        "model_max_batch_size": model_max_batch_size,
        "edge_buffer_size": 32,
        "execution_mode": ExecutionMode.CPU,
        "mode": PipelineModes[mode.upper()],
    }
    for attr_name, attr_value in morpheus_settings.items():
        setattr(morpheus_pipeline_config, attr_name, attr_value)

    # Placeholder for command-line overrides.
    # TODO: Create a config for overrides -- not necessary yet.
    cli_ingest_config = {}

    # File-based configuration is optional.
    ingest_config = validate_schema(ingest_config_path) if ingest_config_path else {}

    # Combine the file configuration with the (currently empty) CLI overrides.
    final_ingest_config = merge_dict(ingest_config, cli_ingest_config)

    # Validate with Pydantic; the validated model is used for logging only.
    try:
        validated_config = PipelineConfigSchema(**final_ingest_config)
        logger.info(f"Configuration loaded and validated: {validated_config}")
    except ValidationError as validation_error:
        logger.error(f"Validation error: {validation_error}")
        raise

    logger.debug(f"Ingest Configuration:\n{json.dumps(final_ingest_config, indent=2)}")
    logger.debug(f"Morpheus configuration:\n{morpheus_pipeline_config}")

    # Blocks until the pipeline finishes.
    run_pipeline(morpheus_pipeline_config, final_ingest_config)
def _set_pdeathsig(sig: int = signal.SIGTERM) -> None:
    """
    Sets the parent death signal so that if the parent process dies, the child receives `sig`.
    This is Linux-specific.

    The call is made through ``prctl(PR_SET_PDEATHSIG, sig)`` from ``libc.so.6``.
    Failures never propagate: a non-zero ``prctl`` result is logged together with
    the errno description, and any other problem (for example ``libc.so.6`` not
    being loadable) is logged as an exception message.

    Parameters
    ----------
    sig : int
        The signal to be sent to the child process upon parent termination (default: SIGTERM).
    """
    # The errno captured by ``use_errno=True`` is exposed through ctypes, not os.
    # Imported locally because the module header only imports c_int and CDLL.
    from ctypes import get_errno

    try:
        libc = CDLL("libc.so.6", use_errno=True)
        PR_SET_PDEATHSIG = 1
        res = libc.prctl(PR_SET_PDEATHSIG, c_int(sig), 0, 0, 0)
        if res != 0:
            err = os.strerror(get_errno())
            logger.error(f"Failed to set PDEATHSIG: {err}")
    except Exception as e:
        logger.error(f"Exception in setting PDEATHSIG: {e}")


def _terminate_subprocess(process: Optional["subprocess.Popen"] = None) -> None:
    """
    Terminates the pipeline subprocess and its entire process group.

    Sends SIGTERM to the process group, waits up to five seconds for the
    subprocess to exit, and sends SIGKILL to the group if it is still running.
    The wait ends as soon as the subprocess exits rather than always taking the
    full five seconds. Errors while signalling are logged and swallowed so that
    this can be used safely from ``atexit`` and signal handlers.

    Parameters
    ----------
    process : subprocess.Popen or None
        The subprocess object to terminate. If None, or if the process has
        already exited, no action is taken.
    """
    if not process or process.poll() is not None:
        return

    logger.info("Terminating pipeline subprocess group...")
    try:
        # Send SIGTERM to the entire process group
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
        logger.info("Sent SIGTERM to pipeline subprocess group.")

        # Give the subprocess a bounded grace period, returning early if it exits.
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Still running after the grace period; escalate below.
            pass

        if process.poll() is None:
            # If still alive, send SIGKILL
            os.killpg(os.getpgid(process.pid), signal.SIGKILL)
            logger.info("Sent SIGKILL to pipeline subprocess group.")
    except Exception as e:
        logger.error(f"Failed to terminate process group: {e}")
def is_port_in_use(port, host="127.0.0.1"):
    """
    Report whether a TCP port can currently be bound on ``host``.

    A throw-away IPv4 stream socket with ``SO_REUSEADDR`` enabled attempts to
    bind to ``(host, port)``. The socket is always closed before returning.

    Parameters
    ----------
    port : int
        The port number to check.
    host : str
        The host to check on. Default is '127.0.0.1'.

    Returns
    -------
    bool
        True if the bind attempt fails with an ``OSError`` (the port is treated
        as in use), False if the bind succeeds.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            probe.bind((host, port))
        except OSError:
            # Any bind failure is reported as "in use".
            return True
    return False
def start_pipeline_subprocess(
    config: PipelineCreationSchema, stdout: Optional[TextIO] = None, stderr: Optional[TextIO] = None
) -> "subprocess.Popen":
    """
    Launches the pipeline in a subprocess and ensures that it terminates
    if the parent process dies.

    This function encapsulates all subprocess-related setup, including signal handling
    and `atexit` registration. In order, it:

    1. Refuses to start if local port 7671 cannot be bound.
    2. Builds a ``python -c`` command that imports and calls ``subprocess_entrypoint``.
    3. Copies the current environment and adds every config field, with the
       field name upper-cased, as an environment variable.
    4. Picks a CPU set of at most 8 cores for the child.
    5. Starts the child in its own session with a parent-death signal of SIGTERM.
    6. Registers an ``atexit`` hook and SIGINT/SIGTERM handlers that terminate the
       child's process group; the signal handlers then call ``sys.exit(0)``.
    7. Starts daemon threads that forward child output to ``stdout`` / ``stderr``
       when those are provided.

    Parameters
    ----------
    config : PipelineCreationSchema
        Validated pipeline configuration.
    stdout : file-like object or None, optional
        File-like object for capturing stdout. If None, output is ignored.
    stderr : file-like object or None, optional
        File-like object for capturing stderr. If None, output is ignored.

    Returns
    -------
    subprocess.Popen
        The subprocess object for the launched pipeline.

    Raises
    ------
    Exception
        If port 7671 is already in use, or if anything fails while preparing or
        starting the subprocess (the failure is logged and re-raised).

    Notes
    -----
    This function replaces the process-wide SIGINT and SIGTERM handlers of the
    calling process.
    """
    # NOTE(review): the port is hard-coded here and in the message below;
    # config.message_client_port is not consulted -- confirm this is intended.
    if is_port_in_use(7671):
        err_msg = "Port 7671 is already in use. Please stop the service running on this port and try again."
        logger.error(err_msg)
        raise Exception(err_msg)

    # Define the command to invoke the subprocess_entrypoint API function
    subprocess_command = [
        sys.executable,
        "-c",
        "from nv_ingest.framework.orchestration.morpheus.util.pipeline.pipeline_runners import subprocess_entrypoint;"
        "subprocess_entrypoint()",
    ]

    # Prepare environment variables from the config: each field name is upper-cased
    # and layered on top of a copy of the current environment.
    env = os.environ.copy()
    env.update({key.upper(): val for key, val in config.model_dump().items()})

    logger.info("Starting pipeline subprocess...")

    try:
        # Get current CPU affinity information to respect container limits
        # but we'll only apply it to the child process
        try:
            # Get the current process's CPU affinity - we only need this to know what's available
            current_affinity = os.sched_getaffinity(0)  # 0 means current process

            # Limit to min(available CPUs from affinity, 8)
            max_cpus = min(len(current_affinity), 8)

            # Take the first max_cpus (lowest-numbered) cores from the current affinity set
            cpu_set = set(sorted(list(current_affinity))[:max_cpus])

            logger.info(f"Current process has access to CPU cores: {current_affinity}")
            logger.info(f"Child process will be limited to {max_cpus} cores: {cpu_set}")
        except AttributeError:
            # sched_getaffinity not available on all platforms
            logger.warning("os.sched_getaffinity not available, falling back to cpu_count")
            try:
                import multiprocessing

                total_cpus = multiprocessing.cpu_count()
                max_cpus = min(total_cpus, 8)
                # Without affinity information, assume cores are numbered from 0.
                cpu_set = set(range(max_cpus))
                logger.info(f"Child process will be limited to cores 0-{max_cpus-1}")
            except Exception as e:
                logger.warning(f"Failed to determine CPU count: {e}. Will not set CPU affinity.")
                cpu_set = None
        except Exception as e:
            # Any other failure disables CPU pinning instead of aborting the launch.
            logger.warning(f"Failed to get current CPU affinity: {e}. Will not set CPU affinity.")
            cpu_set = None

        def combined_preexec_fn():
            """Setup function to run in the child process before exec()."""
            # NOTE(review): this hook logs from the forked child; Python's subprocess
            # documentation warns that preexec_fn is unsafe when threads are present.
            # Start a new session to create a new process group
            os.setsid()

            # Set the parent death signal to SIGTERM
            _set_pdeathsig(signal.SIGTERM)

            # Set CPU affinity ONLY for the child process
            if cpu_set is not None:
                try:
                    # Apply to current process (which becomes the child)
                    # This doesn't affect the parent because we're in the fork+exec
                    # stage and changes here only affect the child
                    os.sched_setaffinity(0, cpu_set)
                    logger.debug(f"Set CPU affinity for subprocess to {cpu_set}")
                except AttributeError:
                    logger.warning("os.sched_setaffinity not available, using taskset as fallback")
                    # Note: We can't use taskset here as it would need to be applied before Popen
                except Exception as e:
                    logger.warning(f"Failed to set CPU affinity: {e}")

        # Configure output redirection: discard output unless the caller supplied a
        # sink, in which case it is piped so the reader threads below can forward it.
        stdout_stream = subprocess.DEVNULL if stdout is None else subprocess.PIPE
        stderr_stream = subprocess.DEVNULL if stderr is None else subprocess.PIPE

        # Apply taskset as fallback if sched_setaffinity is not available but taskset exists
        has_taskset = False
        if cpu_set is not None and not hasattr(os, "sched_setaffinity"):
            try:
                # Check if taskset is available on the system
                taskset_check = subprocess.run(
                    ["which", "taskset"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
                )
                has_taskset = taskset_check.returncode == 0

                if has_taskset:
                    # Build the comma-separated core list passed to `taskset -c`
                    cpu_mask = ",".join(str(cpu) for cpu in sorted(cpu_set))
                    # Prefix the Python command so taskset launches it with the restricted core list
                    subprocess_command = ["taskset", "-c", cpu_mask] + subprocess_command
                    logger.info(f"Using taskset to limit to CPU cores {cpu_mask}")
                else:
                    logger.warning("Neither sched_setaffinity nor taskset are available. CPU affinity will not be set.")
            except Exception as e:
                logger.warning(f"Failed to check for taskset: {e}. CPU affinity will not be set.")
                has_taskset = False

        # Start the subprocess
        process = subprocess.Popen(
            subprocess_command,
            stdout=stdout_stream,
            stderr=stderr_stream,
            text=True,
            preexec_fn=combined_preexec_fn,
            env=env,
        )
        logger.debug(f"Pipeline subprocess started with PID: {process.pid}")

        # Register the atexit handler to terminate the subprocess group on exit
        atexit.register(_terminate_subprocess, process)

        # Define and register signal handlers for graceful shutdown
        def signal_handler(signum, frame):
            """Handle signals to ensure clean subprocess termination."""
            logger.info(f"Received signal {signum}. Terminating pipeline subprocess group...")
            _terminate_subprocess(process)
            # Exit with status 0 after the child has been dealt with.
            sys.exit(0)

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)

        # Start threads to read stdout and stderr only if user provided handlers.
        # The threads are daemons, so they do not keep the parent process alive.
        if stdout is not None:
            stdout_thread = threading.Thread(
                target=_read_stream,
                args=(process.stdout, "Pipeline STDOUT", stdout),
                name="StdoutReader",
                daemon=True,
            )
            stdout_thread.start()

        if stderr is not None:
            stderr_thread = threading.Thread(
                target=_read_stream,
                args=(process.stderr, "Pipeline STDERR", stderr),
                name="StderrReader",
                daemon=True,
            )
            stderr_thread.start()

        logger.info("Pipeline subprocess started successfully.")
        return process

    except Exception as e:
        # Log and propagate; the caller decides how to react to a failed launch.
        logger.error(f"Failed to start pipeline subprocess: {e}")
        raise
def _read_stream(stream: TextIO, prefix: str, output_stream: TextIO) -> None:
    """
    Copy a subprocess stream to ``output_stream`` line by line, tagging each line.

    Intended as the target of a daemon reader thread. Lines are read with
    ``readline`` until it returns the empty string, written as
    ``[<prefix>] <line>`` and flushed immediately. Read or write errors are
    logged, and ``stream`` is closed on the way out whatever happens.

    Parameters
    ----------
    stream : TextIO
        The stream object to read from (subprocess stdout or stderr).
    prefix : str
        The label placed in square brackets in front of every line.
    output_stream : TextIO
        The file-like object that receives the tagged lines.
    """
    try:
        # readline() returns "" at end of stream; that sentinel ends the loop.
        for text in iter(stream.readline, ""):
            if not text:
                continue
            output_stream.write(f"[{prefix}] {text}")
            output_stream.flush()
    except Exception as exc:
        logger.error(f"Error reading {prefix}: {exc}")
    finally:
        stream.close()
def subprocess_entrypoint() -> None:
    """
    Run the ingest pipeline inside the child process.

    This is the function named in the ``python -c`` command built by
    ``start_pipeline_subprocess``. It calls ``run_ingest_pipeline`` with its
    default arguments and blocks until that call returns.

    Raises
    ------
    SystemExit
        With status code 1 when ``run_ingest_pipeline`` (or the success log
        call) raises an ``Exception``; the error message is logged first.
    """
    logger.info("Starting pipeline subprocess...")

    try:
        # Blocks for the lifetime of the pipeline.
        run_ingest_pipeline()
        logger.info("Pipeline completed successfully.")
    except Exception as failure:
        logger.error(f"Pipeline failed: {failure}")
        # Non-zero status lets the parent see that the pipeline failed.
        sys.exit(1)