# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import atexit
import json
import logging
import os
import shutil
import signal
import socket
import subprocess
import sys
import threading
import time
from ctypes import CDLL, c_int, get_errno
from datetime import datetime
from typing import Any, Dict, Optional, TextIO
from morpheus.config import Config, CppConfig, ExecutionMode, PipelineModes
from morpheus.pipeline.pipeline import Pipeline
from morpheus.utils.logger import configure_logging
from pydantic import BaseModel, ConfigDict, ValidationError

from nv_ingest.framework.orchestration.morpheus.util.pipeline import setup_ingestion_pipeline
from nv_ingest.framework.orchestration.morpheus.util.pipeline.stage_builders import (
    get_default_cpu_count,
    validate_positive,
)
from nv_ingest.framework.schemas.framework_ingest_config_schema import PipelineConfigSchema
from nv_ingest_api.util.converters.containers import merge_dict
from nv_ingest_api.util.schema.schema_validator import validate_schema

logger = logging.getLogger(__name__)
class PipelineCreationSchema(BaseModel):
"""
Schema for pipeline creation configuration.
Contains all parameters required to set up and execute a Morpheus pipeline,
including endpoints, API keys, and processing options.
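
    Examples
    --------
    A minimal sketch; several defaults are resolved from environment
    variables at import time, so endpoint values may differ between
    environments:

    >>> config = PipelineCreationSchema()
    >>> config.message_client_port
    '7671'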
"""
# Audio processing settings
audio_grpc_endpoint: str = os.getenv("AUDIO_GRPC_ENDPOINT", "grpc.nvcf.nvidia.com:443")
audio_function_id: str = os.getenv("AUDIO_FUNCTION_ID", "1598d209-5e27-4d3c-8079-4751568b1081")
audio_infer_protocol: str = "grpc"
# Embedding model settings
embedding_nim_endpoint: str = os.getenv("EMBEDDING_NIM_ENDPOINT", "https://integrate.api.nvidia.com/v1")
embedding_nim_model_name: str = os.getenv("EMBEDDING_NIM_MODEL_NAME", "nvidia/llama-3.2-nv-embedqa-1b-v2")
# General pipeline settings
ingest_log_level: str = os.getenv("INGEST_LOG_LEVEL", "INFO")
max_ingest_process_workers: str = "16"
# Messaging configuration
message_client_host: str = "localhost"
message_client_port: str = "7671"
message_client_type: str = "simple"
# Hardware configuration
mrc_ignore_numa_check: str = "1"
# NeMo Retriever settings
nemoretriever_parse_http_endpoint: str = os.getenv(
"NEMORETRIEVER_PARSE_HTTP_ENDPOINT", "https://integrate.api.nvidia.com/v1/chat/completions"
)
nemoretriever_parse_infer_protocol: str = "http"
nemoretriever_parse_model_name: str = os.getenv("NEMORETRIEVER_PARSE_MODEL_NAME", "nvidia/nemoretriever-parse")
# API keys
ngc_api_key: str = os.getenv("NGC_API_KEY", "")
nvidia_build_api_key: str = os.getenv("NVIDIA_BUILD_API_KEY", "")
# Observability settings
otel_exporter_otlp_endpoint: str = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317")
# OCR settings
paddle_http_endpoint: str = os.getenv("PADDLE_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/baidu/paddleocr")
paddle_infer_protocol: str = "http"
# Task queue settings
redis_morpheus_task_queue: str = "morpheus_task_queue"
# Vision language model settings
vlm_caption_endpoint: str = os.getenv(
"VLM_CAPTION_ENDPOINT", "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-11b-vision-instruct/chat/completions"
)
vlm_caption_model_name: str = os.getenv("VLM_CAPTION_MODEL_NAME", "meta/llama-3.2-11b-vision-instruct")
# YOLOX model endpoints for various document processing tasks
yolox_graphic_elements_http_endpoint: str = os.getenv(
"YOLOX_GRAPHIC_ELEMENTS_HTTP_ENDPOINT",
"https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-graphic-elements-v1",
)
yolox_graphic_elements_infer_protocol: str = "http"
yolox_http_endpoint: str = os.getenv(
"YOLOX_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-page-elements-v2"
)
yolox_infer_protocol: str = "http"
yolox_table_structure_http_endpoint: str = os.getenv(
"YOLOX_TABLE_STRUCTURE_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/nvidia/nemoretriever-table-structure-v1"
)
yolox_table_structure_infer_protocol: str = "http"
model_config = ConfigDict(extra="forbid")
def _launch_pipeline(morpheus_pipeline_config: Config, ingest_config: Dict[str, Any]) -> float:
"""
Launches the pipeline setup and runs it synchronously.
This function initializes a pipeline with the provided configurations,
sets it up, and executes it. It measures and logs timing information
at each stage.
Parameters
----------
morpheus_pipeline_config : Config
The configuration object for the Morpheus pipeline.
ingest_config : Dict[str, Any]
The ingestion configuration dictionary.
Returns
-------
float
The total time elapsed for pipeline execution in seconds.
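
    Examples
    --------
    Called internally by ``run_pipeline``; a direct invocation sketch:

    >>> elapsed = _launch_pipeline(morpheus_pipeline_config, {})  # doctest: +SKIP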
"""
logger.info("Starting pipeline setup")
# Initialize the pipeline with the configuration
pipe = Pipeline(morpheus_pipeline_config)
start_abs = datetime.now()
# Set up the ingestion pipeline
setup_ingestion_pipeline(pipe, morpheus_pipeline_config, ingest_config)
# Record setup time
end_setup = start_run = datetime.now()
setup_elapsed = (end_setup - start_abs).total_seconds()
logger.info(f"Pipeline setup completed in {setup_elapsed:.2f} seconds")
# Run the pipeline
logger.info("Running pipeline")
pipe.run()
# Record execution times
end_run = datetime.now()
run_elapsed = (end_run - start_run).total_seconds()
total_elapsed = (end_run - start_abs).total_seconds()
logger.info(f"Pipeline run completed in {run_elapsed:.2f} seconds")
logger.info(f"Total time elapsed: {total_elapsed:.2f} seconds")
return total_elapsed
def run_pipeline(morpheus_pipeline_config: Config, ingest_config: Dict[str, Any]) -> float:
"""
Runs the pipeline synchronously in the current process.
This is the primary entry point for executing a pipeline directly
in the current process.
Parameters
----------
morpheus_pipeline_config : Config
The configuration object for the Morpheus pipeline.
ingest_config : Dict[str, Any]
The ingestion configuration dictionary.
Returns
-------
float
The total elapsed time for running the pipeline.
Raises
------
Exception
Any exception raised during pipeline execution will be propagated.
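
    Examples
    --------
    A minimal sketch, assuming ``morpheus_pipeline_config`` is a fully
    configured ``Config`` instance and the ingest dictionary has already
    been validated:

    >>> elapsed = run_pipeline(morpheus_pipeline_config, {})  # doctest: +SKIP
    >>> print(f"Finished in {elapsed:.2f} s")  # doctest: +SKIP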
"""
total_elapsed = _launch_pipeline(morpheus_pipeline_config, ingest_config)
logger.debug(f"Pipeline execution completed successfully in {total_elapsed:.2f} seconds.")
return total_elapsed
def run_ingest_pipeline(
ingest_config_path: Optional[str] = None,
caption_batch_size: int = 8,
use_cpp: bool = False,
pipeline_batch_size: int = 256,
enable_monitor: bool = False,
feature_length: int = 512,
num_threads: Optional[int] = None,
model_max_batch_size: int = 256,
mode: str = PipelineModes.NLP.value,
log_level: str = "INFO",
) -> None:
"""
Configures and runs the pipeline with the specified options.
This function serves as the main entry point for configuring and
executing a pipeline with user-defined settings.
Parameters
----------
ingest_config_path : str, optional
Path to the JSON configuration file.
caption_batch_size : int, optional
Number of captions to process in a batch (default: 8).
use_cpp : bool, optional
Use C++ backend (default: False).
pipeline_batch_size : int, optional
Batch size for the pipeline (default: 256).
enable_monitor : bool, optional
Enable monitoring (default: False).
feature_length : int, optional
Feature length for embeddings (default: 512).
num_threads : int, optional
Number of threads (default: determined by `get_default_cpu_count`).
model_max_batch_size : int, optional
Model max batch size (default: 256).
mode : str, optional
Pipeline mode (default: PipelineModes.NLP.value).
log_level : str, optional
Log level (default: 'INFO').
Raises
------
ValidationError
If the configuration validation fails.
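
    Examples
    --------
    Run with defaults, or point at a JSON configuration file (the file name
    below is purely illustrative):

    >>> run_ingest_pipeline()  # doctest: +SKIP
    >>> run_ingest_pipeline(ingest_config_path="ingest_config.json", log_level="DEBUG")  # doctest: +SKIP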
"""
# Determine number of threads if not specified
if num_threads is None:
num_threads = get_default_cpu_count()
    # Validate that the caption batch size is a positive integer
    validate_positive(None, None, caption_batch_size)
# Set up logging level based on environment or parameter
log_level_mapping = {
"DEBUG": logging.DEBUG,
"DEFAULT": logging.INFO,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
"CRITICAL": logging.CRITICAL,
}
# Check for INGEST_LOG_LEVEL environment variable
env_log_level = os.getenv("INGEST_LOG_LEVEL")
if env_log_level:
log_level = env_log_level
        if log_level == "DEFAULT":
log_level = "INFO"
log_level_value = log_level_mapping.get(log_level.upper(), logging.INFO)
logging.basicConfig(level=log_level_value, format="%(asctime)s - %(levelname)s - %(message)s")
configure_logging(log_level=log_level_value)
# Configure C++ backend if requested
CppConfig.set_should_use_cpp(use_cpp)
# Create and configure the Morpheus pipeline
morpheus_pipeline_config = Config()
    morpheus_pipeline_config.debug = (log_level_value == logging.DEBUG)
morpheus_pipeline_config.log_level = log_level_value
morpheus_pipeline_config.pipeline_batch_size = pipeline_batch_size
morpheus_pipeline_config.enable_monitor = enable_monitor
morpheus_pipeline_config.feature_length = feature_length
morpheus_pipeline_config.num_threads = num_threads
morpheus_pipeline_config.model_max_batch_size = model_max_batch_size
morpheus_pipeline_config.edge_buffer_size = 32
morpheus_pipeline_config.execution_mode = ExecutionMode.CPU
morpheus_pipeline_config.mode = PipelineModes[mode.upper()]
# Start with empty CLI configuration (future enhancement)
cli_ingest_config = {} # TODO: Create a config for overrides -- not necessary yet.
# Load configuration from file if provided
if ingest_config_path:
ingest_config = validate_schema(ingest_config_path)
else:
ingest_config = {}
# Merge options with file configuration
final_ingest_config = merge_dict(ingest_config, cli_ingest_config)
# Validate final configuration using Pydantic
try:
validated_config = PipelineConfigSchema(**final_ingest_config)
logger.info(f"Configuration loaded and validated: {validated_config}")
except ValidationError as e:
logger.error(f"Validation error: {e}")
raise
# Log configurations at debug level
logger.debug(f"Ingest Configuration:\n{json.dumps(final_ingest_config, indent=2)}")
logger.debug(f"Morpheus configuration:\n{morpheus_pipeline_config}")
# Execute the pipeline
run_pipeline(morpheus_pipeline_config, final_ingest_config)
def _set_pdeathsig(sig: int = signal.SIGTERM) -> None:
"""
Sets the parent death signal so that if the parent process dies, the child
receives `sig`. This is Linux-specific.
This mechanism ensures that child processes are terminated when their
parent process is killed, preventing orphaned processes.
Parameters
----------
sig : int
The signal to be sent to the child process upon parent termination (default: SIGTERM).
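
    Examples
    --------
    Typically passed as a ``preexec_fn`` so that a child process is signaled
    when its parent dies (a sketch; Linux only):

    >>> proc = subprocess.Popen(["sleep", "60"], preexec_fn=_set_pdeathsig)  # doctest: +SKIP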
"""
try:
libc = CDLL("libc.so.6", use_errno=True)
PR_SET_PDEATHSIG = 1
res = libc.prctl(PR_SET_PDEATHSIG, c_int(sig), 0, 0, 0)
if res != 0:
            err = os.strerror(get_errno())
logger.error(f"Failed to set PDEATHSIG: {err}")
except Exception as e:
logger.error(f"Exception in setting PDEATHSIG: {e}")
def _terminate_subprocess(process: Optional["subprocess.Popen"] = None) -> None:
"""
Terminates the pipeline subprocess and its entire process group.
Sends SIGTERM followed by SIGKILL if necessary.
This function provides a reliable way to clean up all related
processes when terminating the main process.
Parameters
----------
process : subprocess.Popen or None
The subprocess object to terminate. If None, no action is taken.
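
    Examples
    --------
    Registered as an exit hook, mirroring the usage in
    ``start_pipeline_subprocess``:

    >>> atexit.register(_terminate_subprocess, process)  # doctest: +SKIP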
"""
if process and process.poll() is None:
logger.info("Terminating pipeline subprocess group...")
try:
# Send SIGTERM to the entire process group
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
logger.info("Sent SIGTERM to pipeline subprocess group.")
# Wait for a short duration to allow graceful termination
time.sleep(5)
if process.poll() is None:
# If still alive, send SIGKILL
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
logger.info("Sent SIGKILL to pipeline subprocess group.")
except Exception as e:
logger.error(f"Failed to terminate process group: {e}")
def is_port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """
    Checks whether a given port is in use on the specified host.

    Sets SO_REUSEADDR before binding so that sockets lingering in TIME_WAIT
    do not produce false positives.

    Parameters
    ----------
    port : int
        The port number to check.
    host : str, optional
        The host to check on (default: '127.0.0.1').

    Returns
    -------
    bool
        True if the port is in use, False otherwise.
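
    Examples
    --------
    The result depends on what is currently running on the host:

    >>> is_port_in_use(7671)  # doctest: +SKIP
    False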
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((host, port))
return False
except socket.error:
return True
def start_pipeline_subprocess(
config: PipelineCreationSchema, stdout: Optional[TextIO] = None, stderr: Optional[TextIO] = None
) -> "subprocess.Popen":
"""
Launches the pipeline in a subprocess and ensures that it terminates
if the parent process dies.
This function encapsulates all subprocess-related setup,
including signal handling and `atexit` registration.
Parameters
----------
config : PipelineCreationSchema
Validated pipeline configuration.
stdout : file-like object or None, optional
File-like object for capturing stdout. If None, output is ignored.
stderr : file-like object or None, optional
File-like object for capturing stderr. If None, output is ignored.
Returns
-------
subprocess.Popen
The subprocess object for the launched pipeline.
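
    Examples
    --------
    A minimal sketch that streams subprocess output to the caller's terminal:

    >>> config = PipelineCreationSchema()
    >>> process = start_pipeline_subprocess(config, stdout=sys.stdout, stderr=sys.stderr)  # doctest: +SKIP
    >>> process.wait()  # doctest: +SKIP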
"""
if is_port_in_use(7671):
err_msg = "Port 7671 is already in use. Please stop the service running on this port and try again."
logger.error(err_msg)
        raise RuntimeError(err_msg)
# Define the command to invoke the subprocess_entrypoint API function
subprocess_command = [
sys.executable,
"-c",
"from nv_ingest.framework.orchestration.morpheus.util.pipeline.pipeline_runners import subprocess_entrypoint;"
"subprocess_entrypoint()",
]
# Prepare environment variables from the config
env = os.environ.copy()
env.update({key.upper(): val for key, val in config.model_dump().items()})
logger.info("Starting pipeline subprocess...")
try:
# Get current CPU affinity information to respect container limits
# but we'll only apply it to the child process
try:
# Get the current process's CPU affinity - we only need this to know what's available
current_affinity = os.sched_getaffinity(0) # 0 means current process
# Limit to min(available CPUs from affinity, 8)
max_cpus = min(len(current_affinity), 8)
# Take the first max_cpus from the current affinity set
cpu_set = set(sorted(list(current_affinity))[:max_cpus])
logger.info(f"Current process has access to CPU cores: {current_affinity}")
logger.info(f"Child process will be limited to {max_cpus} cores: {cpu_set}")
except AttributeError:
# sched_getaffinity not available on all platforms
logger.warning("os.sched_getaffinity not available, falling back to cpu_count")
try:
import multiprocessing
total_cpus = multiprocessing.cpu_count()
max_cpus = min(total_cpus, 8)
cpu_set = set(range(max_cpus))
logger.info(f"Child process will be limited to cores 0-{max_cpus-1}")
except Exception as e:
logger.warning(f"Failed to determine CPU count: {e}. Will not set CPU affinity.")
cpu_set = None
except Exception as e:
logger.warning(f"Failed to get current CPU affinity: {e}. Will not set CPU affinity.")
cpu_set = None
def combined_preexec_fn():
"""Setup function to run in the child process before exec()."""
# Start a new session to create a new process group
os.setsid()
# Set the parent death signal to SIGTERM
_set_pdeathsig(signal.SIGTERM)
# Set CPU affinity ONLY for the child process
if cpu_set is not None:
try:
# Apply to current process (which becomes the child)
# This doesn't affect the parent because we're in the fork+exec
# stage and changes here only affect the child
os.sched_setaffinity(0, cpu_set)
logger.debug(f"Set CPU affinity for subprocess to {cpu_set}")
except AttributeError:
logger.warning("os.sched_setaffinity not available, using taskset as fallback")
# Note: We can't use taskset here as it would need to be applied before Popen
except Exception as e:
logger.warning(f"Failed to set CPU affinity: {e}")
# Configure output redirection
stdout_stream = subprocess.DEVNULL if stdout is None else subprocess.PIPE
stderr_stream = subprocess.DEVNULL if stderr is None else subprocess.PIPE
# Apply taskset as fallback if sched_setaffinity is not available but taskset exists
has_taskset = False
if cpu_set is not None and not hasattr(os, "sched_setaffinity"):
            try:
                # Check if taskset is available on the system
                has_taskset = shutil.which("taskset") is not None
if has_taskset:
# Create CPU mask from the specific cores in cpu_set
cpu_mask = ",".join(str(cpu) for cpu in sorted(cpu_set))
subprocess_command = ["taskset", "-c", cpu_mask] + subprocess_command
logger.info(f"Using taskset to limit to CPU cores {cpu_mask}")
else:
logger.warning("Neither sched_setaffinity nor taskset are available. CPU affinity will not be set.")
except Exception as e:
logger.warning(f"Failed to check for taskset: {e}. CPU affinity will not be set.")
has_taskset = False
# Start the subprocess
process = subprocess.Popen(
subprocess_command,
stdout=stdout_stream,
stderr=stderr_stream,
text=True,
preexec_fn=combined_preexec_fn,
env=env,
)
logger.debug(f"Pipeline subprocess started with PID: {process.pid}")
# Register the atexit handler to terminate the subprocess group on exit
atexit.register(_terminate_subprocess, process)
# Define and register signal handlers for graceful shutdown
def signal_handler(signum, frame):
"""Handle signals to ensure clean subprocess termination."""
logger.info(f"Received signal {signum}. Terminating pipeline subprocess group...")
_terminate_subprocess(process)
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Start threads to read stdout and stderr only if user provided handlers
if stdout is not None:
stdout_thread = threading.Thread(
target=_read_stream,
args=(process.stdout, "Pipeline STDOUT", stdout),
name="StdoutReader",
daemon=True,
)
stdout_thread.start()
if stderr is not None:
stderr_thread = threading.Thread(
target=_read_stream,
args=(process.stderr, "Pipeline STDERR", stderr),
name="StderrReader",
daemon=True,
)
stderr_thread.start()
logger.info("Pipeline subprocess started successfully.")
return process
except Exception as e:
logger.error(f"Failed to start pipeline subprocess: {e}")
raise
def _read_stream(stream: TextIO, prefix: str, output_stream: TextIO) -> None:
"""
Reads lines from a subprocess stream (stdout or stderr) and writes them
to the provided output stream with a prefix.
This function runs in a separate daemon thread to handle output
in a non-blocking way.
Parameters
----------
stream : TextIO
The stream object to read from (subprocess stdout or stderr).
prefix : str
The prefix to prepend to each line of output.
output_stream : TextIO
The file-like object where the output should be written.
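
    Examples
    --------
    Run on a daemon thread, mirroring the usage in
    ``start_pipeline_subprocess``:

    >>> reader = threading.Thread(target=_read_stream, args=(process.stdout, "Pipeline STDOUT", sys.stdout), daemon=True)  # doctest: +SKIP
    >>> reader.start()  # doctest: +SKIP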
"""
try:
for line in iter(stream.readline, ""):
if line:
output_stream.write(f"[{prefix}] {line}")
output_stream.flush()
except Exception as e:
logger.error(f"Error reading {prefix}: {e}")
finally:
stream.close()
def subprocess_entrypoint() -> None:
"""
Entry point for the pipeline subprocess.
This function is called when a pipeline subprocess is started.
It configures logging and runs the ingest pipeline.
Raises
------
Exception
Any exception raised during pipeline execution will cause
the subprocess to exit with a non-zero status code.
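
    Examples
    --------
    Not normally called directly; ``start_pipeline_subprocess`` launches it
    in a fresh interpreter, roughly equivalent to::

        python -c "from nv_ingest.framework.orchestration.morpheus.util.pipeline.pipeline_runners import subprocess_entrypoint; subprocess_entrypoint()"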
"""
logger.info("Starting pipeline subprocess...")
try:
# Run the pipeline - this function blocks until the pipeline is done
run_ingest_pipeline()
logger.info("Pipeline completed successfully.")
except Exception as e:
logger.error(f"Pipeline failed: {e}")
sys.exit(1) # Exit with a non-zero status code to indicate failure