# Source code for nv_ingest.framework.orchestration.process.termination
# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Process termination utilities, isolated to avoid circular imports.
This module provides functions to terminate a process and its entire process
group safely, without depending on pipeline construction or Ray types.
"""
import logging
import os
import signal
import time
from typing import Optional
logger = logging.getLogger(__name__)
def _safe_log(level: int, msg: str) -> None:
    """Log *msg* at *level* without raising or printing handler tracebacks.

    ``logging.raiseExceptions`` is switched off for the duration of the call so
    that a handler whose stream is already closed (common during process
    teardown) does not make the logging module print "--- Logging error ---"
    to stderr. The previous value is always restored before returning.

    The message is sent through the module ``logger`` whenever a handler is
    reachable from it, including handlers attached to ancestor loggers such as
    the root logger. If no handler is reachable, or the logging call itself
    fails, the message is written to ``sys.__stderr__`` when that stream is
    available. Every failure is swallowed: this function is best-effort.

    Parameters
    ----------
    level : int
        Standard ``logging`` level for the message.
    msg : str
        Fully formatted message text.
    """
    # Captured before the try block so the finally clause can always restore it.
    previous = getattr(logging, "raiseExceptions", True)
    try:
        # Suppress handler errors being printed to stderr.
        logging.raiseExceptions = False
        # hasHandlers() walks the logger hierarchy. Checking logger.handlers
        # alone misses handlers configured on the root/ancestor loggers and
        # would push every message, whatever its level, to raw stderr.
        if logger.hasHandlers():
            logger.log(level, msg)
            return
    except Exception:
        # Intentionally ignored: fall through to the stderr fallback.
        pass
    finally:
        logging.raiseExceptions = previous

    # Fallback to the interpreter's original stderr if it is available.
    try:
        import sys

        stream = getattr(sys, "__stderr__", None)
        if stream:
            stream.write(msg + "\n")
            stream.flush()
    except Exception:
        # Nothing left to report to; stay silent during teardown.
        pass
# [docs]
def kill_pipeline_process_group(process) -> None:
    """
    Kill a process and its entire process group.

    Accepts either a multiprocessing.Process-like object exposing a ``pid`` attribute
    or a raw PID integer. Sends SIGTERM to the process group first, and escalates
    to SIGKILL if it does not terminate within a short grace period.

    With a process handle that has ``join``, the grace period is a 5 second
    ``join``; with a raw PID it is a fixed 2 second sleep followed by a
    process-group lookup. After SIGKILL the handle, if any, is joined for up
    to 3 more seconds. Lookup and signalling failures caused by the process
    having already exited are logged at DEBUG level and not raised.

    Parameters
    ----------
    process : multiprocessing.Process | int
        Process handle (or a raw PID int) for the process whose process group should be terminated.

    Raises
    ------
    AttributeError
        If ``process`` is neither an int nor an object with a usable ``pid``,
        or if the resolved PID is not a positive integer. AttributeError is
        kept as the single validation error type of this function.
    """
    proc: Optional[object] = None

    # bool is a subclass of int; without the exclusion True would mean PID 1.
    if isinstance(process, int) and not isinstance(process, bool):
        pid = process
    elif hasattr(process, "pid"):
        proc = process
        try:
            pid = int(getattr(proc, "pid"))
        except Exception as e:
            raise AttributeError(f"Invalid process-like object without usable pid: {e}") from e
    else:
        raise AttributeError(
            "kill_pipeline_process_group expects a multiprocessing.Process or a PID int (process-like object with .pid)"
        )

    if proc is not None and hasattr(proc, "is_alive") and not proc.is_alive():
        _safe_log(logging.DEBUG, "Process already terminated")
        return

    # os.getpgid(0) resolves to the *caller's* process group, so a PID of 0
    # would make this function signal its own group. Negative values are not
    # valid single-process identifiers either.
    if pid <= 0:
        raise AttributeError(f"Refusing to terminate process group for non-positive PID {pid}")

    _safe_log(logging.INFO, f"Terminating pipeline process group (PID: {pid})")

    joinable = proc is not None and hasattr(proc, "join")

    try:
        # Send graceful termination to the entire process group.
        try:
            pgid = os.getpgid(pid)
        except Exception:
            # Best-effort: any failure to resolve the group is treated as
            # "process already gone".
            _safe_log(logging.DEBUG, f"Process group for PID {pid} not found during SIGTERM phase")
            return

        try:
            os.killpg(pgid, signal.SIGTERM)
        except ProcessLookupError:
            _safe_log(logging.DEBUG, f"Process group for PID {pid} no longer exists (SIGTERM)")
            return

        if joinable:
            # With a Process handle, give it a chance to exit cleanly.
            try:
                proc.join(timeout=5.0)
            except Exception:
                pass
            # A handle without is_alive() is conservatively assumed alive.
            still_alive = getattr(proc, "is_alive", lambda: True)()
        else:
            # Without a handle, provide a small grace period and then probe
            # whether the PID still resolves to a process group.
            time.sleep(2.0)
            try:
                os.getpgid(pid)
                still_alive = True
            except Exception:
                still_alive = False

        if not still_alive:
            return

        _safe_log(logging.WARNING, "Process group did not terminate gracefully, using SIGKILL")
        try:
            try:
                pgid_now = os.getpgid(pid)
            except Exception:
                _safe_log(logging.DEBUG, f"Process group for PID {pid} vanished before SIGKILL")
                return
            os.killpg(pgid_now, signal.SIGKILL)
        finally:
            # Join the handle whether or not SIGKILL was actually delivered.
            if joinable:
                try:
                    proc.join(timeout=3.0)
                except Exception:
                    pass
    except OSError as e:
        # ProcessLookupError is a subclass of OSError, so this covers both.
        _safe_log(logging.DEBUG, f"Process group already terminated or not found: {e}")