nv_ingest.framework.orchestration.process package
Submodules
nv_ingest.framework.orchestration.process.dependent_services module
Dependent services management for pipeline orchestration.
This module contains utilities for starting and managing dependent services that the pipeline requires, such as message brokers and other infrastructure.
- nv_ingest.framework.orchestration.process.dependent_services.start_simple_message_broker(broker_client: dict) → multiprocessing.Process
Starts a SimpleMessageBroker server in a separate process.
- Parameters:
broker_client (dict) –
- Broker configuration. Expected keys include:
"port": the port to bind the server to,
"broker_params": optionally including "max_queue_size",
and any other parameters required by SimpleMessageBroker.
- Returns:
The process running the SimpleMessageBroker server.
- Return type:
multiprocessing.Process
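To show how the documented keys nest, the hypothetical helper below unpacks a broker_client dict into the port, the optional max_queue_size, and whatever else sits under broker_params. read_broker_settings is not part of nv_ingest, and placing extra SimpleMessageBroker parameters under broker_params is an assumption; the description above does not say exactly where they go.

```python
from typing import Any, Dict, Optional, Tuple


def read_broker_settings(
    broker_client: Dict[str, Any],
) -> Tuple[int, Optional[int], Dict[str, Any]]:
    """Hypothetical helper: split broker_client into (port, max_queue_size, other params)."""
    # "port" is treated as required, so a missing key raises KeyError instead of guessing.
    port = int(broker_client["port"])
    # "broker_params" is optional; copy it so the caller's dict is not mutated.
    params = dict(broker_client.get("broker_params") or {})
    # "max_queue_size" is optional; None here only means the key was absent.
    max_queue_size = params.pop("max_queue_size", None)
    # Assumption: anything left over would be passed through to SimpleMessageBroker.
    return port, max_queue_size, params


# Illustrative values only; the port number is arbitrary.
example_client = {"port": 6000, "broker_params": {"max_queue_size": 10000}}
port, max_queue_size, extra = read_broker_settings(example_client)
# port == 6000, max_queue_size == 10000, extra == {}
```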
nv_ingest.framework.orchestration.process.execution module
Low-level pipeline execution functions.
This module contains the core pipeline execution functions shared by the different execution strategies; they are kept in a separate module to avoid circular imports.
- nv_ingest.framework.orchestration.process.execution.build_logging_config_from_env() → LoggingConfig
Build a Ray LoggingConfig from environment variables.
Package-level preset (sets all defaults):
- INGEST_RAY_LOG_LEVEL: PRODUCTION, DEVELOPMENT, or DEBUG. Default: DEVELOPMENT
Individual environment variables (override preset defaults):
- RAY_LOGGING_LEVEL: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default: INFO
- RAY_LOGGING_ENCODING: Log encoding format (TEXT, JSON). Default: TEXT
- RAY_LOGGING_ADDITIONAL_ATTRS: Comma-separated list of additional standard logger attributes
- RAY_DEDUP_LOGS: Enable/disable log deduplication (0/1). Default: 1 (enabled)
- RAY_LOG_TO_DRIVER: Enable/disable logging to the driver (true/false). Default: true
- RAY_LOGGING_ROTATE_BYTES: Maximum log file size before rotation, in bytes. Default: 1GB
- RAY_LOGGING_ROTATE_BACKUP_COUNT: Number of backup log files to keep. Default: 19
- RAY_DISABLE_IMPORT_WARNING: Disable Ray import warnings (0/1). Default: 0
- RAY_USAGE_STATS_ENABLED: Enable/disable usage stats collection (0/1). Default: 1
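The preset-then-override precedence above can be sketched as a two-layer lookup. This is a minimal illustration, not the nv_ingest implementation: resolve_ray_logging_env and DOCUMENTED_DEFAULTS are hypothetical names, the sketch returns a plain dict instead of a Ray LoggingConfig so it runs without Ray, and it assumes the listed defaults are what the DEVELOPMENT preset sets. The values of the PRODUCTION and DEBUG presets are not documented here, so the caller passes the chosen preset's defaults in.

```python
from typing import Any, Dict, Mapping

# Documented per-variable defaults. Assumption: these are the values the default
# DEVELOPMENT preset sets; PRODUCTION and DEBUG are not described in this section.
DOCUMENTED_DEFAULTS: Dict[str, str] = {
    "RAY_LOGGING_LEVEL": "INFO",
    "RAY_LOGGING_ENCODING": "TEXT",
    "RAY_LOGGING_ADDITIONAL_ATTRS": "",
    "RAY_DEDUP_LOGS": "1",
    "RAY_LOG_TO_DRIVER": "true",
    "RAY_LOGGING_ROTATE_BYTES": str(1024**3),  # assumption: "1GB" read as 1024**3 bytes
    "RAY_LOGGING_ROTATE_BACKUP_COUNT": "19",
    "RAY_DISABLE_IMPORT_WARNING": "0",
    "RAY_USAGE_STATS_ENABLED": "1",
}


def resolve_ray_logging_env(
    env: Mapping[str, str],
    preset_defaults: Mapping[str, str] = DOCUMENTED_DEFAULTS,
) -> Dict[str, Any]:
    """Hypothetical sketch: layer individual env vars over a preset's defaults."""
    # Layer 1 is the preset default; an individually set variable (layer 2) wins.
    raw = {key: env.get(key, default) for key, default in preset_defaults.items()}
    attrs = raw["RAY_LOGGING_ADDITIONAL_ATTRS"].split(",")
    return {
        "log_level": raw["RAY_LOGGING_LEVEL"].upper(),
        "encoding": raw["RAY_LOGGING_ENCODING"].upper(),
        # Comma-separated list; blank entries are dropped.
        "additional_log_standard_attrs": [a.strip() for a in attrs if a.strip()],
        "dedup_logs": raw["RAY_DEDUP_LOGS"] == "1",
        "log_to_driver": raw["RAY_LOG_TO_DRIVER"].lower() == "true",
        "rotate_bytes": int(raw["RAY_LOGGING_ROTATE_BYTES"]),
        "rotate_backup_count": int(raw["RAY_LOGGING_ROTATE_BACKUP_COUNT"]),
        "disable_import_warning": raw["RAY_DISABLE_IMPORT_WARNING"] == "1",
        "usage_stats_enabled": raw["RAY_USAGE_STATS_ENABLED"] == "1",
    }
```

In practice this would be called as resolve_ray_logging_env(os.environ); passing an ordinary dict instead makes the precedence easy to check in isolation.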
- nv_ingest.framework.orchestration.process.execution.kill_pipeline_process_group(process: Process) → None
Backward-compatible shim that delegates to the implementation in nv_ingest.framework.orchestration.process.termination.
- nv_ingest.framework.orchestration.process.execution.launch_pipeline(pipeline_config: PipelineConfigSchema, block: bool = True, disable_dynamic_scaling: bool | None = None, dynamic_memory_threshold: float | None = None) → Tuple[Union[Any, None], Optional[float]]
Launch a pipeline using the provided configuration.
This function handles the core pipeline launch logic, including Ray initialization, pipeline construction, and the execution loop.
- Parameters:
pipeline_config (PipelineConfigSchema) – Validated pipeline configuration to execute.
block (bool, optional) – Whether to block until the pipeline completes, by default True.
disable_dynamic_scaling (Optional[bool], optional) – Override for the dynamic scaling behavior, by default None.
dynamic_memory_threshold (Optional[float], optional) – Override for the dynamic memory threshold, by default None.
- Returns:
The raw pipeline object (its type is elided to avoid a circular import) and the elapsed time. For blocking execution, returns (None, elapsed_time); for non-blocking execution, returns (pipeline, None).
- Return type:
Tuple[Union[Any, None], Optional[float]]
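Because the two halves of the returned tuple are mutually exclusive, callers typically branch on which one is set. The hypothetical helper below (describe_launch_result is not part of nv_ingest) sketches that branching; treating the elapsed time as seconds is an assumption, since the unit is not stated above.

```python
from typing import Any, Optional, Tuple

# Shape documented above: (pipeline, None) when non-blocking, (None, elapsed) when blocking.
LaunchResult = Tuple[Optional[Any], Optional[float]]


def describe_launch_result(result: LaunchResult) -> str:
    """Hypothetical helper: interpret the (pipeline, elapsed_time) pair."""
    pipeline, elapsed = result
    if pipeline is not None:
        # Non-blocking: the caller now holds a live pipeline and must stop it later.
        return f"running ({type(pipeline).__name__})"
    if elapsed is not None:
        # Blocking: the pipeline already finished; only the timing comes back.
        # Assumption: elapsed is measured in seconds.
        return f"finished after {elapsed:.2f}s"
    raise ValueError("expected either a pipeline object or an elapsed time")
```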
- nv_ingest.framework.orchestration.process.execution.redirect_os_fds(stdout: TextIO | None = None, stderr: TextIO | None = None)
Redirect OS-level stdout (fd=1) and stderr (fd=2) to the given file-like objects, or to /dev/null if not provided.
- Parameters:
stdout (Optional[TextIO]) – Stream to receive OS-level stdout. If None, redirected to /dev/null.
stderr (Optional[TextIO]) – Stream to receive OS-level stderr. If None, redirected to /dev/null.
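OS-level redirection is usually done with os.dup2 onto file descriptors 1 and 2, which also captures output written by C extensions and child processes that bypass sys.stdout. The context manager below is a sketch of that technique, not the nv_ingest implementation: redirect_os_fds_sketch is a hypothetical name, and unlike a one-way redirect at subprocess start it restores the original descriptors on exit. The streams must be backed by a real file descriptor (fileno()), so io.StringIO will not work.

```python
import contextlib
import os
import sys
from typing import Iterator, Optional, TextIO


@contextlib.contextmanager
def redirect_os_fds_sketch(
    stdout: Optional[TextIO] = None,
    stderr: Optional[TextIO] = None,
) -> Iterator[None]:
    """Point fd 1 / fd 2 at the given streams (or /dev/null) and restore them on exit."""
    # Flush Python-level buffers so earlier output reaches the original destination.
    for stream in (sys.stdout, sys.stderr):
        if stream is not None:
            stream.flush()
    devnull_fd = os.open(os.devnull, os.O_WRONLY)
    saved = {1: os.dup(1), 2: os.dup(2)}  # duplicates of the original descriptors
    try:
        for target_fd, stream in ((1, stdout), (2, stderr)):
            if stream is None:
                source_fd = devnull_fd
            else:
                stream.flush()
                source_fd = stream.fileno()  # requires a real OS-level file
            os.dup2(source_fd, target_fd)  # fd 1 or 2 now refers to source_fd's file
        yield
    finally:
        # Flush anything Python buffered while redirected before switching back.
        for stream in (sys.stdout, sys.stderr):
            if stream is not None:
                stream.flush()
        for target_fd, saved_fd in saved.items():
            os.dup2(saved_fd, target_fd)
            os.close(saved_fd)
        os.close(devnull_fd)
```

For example, wrapping os.write(1, b"...") in redirect_os_fds_sketch(stdout=log_file) sends those bytes to log_file even though sys.stdout is never touched, while anything written to fd 2 inside the block is discarded.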
- nv_ingest.framework.orchestration.process.execution.run_pipeline_process(pipeline_config: PipelineConfigSchema, stdout: TextIO | None = None, stderr: TextIO | None = None)
Entry point for running a pipeline in a subprocess.
This function is designed to be the target of a multiprocessing.Process, handling output redirection and process group management.
- Parameters:
pipeline_config (PipelineConfigSchema) – Pipeline configuration object.
stdout (Optional[TextIO], optional) – Output stream for subprocess stdout, by default None.
stderr (Optional[TextIO], optional) – Error stream for subprocess stderr, by default None.
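One common form of "process group management" in a subprocess entry point is for the child to make itself a process group leader, so the parent can later signal it and everything it spawned with a single os.killpg call. The sketch below assumes that is what is meant; pipeline_process_entry_sketch and its run callable are hypothetical stand-ins for run_pipeline_process and the pipeline launch, and output redirection is left out. It is POSIX only.

```python
import os
from typing import Callable


def pipeline_process_entry_sketch(run: Callable[[], None]) -> None:
    """Hypothetical multiprocessing target: isolate the child in its own process group, then run."""
    # Become the leader of a new process group (POSIX only). Processes spawned
    # afterwards (for example worker processes) inherit this group, so the parent
    # can signal all of them at once with os.killpg(child.pid, ...).
    os.setpgrp()
    run()
```

It would be used as multiprocessing.Process(target=pipeline_process_entry_sketch, args=(run,)). With the "spawn" or "forkserver" start methods, run must be a picklable, module-level function.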
nv_ingest.framework.orchestration.process.lifecycle module
Pipeline lifecycle management for declarative execution.
This module provides high-level lifecycle management for pipelines, orchestrating configuration resolution, broker setup, and execution through the configured execution strategy (Strategy pattern).
- class nv_ingest.framework.orchestration.process.lifecycle.PipelineLifecycleManager(strategy: ProcessExecutionStrategy)
Bases: object
High-level manager for pipeline lifecycle operations.
This class orchestrates the complete pipeline lifecycle including broker setup, configuration validation, and execution using the configured execution strategy.
- strategy#
The execution strategy to use for running pipelines.
- Type:
ProcessExecutionStrategy
- start(config: PipelineConfigSchema, options: ExecutionOptions)
Start a pipeline using the configured execution strategy.
This method handles the complete pipeline startup process:
1. Validate the configuration.
2. Start the message broker if required.
3. Execute the pipeline using the configured strategy.
- Parameters:
config (PipelineConfigSchema) – Validated pipeline configuration to execute.
options (ExecutionOptions) – Execution options controlling blocking behavior and output.
- Returns:
Result containing pipeline interface and/or timing information.
- Return type:
- Raises:
RuntimeError – If pipeline startup fails.
- stop(pipeline_id: str | None = None) → None
Stop a running pipeline.
This method is a hook for future pipeline-stopping functionality; currently, stopping a pipeline is handled by the individual pipeline interfaces. It also terminates any dependent services (such as the simple message broker) so that no processes are left running.
- Parameters:
pipeline_id (Optional[str]) – Identifier of the pipeline to stop. Currently unused.
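The start/stop flow described above can be sketched with plain dicts in place of PipelineConfigSchema and ExecutionOptions. Everything here is hypothetical: LifecycleManagerSketch, ExecutionResultSketch, and StrategyLike are illustrative names, the "stages" check stands in for real validation, and how the actual manager decides a broker is required is not stated, so a "broker_client" key is assumed. Stopping dependent services after a failed start is a design choice of this sketch.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Protocol


@dataclass
class ExecutionResultSketch:
    interface: Optional[Any] = None       # live handle for non-blocking runs
    elapsed_time: Optional[float] = None  # timing for blocking runs


class StrategyLike(Protocol):
    def execute(self, config: Dict[str, Any], options: Dict[str, Any]) -> ExecutionResultSketch: ...


class LifecycleManagerSketch:
    """Hypothetical stand-in for PipelineLifecycleManager using plain dicts."""

    def __init__(self, strategy: StrategyLike, start_broker: Callable[[Dict[str, Any]], Any]) -> None:
        self.strategy = strategy
        self._start_broker = start_broker  # e.g. start_simple_message_broker, which returns a Process
        self._services: List[Any] = []     # Process-like objects to clean up in stop()

    def start(self, config: Dict[str, Any], options: Dict[str, Any]) -> ExecutionResultSketch:
        try:
            # 1. Validate the configuration (a placeholder check in this sketch).
            if "stages" not in config:
                raise ValueError("configuration has no 'stages'")
            # 2. Start the message broker only if the configuration asks for one.
            broker_client = config.get("broker_client")
            if broker_client is not None:
                self._services.append(self._start_broker(broker_client))
            # 3. Hand the actual run to the configured execution strategy.
            return self.strategy.execute(config, options)
        except Exception as exc:
            self.stop()  # do not leave a broker running after a failed start
            raise RuntimeError(f"pipeline startup failed: {exc}") from exc

    def stop(self, pipeline_id: Optional[str] = None) -> None:
        # pipeline_id is accepted but unused, mirroring the documented stop().
        while self._services:
            service = self._services.pop()
            service.terminate()
            service.join(timeout=5)
```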
nv_ingest.framework.orchestration.process.strategies module
Process execution strategies for pipeline deployment.
This module defines abstract and concrete strategies for executing pipelines in different process contexts (in-process vs subprocess), implementing the Strategy pattern for clean separation of execution concerns.
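The layout described above (an abstract strategy, an in-process and a subprocess implementation, and a boolean-driven factory) can be sketched without nv_ingest or Ray. All names are hypothetical, execute takes a block flag instead of ExecutionOptions, and a run callable with launch_pipeline's documented (config, block) → (pipeline, elapsed_time) contract stands in for the real pipeline launch.

```python
import multiprocessing
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional, Tuple

# Hypothetical runner with launch_pipeline's documented contract:
# (config, block) -> (pipeline, elapsed_time).
RunFn = Callable[[Dict[str, Any], bool], Tuple[Optional[Any], Optional[float]]]


@dataclass
class ExecutionResultSketch:
    interface: Optional[Any] = None
    elapsed_time: Optional[float] = None


class ExecutionStrategySketch(ABC):
    def __init__(self, run: RunFn) -> None:
        self._run = run

    @abstractmethod
    def execute(self, config: Dict[str, Any], block: bool) -> ExecutionResultSketch:
        """Run the pipeline in this strategy's process context."""


class InProcessStrategySketch(ExecutionStrategySketch):
    def execute(self, config: Dict[str, Any], block: bool) -> ExecutionResultSketch:
        # Direct call in the current process: no process boundary, no output redirection.
        pipeline, elapsed = self._run(config, block)
        return ExecutionResultSketch(interface=pipeline, elapsed_time=elapsed)


class SubprocessStrategySketch(ExecutionStrategySketch):
    def execute(self, config: Dict[str, Any], block: bool) -> ExecutionResultSketch:
        # The child always runs the pipeline to completion; the parent decides whether to wait.
        process = multiprocessing.Process(target=self._run, args=(config, True))
        start = time.perf_counter()
        process.start()
        if not block:
            return ExecutionResultSketch(interface=process)  # caller must join/terminate it
        process.join()
        return ExecutionResultSketch(elapsed_time=time.perf_counter() - start)


def create_execution_strategy_sketch(run_in_subprocess: bool, run: RunFn) -> ExecutionStrategySketch:
    # Factory: the boolean flag alone selects the concrete strategy.
    if run_in_subprocess:
        return SubprocessStrategySketch(run)
    return InProcessStrategySketch(run)
```

Keeping the process-context decision inside the strategy lets callers swap isolation levels with one flag. With the "spawn" or "forkserver" start methods, run must be a picklable, module-level function for the subprocess variant.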
- class nv_ingest.framework.orchestration.process.strategies.InProcessStrategy
Bases: ProcessExecutionStrategy
Strategy for executing pipelines in the current process.
This strategy runs the pipeline directly in the current Python process, providing the most direct execution path with minimal overhead.
- execute(config: PipelineConfigSchema, options: ExecutionOptions)
Execute pipeline in the current process.
- Parameters:
config (PipelineConfigSchema) – Pipeline configuration to execute.
options (ExecutionOptions) – Execution options. stdout/stderr are ignored for in-process execution.
- Returns:
Result with pipeline interface (non-blocking) or elapsed time (blocking).
- Return type:
- class nv_ingest.framework.orchestration.process.strategies.ProcessExecutionStrategy
Bases: ABC
Abstract base class for pipeline execution strategies.
This class defines the interface for different ways of executing a pipeline (in-process, subprocess, etc.) using the Strategy pattern.
- abstractmethod execute(config: PipelineConfigSchema, options: ExecutionOptions)
Execute a pipeline using this strategy.
- Parameters:
config (PipelineConfigSchema) – Validated pipeline configuration to execute.
options (ExecutionOptions) – Execution options controlling blocking behavior and output redirection.
- Returns:
Result containing pipeline interface and/or timing information.
- Return type:
- class nv_ingest.framework.orchestration.process.strategies.SubprocessStrategy
Bases: ProcessExecutionStrategy
Strategy for executing pipelines in a separate subprocess.
This strategy launches the pipeline in a separate Python process using multiprocessing, providing process isolation and output redirection.
- execute(config: PipelineConfigSchema, options: ExecutionOptions)
Execute pipeline in a separate subprocess.
- Parameters:
config (PipelineConfigSchema) – Pipeline configuration to execute.
options (ExecutionOptions) – Execution options including output redirection streams.
- Returns:
Result with subprocess interface (non-blocking) or elapsed time (blocking).
- Return type:
- nv_ingest.framework.orchestration.process.strategies.create_execution_strategy(run_in_subprocess: bool)
Factory function to create the appropriate execution strategy.
- Parameters:
run_in_subprocess (bool) – If True, creates SubprocessStrategy. If False, creates InProcessStrategy.
- Returns:
Configured execution strategy instance.
- Return type:
nv_ingest.framework.orchestration.process.termination module
Process termination utilities, isolated to avoid circular imports.
This module provides functions to terminate a process and its entire process group safely, without depending on pipeline construction or Ray types.
- nv_ingest.framework.orchestration.process.termination.kill_pipeline_process_group(process) → None
Kill a process and its entire process group.
Accepts either a multiprocessing.Process-like object exposing a pid attribute or a raw PID integer. Sends SIGTERM to the process group first, and escalates to SIGKILL if it does not terminate within a short grace period.
- Parameters:
process (multiprocessing.Process | int) – Process handle (or a raw PID int) for the process whose process group should be terminated.
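The SIGTERM-then-SIGKILL escalation described above can be sketched with os.killpg. This is a minimal POSIX-only illustration, not the nv_ingest implementation: kill_process_group_sketch, the 5-second default grace period, and the guard against signalling the caller's own group are assumptions of the sketch.

```python
import contextlib
import os
import signal
import time
from typing import Any, Union


def kill_process_group_sketch(process: Union[Any, int], grace_period: float = 5.0) -> None:
    """SIGTERM a process group, then SIGKILL it if it outlives the grace period (POSIX only)."""
    pid = process if isinstance(process, int) else process.pid
    try:
        pgid = os.getpgid(pid)
    except ProcessLookupError:
        return  # the process is already gone
    if pgid == os.getpgrp():
        # Safety guard: never signal the caller's own group (that would kill the caller too).
        raise RuntimeError(f"process {pid} shares the caller's process group")
    with contextlib.suppress(ProcessLookupError):
        os.killpg(pgid, signal.SIGTERM)  # polite request to every member of the group
    if _group_exited(process, pgid, grace_period):
        return
    with contextlib.suppress(ProcessLookupError):
        os.killpg(pgid, signal.SIGKILL)  # escalate: SIGKILL cannot be caught or ignored


def _group_exited(process: Any, pgid: int, timeout: float) -> bool:
    deadline = time.monotonic() + timeout
    if hasattr(process, "join"):
        # multiprocessing.Process-like: join() reaps the leader so it does not linger as a zombie.
        process.join(timeout)
    while True:
        try:
            os.killpg(pgid, 0)  # signal 0 only checks whether any group member still exists
        except ProcessLookupError:
            return True
        except PermissionError:
            pass  # the group exists but belongs to another user; treat it as still alive
        if time.monotonic() >= deadline:
            return False
        time.sleep(0.05)
```

One limitation of this sketch: when given a raw PID for an unreaped child of the caller, the zombie still counts as a group member, so the function waits out the full grace period before sending a (harmless) SIGKILL.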