Source code for nv_ingest.framework.orchestration.process.strategies
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Process execution strategies for pipeline deployment.
This module defines abstract and concrete strategies for executing pipelines
in different process contexts (in-process vs subprocess), implementing the
Strategy pattern for clean separation of execution concerns.
"""
import atexit
import logging
import multiprocessing
import os
import sys
import time
from abc import ABC, abstractmethod
from nv_ingest.pipeline.pipeline_schema import PipelineConfigSchema
from nv_ingest.framework.orchestration.execution.options import ExecutionOptions, ExecutionResult
from nv_ingest.framework.orchestration.ray.primitives.ray_pipeline import (
RayPipelineInterface,
RayPipelineSubprocessInterface,
)
from nv_ingest.framework.orchestration.process.execution import (
launch_pipeline,
run_pipeline_process,
)
from nv_ingest.framework.orchestration.process.termination import (
kill_pipeline_process_group,
)
logger = logging.getLogger(__name__)
[docs]
class ProcessExecutionStrategy(ABC):
"""
Abstract base class for pipeline execution strategies.
This class defines the interface for different ways of executing
a pipeline (in-process, subprocess, etc.) using the Strategy pattern.
"""
[docs]
@abstractmethod
def execute(self, config: PipelineConfigSchema, options: ExecutionOptions) -> ExecutionResult:
"""
Execute a pipeline using this strategy.
Parameters
----------
config : PipelineConfigSchema
Validated pipeline configuration to execute.
options : ExecutionOptions
Execution options controlling blocking behavior and output redirection.
Returns
-------
ExecutionResult
Result containing pipeline interface and/or timing information.
"""
pass
[docs]
class InProcessStrategy(ProcessExecutionStrategy):
"""
Strategy for executing pipelines in the current process.
This strategy runs the pipeline directly in the current Python process,
providing the most direct execution path with minimal overhead.
"""
[docs]
def execute(self, config: PipelineConfigSchema, options: ExecutionOptions) -> ExecutionResult:
"""
Execute pipeline in the current process.
Parameters
----------
config : PipelineConfigSchema
Pipeline configuration to execute.
options : ExecutionOptions
Execution options. stdout/stderr are ignored for in-process execution.
Returns
-------
ExecutionResult
Result with pipeline interface (non-blocking) or elapsed time (blocking).
"""
logger.info("Executing pipeline in current process")
# Execute the pipeline using existing launch_pipeline function
# launch_pipeline returns raw RayPipeline object (not wrapped in interface)
pipeline, total_elapsed = launch_pipeline(
config,
block=options.block,
disable_dynamic_scaling=None, # Already applied in config
)
if options.block:
logger.debug(f"Pipeline execution completed successfully in {total_elapsed:.2f} seconds.")
return ExecutionResult(interface=None, elapsed_time=total_elapsed)
else:
# Wrap the raw RayPipeline in RayPipelineInterface
interface = RayPipelineInterface(pipeline)
return ExecutionResult(interface=interface, elapsed_time=None)
[docs]
class SubprocessStrategy(ProcessExecutionStrategy):
"""
Strategy for executing pipelines in a separate subprocess.
This strategy launches the pipeline in a separate Python process using
multiprocessing, providing process isolation and output redirection.
"""
[docs]
def execute(self, config: PipelineConfigSchema, options: ExecutionOptions) -> ExecutionResult:
"""
Execute pipeline in a separate subprocess.
Parameters
----------
config : PipelineConfigSchema
Pipeline configuration to execute.
options : ExecutionOptions
Execution options including output redirection streams.
Returns
-------
ExecutionResult
Result with subprocess interface (non-blocking) or elapsed time (blocking).
"""
logger.info("Launching pipeline in Python subprocess using multiprocessing.")
# Create subprocess using fork context
start_method = "fork"
if sys.platform.lower() == "darwin":
start_method = "spawn"
ctx = multiprocessing.get_context(start_method)
process = ctx.Process(
target=run_pipeline_process,
args=(
config,
options.stdout, # raw_stdout
options.stderr, # raw_stderr
),
daemon=False,
)
# Hint to the lifecycle manager to skip starting the broker in the parent
prev_val = os.environ.get("NV_INGEST_BROKER_IN_SUBPROCESS")
os.environ["NV_INGEST_BROKER_IN_SUBPROCESS"] = "1"
try:
process.start()
finally:
# Restore original env to avoid affecting other code paths
if prev_val is None:
try:
del os.environ["NV_INGEST_BROKER_IN_SUBPROCESS"]
except KeyError:
pass
else:
os.environ["NV_INGEST_BROKER_IN_SUBPROCESS"] = prev_val
interface = RayPipelineSubprocessInterface(process)
if options.block:
# Block until subprocess completes, handling Ctrl+C to ensure teardown
start_time = time.time()
logger.info("Waiting for subprocess pipeline to complete...")
try:
process.join()
except KeyboardInterrupt:
logger.info("KeyboardInterrupt in parent; terminating subprocess group...")
try:
pid = int(process.pid)
kill_pipeline_process_group(pid)
finally:
# Best-effort wait for process to exit
try:
process.join(timeout=5.0)
except Exception:
pass
finally:
logger.info("Pipeline subprocess completed or terminated.")
elapsed_time = time.time() - start_time
# If process ended with failure, surface it
if hasattr(process, "exitcode") and process.exitcode not in (0, None):
raise RuntimeError(f"Pipeline subprocess exited with code {process.exitcode}")
return ExecutionResult(interface=None, elapsed_time=elapsed_time)
else:
# Return interface for non-blocking execution
logger.info(f"Pipeline subprocess started (PID={process.pid})")
# Ensure we pass the Process object, not just the PID, to avoid AttributeError
# kill_pipeline_process_group expects a multiprocessing.Process instance
# Capture raw PID to avoid using multiprocessing APIs during interpreter shutdown
pid = int(process.pid)
atexit.register(kill_pipeline_process_group, pid)
return ExecutionResult(interface=interface, elapsed_time=None)
[docs]
def create_execution_strategy(run_in_subprocess: bool) -> ProcessExecutionStrategy:
"""
Factory function to create the appropriate execution strategy.
Parameters
----------
run_in_subprocess : bool
If True, creates SubprocessStrategy. If False, creates InProcessStrategy.
Returns
-------
ProcessExecutionStrategy
Configured execution strategy instance.
"""
if run_in_subprocess:
return SubprocessStrategy()
else:
return InProcessStrategy()