nv_ingest.framework.orchestration.ray.stages.meta package
Submodules
nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_edge_base module
- class nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_edge_base.RayActorEdge(max_size: int, multi_reader: bool = False, multi_writer: bool = False)
Bases: ABC
Abstract base class for a Ray actor edge used in a RayPipeline.
- Parameters:
max_size (int) – The maximum size of the edge’s internal queue.
multi_reader (bool) – Whether the edge supports multiple concurrent readers.
multi_writer (bool) – Whether the edge supports multiple concurrent writers.
- abstractmethod get_stats() → Dict[str, int]
Get current statistics for the edge.
- Returns:
A dictionary containing statistics (e.g. write_count, read_count, queue_full_count, current_size).
- Return type:
Dict[str, int]
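The statistics contract above can be illustrated with a local, Ray-free sketch. It does not import nv_ingest: EdgeBase is a hypothetical stand-in that mirrors only the documented constructor and get_stats(), and LocalQueueEdge, write() and read() are illustrative names, not part of the documented API. Only the stat keys come from the description above.

```python
import queue
import threading
from abc import ABC, abstractmethod
from typing import Any, Dict


class EdgeBase(ABC):
    """Hypothetical stand-in mirroring the documented RayActorEdge constructor."""

    def __init__(self, max_size: int, multi_reader: bool = False, multi_writer: bool = False):
        self.max_size = max_size
        self.multi_reader = multi_reader
        self.multi_writer = multi_writer

    @abstractmethod
    def get_stats(self) -> Dict[str, int]:
        ...


class LocalQueueEdge(EdgeBase):
    """Illustrative edge backed by queue.Queue; write()/read() are hypothetical."""

    def __init__(self, max_size: int, multi_reader: bool = False, multi_writer: bool = False):
        super().__init__(max_size, multi_reader, multi_writer)
        self._queue = queue.Queue(maxsize=max_size)
        self._lock = threading.Lock()  # guards the counters below
        self._stats = {"write_count": 0, "read_count": 0, "queue_full_count": 0}

    def write(self, item: Any) -> bool:
        try:
            self._queue.put_nowait(item)
        except queue.Full:
            # Count rejected writes so back-pressure is visible in the stats.
            with self._lock:
                self._stats["queue_full_count"] += 1
            return False
        with self._lock:
            self._stats["write_count"] += 1
        return True

    def read(self) -> Any:
        item = self._queue.get_nowait()  # raises queue.Empty when nothing is queued
        with self._lock:
            self._stats["read_count"] += 1
        return item

    def get_stats(self) -> Dict[str, int]:
        with self._lock:
            stats = dict(self._stats)
        stats["current_size"] = self._queue.qsize()
        return stats
```

With max_size=2, a third write is rejected and shows up as queue_full_count; after one read, current_size drops to 1.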
nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_sink_stage_base module
- class nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_sink_stage_base.RayActorSinkStage(config: Any, log_to_stdout=False, stage_name: str | None = None)
Bases: RayActorStage, ABC
Abstract base class for sink stages in a RayPipeline. Sink stages do not support an output queue; instead, they implement write_output to deliver their final processed messages.
- set_output_queue(queue_handle: any, *, _ray_trace_ctx=None) → bool
Sets the output queue handle for this actor stage.
Should be called before start().
- Parameters:
queue_handle (Any) – The Ray queue handle (e.g., ray.util.queue.Queue) to which this actor should write output items.
- Returns:
True indicating the queue was set.
- Return type:
bool
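The sink pattern described above can be sketched without Ray. This is a hypothetical, simplified stand-in, not the nv_ingest class: it assumes that a sink's on_data hands each message to write_output and returns None so that nothing is forwarded downstream. The exact write_output signature and the ListSink class are illustrative assumptions.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Optional


class SinkStageSketch(ABC):
    """Hypothetical stand-in: a sink delivers messages instead of forwarding them."""

    @abstractmethod
    def write_output(self, message: Any) -> None:
        """Deliver a final processed message to its destination (assumed signature)."""

    def on_data(self, message: Any) -> Optional[Any]:
        self.write_output(message)
        return None  # no result, so nothing would be placed on an output queue


class ListSink(SinkStageSketch):
    """Illustrative sink that collects delivered messages in memory."""

    def __init__(self) -> None:
        self.delivered: List[Any] = []

    def write_output(self, message: Any) -> None:
        self.delivered.append(message)
```

A real sink would replace the in-memory list with an external destination such as a message broker or a file.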
nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_source_stage_base module
- class nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_source_stage_base.RayActorSourceStage(config: Any, log_to_stdout=False, stage_name: str | None = None)
Bases: RayActorStage, ABC
Abstract base class for source stages in a RayPipeline. Source stages do not support an input queue. Instead, they must implement get_input() to fetch control messages from an external source.
- on_data(control_message: IngestControlMessage, *, _ray_trace_ctx=None) → Optional[Any]
Process a single data item (control message).
This is the core logic method that must be implemented by subclasses. It receives an item dequeued by read_input and performs the stage-specific processing.
- Parameters:
control_message (Any) – The data item retrieved from the input queue.
- Returns:
The result of the processing. If a result is returned (not None), it will be placed onto the output_queue. Return None if this stage does not produce output or if this specific message yields no result.
- Return type:
Optional[Any]
- set_input_queue(queue_handle: Any, *, _ray_trace_ctx=None) → bool
Sets the input queue handle for this actor stage.
Should be called before start().
- Parameters:
queue_handle (Any) – The Ray queue handle (e.g., ray.util.queue.Queue) from which this actor should read input items.
- Returns:
True indicating the queue was set.
- Return type:
bool
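The difference between a source stage and an ordinary stage is where messages come from: get_input() pulls them from an external system rather than from an input queue. The sketch below is a hypothetical, Ray-free illustration of that idea. SourceStageSketch, poll_once(), IterableSource, and the convention that get_input() returns None when nothing is available are all assumptions, not the documented nv_ingest behavior.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Any, Iterable, List, Optional


class SourceStageSketch(ABC):
    """Hypothetical stand-in: messages come from get_input(), not an input queue."""

    @abstractmethod
    def get_input(self) -> Optional[Any]:
        """Fetch the next control message, or None if nothing is available (assumed)."""

    def on_data(self, control_message: Any) -> Optional[Any]:
        return control_message  # pass fetched messages downstream unchanged

    def poll_once(self, output: List[Any]) -> bool:
        """One iteration of a hypothetical source loop; False when the source is idle."""
        message = self.get_input()
        if message is None:
            return False
        result = self.on_data(message)
        if result is not None:
            output.append(result)  # stands in for the stage's output queue
        return True


class IterableSource(SourceStageSketch):
    """Illustrative source that drains an in-memory iterable."""

    def __init__(self, items: Iterable[Any]) -> None:
        self._pending = deque(items)

    def get_input(self) -> Optional[Any]:
        return self._pending.popleft() if self._pending else None
```

Calling poll_once() repeatedly until it returns False drains the source in order.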
nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_stage_base module
- class nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_stage_base.RayActorStage(config: BaseModel, stage_name: str | None = None, log_to_stdout=False)
Bases: ABC
Abstract base class for a stateful Ray actor stage in a processing pipeline.
This class provides a common structure for actors that consume items from an input queue, process them, and potentially place results onto an output queue. It utilizes a background thread for the main processing loop to avoid blocking the main Ray actor thread. It includes basic statistics tracking (processed count, elapsed time, processing rate) and mechanisms for graceful shutdown.
Subclasses must implement the on_data method to define the specific processing logic for each item.
- config
Configuration object for the stage.
- Type:
BaseModel
- stage_name
Name of the stage from YAML pipeline configuration. Used by stage-aware decorators for consistent naming.
- Type:
Optional[str]
- _input_queue
Handle to the Ray queue from which input items are read. Expected to be set via set_input_queue.
- Type:
Optional[Any]
- _output_queue
Handle to the Ray queue where processed items are placed. Expected to be set via set_output_queue.
- Type:
Optional[Any]
- _running
Flag indicating if the processing loop should be actively running. Set to True by start() and False by stop(). Controls the main loop.
- Type:
bool
- _active_processing
Flag indicating if the on_data method is currently executing. Useful for understanding if the actor is busy at a given moment.
- Type:
bool
- stats
Dictionary to store basic operational statistics. Currently tracks ‘processed’.
- Type:
Dict[str, int]
- start_time
Timestamp (from time.time()) when the start() method was called. Used for calculating total elapsed time.
- Type:
Optional[float]
- _last_processed_count
Internal state variable storing the processed count at the last get_stats call. Used for calculating interval processing rate.
- Type:
int
- _last_stats_time
Internal state variable storing the timestamp of the last get_stats call. Used for calculating interval processing rate.
- Type:
Optional[float]
- _processing_thread
Handle to the background thread running the _processing_loop.
- Type:
Optional[threading.Thread]
- _shutting_down
Internal flag to prevent redundant shutdown actions, protected by _lock.
- Type:
bool
- _lock
Lock to protect access to shutdown-related state (_shutting_down).
- Type:
threading.Lock
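The attributes above describe a consume/process/produce loop that runs on a background thread. The following Ray-free sketch shows how those pieces might fit together; it is a hypothetical simplification, not the nv_ingest implementation. It uses queue.Queue in place of ray.util.queue.Queue, omits the _lock/_shutting_down shutdown guard, and MiniStage and stop() are illustrative names.

```python
import queue
import threading
import time
from typing import Any, Dict, Optional


class MiniStage:
    """Hypothetical, Ray-free stand-in for the processing loop described above."""

    def __init__(self) -> None:
        self._input_queue: Optional[queue.Queue] = None
        self._output_queue: Optional[queue.Queue] = None
        self._running = False
        self._active_processing = False
        self.stats: Dict[str, int] = {"processed": 0}
        self.start_time: Optional[float] = None
        self._processing_thread: Optional[threading.Thread] = None

    def set_input_queue(self, queue_handle: queue.Queue) -> bool:
        self._input_queue = queue_handle
        return True

    def set_output_queue(self, queue_handle: queue.Queue) -> bool:
        self._output_queue = queue_handle
        return True

    def on_data(self, control_message: Any) -> Optional[Any]:
        return control_message  # a real subclass would transform the message here

    def _processing_loop(self) -> None:
        while self._running:
            try:
                # Short timeout so the loop re-checks _running regularly.
                item = self._input_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            self._active_processing = True
            try:
                result = self.on_data(item)
                # Only non-None results are forwarded downstream.
                if result is not None and self._output_queue is not None:
                    self._output_queue.put(result)
                self.stats["processed"] += 1
            finally:
                self._active_processing = False

    def start(self) -> None:
        if self._input_queue is None:
            raise RuntimeError("set_input_queue() must be called before start()")
        self._running = True
        self.start_time = time.time()
        self._processing_thread = threading.Thread(target=self._processing_loop, daemon=True)
        self._processing_thread.start()

    def stop(self) -> None:
        self._running = False  # the loop exits after finishing its current item
        if self._processing_thread is not None:
            self._processing_thread.join()
```

Running the loop on its own thread keeps the actor free to answer calls such as get_stats() while items are being processed.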
- get_stats(*, _ray_trace_ctx=None) → Dict[str, Any]
Retrieves performance statistics for the actor.
Calculates the approximate processing rate since the last call to get_stats or since start().
- Returns:
A dictionary containing the following statistics:
- 'processed' (int): Total items processed since the actor started.
- 'elapsed' (float): Total time in seconds since the actor started.
- 'active_processing' (bool): Whether the actor was actively processing an item in on_data at the moment this method was called.
- 'processing_rate_cps' (float): Items processed per second during the last interval. Can be zero if no items were processed or the interval was too short.
- 'memory_mb' (float): Total memory usage (RSS) of the current actor process, in megabytes (MB).
- Return type:
Dict[str, Any]
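The interval rate comes from the two bookkeeping attributes _last_processed_count and _last_stats_time: processing_rate_cps = (processed − _last_processed_count) / (now − _last_stats_time), after which both are reset to the current values. The sketch below is a hypothetical helper that reproduces only that arithmetic. Its explicit now parameter is an assumption added so that the results are deterministic, and its zero-interval guard is a simplification of "the interval was too short". It omits active_processing and memory_mb.

```python
import time
from typing import Any, Dict, Optional


class RateTracker:
    """Hypothetical helper reproducing the interval-rate bookkeeping described above."""

    def __init__(self, now: Optional[float] = None) -> None:
        self.start_time = time.time() if now is None else now
        self.processed = 0
        self._last_processed_count = 0
        self._last_stats_time = self.start_time

    def get_stats(self, now: Optional[float] = None) -> Dict[str, Any]:
        now = time.time() if now is None else now
        interval = now - self._last_stats_time
        delta = self.processed - self._last_processed_count
        # Report 0.0 for a zero-length interval instead of dividing by zero.
        rate = delta / interval if interval > 0 else 0.0
        # Reset the interval baseline for the next call.
        self._last_processed_count = self.processed
        self._last_stats_time = now
        return {
            "processed": self.processed,
            "elapsed": now - self.start_time,
            "processing_rate_cps": rate,
        }
```

For example, 50 items in the first 10 s gives 5.0 items/s; 10 more items over the next 5 s gives 2.0 items/s, even though the lifetime average is 4.0.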
- is_shutdown_complete(*, _ray_trace_ctx=None) → bool
Checks if the actor’s processing loop has finished and signaled completion. Raises RayActorError if the actor process has terminated.
- abstractmethod on_data(control_message: Any) → Optional[Any]
Process a single data item (control message).
This is the core logic method that must be implemented by subclasses. It receives an item dequeued by read_input and performs the stage-specific processing.
- Parameters:
control_message (Any) – The data item retrieved from the input queue.
- Returns:
The result of the processing. If a result is returned (not None), it will be placed onto the output_queue. Return None if this stage does not produce output or if this specific message yields no result.
- Return type:
Optional[Any]
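The return-value contract of on_data (a result is forwarded, None is dropped) can be shown with a small filtering example. This is a hypothetical sketch: plain dicts stand in for control messages, and the content/tagged fields and the run_batch() driver are illustrative, not part of nv_ingest.

```python
from typing import Any, Dict, List, Optional


def on_data(control_message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Hypothetical on_data: drop messages without content, tag the rest."""
    if not control_message.get("content"):
        return None  # no result: nothing is placed on the output queue
    return {**control_message, "tagged": True}


def run_batch(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Mimics the documented contract: only non-None results are forwarded."""
    output_queue: List[Dict[str, Any]] = []
    for message in messages:
        result = on_data(message)
        if result is not None:
            output_queue.append(result)
    return output_queue
```

Returning None is therefore the natural way for a stage to filter messages out of the pipeline.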
- set_input_queue(queue_handle: Any, *, _ray_trace_ctx=None) → bool
Sets the input queue handle for this actor stage.
Should be called before start().
- Parameters:
queue_handle (Any) – The Ray queue handle (e.g., ray.util.queue.Queue) from which this actor should read input items.
- Returns:
True indicating the queue was set.
- Return type:
bool
- set_output_queue(queue_handle: Any, *, _ray_trace_ctx=None) → bool
Sets the output queue handle for this actor stage.
Should be called before start().
- Parameters:
queue_handle (Any) – The Ray queue handle (e.g., ray.util.queue.Queue) to which this actor should write output items.
- Returns:
True indicating the queue was set.
- Return type:
bool
- start(*, _ray_trace_ctx=None) → bool
Starts the actor’s processing loop in a background thread.
Initializes state, resets statistics, and launches the _processing_loop thread. Idempotent: if called while already running, it logs a warning and returns False.
- Returns:
True if the actor was successfully started, False if it was already running.
- Return type:
bool
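The idempotent start() contract (return True on first start; log a warning and return False if already running) can be sketched as follows. This is a hypothetical illustration, not the nv_ingest code. Guarding start() with the same lock as stop() is an assumption, because the documentation only says _lock protects shutdown state. The threading.Event placeholder loop and the stop() method are also illustrative.

```python
import logging
import threading
from typing import Dict, Optional

logger = logging.getLogger(__name__)


class IdempotentStarter:
    """Hypothetical sketch of the documented start() contract."""

    def __init__(self) -> None:
        self._running = False
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self.stats: Dict[str, int] = {"processed": 0}

    def _loop(self) -> None:
        # Placeholder for the real processing loop: idle until stopped.
        self._stop_event.wait()

    def start(self) -> bool:
        with self._lock:
            if self._running:
                logger.warning("start() called while already running; ignoring.")
                return False
            self._running = True
            self.stats = {"processed": 0}  # reset statistics on each fresh start
            self._stop_event.clear()
            self._thread = threading.Thread(target=self._loop, daemon=True)
            self._thread.start()
            return True

    def stop(self) -> None:
        with self._lock:
            if not self._running:
                return
            self._running = False
            self._stop_event.set()
            thread = self._thread
        # Join outside the lock so the worker is never blocked waiting on it.
        if thread is not None:
            thread.join()
```

Returning False instead of raising lets callers invoke start() defensively without extra state checks.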
nv_ingest.framework.orchestration.ray.stages.meta.udf_parallel_helper module
UDF Parallel Stage: a high-concurrency, no-op stage for parallel UDF execution.
This stage does nothing except pass messages through. With a high replica count, however, it provides a parallel execution pool in which UDFs run with N-way concurrency, where N is the number of replicas.
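The following analogy illustrates why a pass-through stage is useful for parallelism. It is a hypothetical sketch, not the nv_ingest mechanism. Threads from concurrent.futures.ThreadPoolExecutor stand in for Ray replicas, and the UDF is assumed to run alongside the stage's pass-through logic. The stage itself adds no work, so throughput is governed by how many replicas run the UDF at once. The function names are illustrative.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List


def passthrough_on_data(control_message: Any) -> Any:
    """The stage itself is a no-op: the message is returned unchanged."""
    return control_message


def run_with_replicas(messages: List[Any], udf: Callable[[Any], Any], replicas: int) -> List[Any]:
    """Hypothetical analogy: each worker thread plays the role of one stage replica."""

    def replica_task(message: Any) -> Any:
        # Assumption: the UDF runs here, and the stage then passes the result through.
        return passthrough_on_data(udf(message))

    # Up to `replicas` messages are processed concurrently; map() preserves input order.
    with ThreadPoolExecutor(max_workers=replicas) as pool:
        return list(pool.map(replica_task, messages))
```

In a Ray pipeline, the equivalent knob would be the stage's replica count rather than a thread-pool size.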