Source code for nv_ingest.framework.orchestration.process.lifecycle
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Pipeline lifecycle management for declarative execution.
This module provides high-level lifecycle management for pipelines,
orchestrating configuration resolution, broker setup, and execution
using the configured strategy pattern.
"""
import logging
import atexit
import multiprocessing
import os
import signal
from typing import Optional
from nv_ingest.pipeline.pipeline_schema import PipelineConfigSchema
from nv_ingest.framework.orchestration.execution.options import ExecutionOptions, ExecutionResult
from nv_ingest.framework.orchestration.process.strategies import ProcessExecutionStrategy
from nv_ingest.framework.orchestration.process.strategies import SubprocessStrategy
from nv_ingest.framework.orchestration.process.dependent_services import start_simple_message_broker
logger = logging.getLogger(__name__)
[docs]
class PipelineLifecycleManager:
"""
High-level manager for pipeline lifecycle operations.
This class orchestrates the complete pipeline lifecycle including
broker setup, configuration validation, and execution using the
configured execution strategy.
Attributes
----------
strategy : ProcessExecutionStrategy
The execution strategy to use for running pipelines.
"""
def __init__(self, strategy: ProcessExecutionStrategy):
"""
Initialize the lifecycle manager with an execution strategy.
Parameters
----------
strategy : ProcessExecutionStrategy
The strategy to use for pipeline execution.
"""
self.strategy = strategy
# Track broker process so we can terminate it during teardown
self._broker_process: Optional[multiprocessing.Process] = None
[docs]
def start(self, config: PipelineConfigSchema, options: ExecutionOptions) -> ExecutionResult:
"""
Start a pipeline using the configured execution strategy.
This method handles the complete pipeline startup process:
1. Validate configuration
2. Start message broker if required
3. Execute pipeline using the configured strategy
Parameters
----------
config : PipelineConfigSchema
Validated pipeline configuration to execute.
options : ExecutionOptions
Execution options controlling blocking behavior and output.
Returns
-------
ExecutionResult
Result containing pipeline interface and/or timing information.
Raises
------
RuntimeError
If pipeline startup fails.
"""
logger.info("Starting pipeline lifecycle")
# If running pipeline in a subprocess and broker is enabled, ensure the broker
# is launched in the child process group by signaling via environment variable
prev_env = None
set_env = False
if getattr(config, "pipeline", None) and getattr(config.pipeline, "launch_simple_broker", False):
if isinstance(self.strategy, SubprocessStrategy):
prev_env = os.environ.get("NV_INGEST_BROKER_IN_SUBPROCESS")
os.environ["NV_INGEST_BROKER_IN_SUBPROCESS"] = "1"
set_env = True
try:
# Start message broker if configured (may defer to subprocess based on env)
self._setup_message_broker(config)
# Execute pipeline using the configured strategy
result = self.strategy.execute(config, options)
logger.info("Pipeline lifecycle started successfully")
return result
except Exception as e:
logger.error(f"Failed to start pipeline lifecycle: {e}")
raise RuntimeError(f"Pipeline startup failed: {e}") from e
finally:
if set_env:
if prev_env is None:
try:
del os.environ["NV_INGEST_BROKER_IN_SUBPROCESS"]
except KeyError:
pass
else:
os.environ["NV_INGEST_BROKER_IN_SUBPROCESS"] = prev_env
def _setup_message_broker(self, config: PipelineConfigSchema) -> None:
"""
Set up message broker if required by configuration.
Parameters
----------
config : PipelineConfigSchema
Pipeline configuration containing broker settings.
"""
if config.pipeline.launch_simple_broker:
# If requested to launch broker inside the subprocess, skip here
if os.environ.get("NV_INGEST_BROKER_IN_SUBPROCESS") == "1":
logger.info("Deferring SimpleMessageBroker launch to subprocess")
return
logger.info("Starting simple message broker")
# Start the broker and retain a handle for cleanup.
# Use defaults (host=0.0.0.0, port=7671) as set by the broker implementation.
try:
self._broker_process = start_simple_message_broker({})
# Ensure cleanup at interpreter shutdown in case caller forgets
atexit.register(self._terminate_broker_atexit)
logger.info(f"SimpleMessageBroker started (pid={getattr(self._broker_process, 'pid', None)})")
except Exception as e:
logger.error(f"Failed to start SimpleMessageBroker: {e}")
raise
else:
logger.debug("Simple broker launch not required")
[docs]
def stop(self, pipeline_id: Optional[str] = None) -> None:
"""
Stop a running pipeline.
This method provides a hook for future pipeline stopping functionality.
Currently, pipeline stopping is handled by the individual interfaces.
Additionally, it ensures any dependent services (like the simple
message broker) are terminated to avoid lingering processes.
Parameters
----------
pipeline_id : Optional[str]
Identifier of the pipeline to stop. Currently unused.
"""
logger.info("Pipeline stop requested")
# Best-effort termination of broker if we started one
self._terminate_broker()
# --- Internal helpers ---
def _terminate_broker_atexit(self) -> None:
"""Atexit-safe broker termination.
Avoids raising exceptions during interpreter shutdown.
"""
try:
self._terminate_broker()
except Exception:
# Swallow errors at atexit to avoid noisy shutdowns
pass
def _terminate_broker(self) -> None:
"""Terminate the SimpleMessageBroker process if running."""
proc = self._broker_process
if not proc:
return
try:
if hasattr(proc, "is_alive") and not proc.is_alive():
return
except Exception:
# If querying state fails, continue with termination attempt
pass
pid = getattr(proc, "pid", None)
logger.info(f"Stopping SimpleMessageBroker (pid={pid})")
try:
# First, try graceful terminate
proc.terminate()
try:
proc.join(timeout=3.0)
except Exception:
pass
# If still alive, escalate to SIGKILL on the single process
still_alive = False
try:
still_alive = hasattr(proc, "is_alive") and proc.is_alive()
except Exception:
still_alive = True
if still_alive and pid is not None:
try:
os.kill(pid, signal.SIGKILL)
except Exception:
pass
try:
proc.join(timeout=2.0)
except Exception:
pass
finally:
# Clear handle to avoid repeated attempts
self._broker_process = None