Source code for nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_sink_stage_base

# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import time
from abc import ABC
from typing import Optional, Any

import ray
import logging

from nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_stage_base import RayActorStage

logger = logging.getLogger(__name__)


[docs] class RayActorSinkStage(RayActorStage, ABC): """ Abstract base class for sink stages in a RayPipeline. Sink stages do not support an output queue; instead, they implement write_output to deliver their final processed messages. """ def __init__(self, config: Any, log_to_stdout=False, stage_name: Optional[str] = None) -> None: super().__init__(config, log_to_stdout=log_to_stdout, stage_name=stage_name)
[docs] @ray.method(num_returns=1) def set_output_queue(self, queue_handle: any) -> bool: raise NotImplementedError("Sink stages do not support an output queue.")
def _processing_loop(self) -> None: """ The main processing loop executed in a background thread. Continuously reads from the input queue, processes items using `on_data`, performs final processing, and deletes the control message. Exits when `self._running` becomes False. Upon loop termination, it schedules `_request_actor_exit` to run on the main Ray actor thread to ensure a clean shutdown via `ray.actor.exit_actor()`. """ actor_id_str = self._get_actor_id_str() logger.debug(f"{actor_id_str}: Processing loop thread starting.") try: # Loop continues as long as the actor is marked as running while self._running: control_message: Optional[Any] = None try: # Step 1: Attempt to get work from the input queue control_message = self._read_input() # If no message, loop back and check self._running again if control_message is None: continue # Go to the next iteration of the while loop self.stats["successful_queue_reads"] += 1 # Step 2: Process the retrieved message self._active_processing = True # Mark as busy self.on_data(control_message) self.stats["processed"] += 1 except Exception as e: # Log exceptions during item processing but continue the loop cm_info = f" (message type: {type(control_message).__name__})" if control_message else "" logger.exception(f"{actor_id_str}: Error processing item{cm_info}: {e}") # Avoid busy-spinning in case of persistent errors reading or processing if self._running: time.sleep(0.1) finally: # Ensure active_processing is reset regardless of success/failure/output self._active_processing = False # --- Loop Exit --- logger.debug( f"{actor_id_str}: Graceful exit condition met (self._running is False). Processing loop terminating." ) except Exception as e: # Catch unexpected errors in the loop structure itself self._logger.exception(f"{actor_id_str}: Unexpected error caused processing loop termination: {e}") finally: self._logger.debug(f"{actor_id_str}: Processing loop thread finished.") self._shutdown_signal_complete = True