nv_ingest.framework.orchestration.ray.stages.meta package

Submodules

nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_edge_base module

class nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_edge_base.RayActorEdge(
max_size: int,
multi_reader: bool = False,
multi_writer: bool = False,
)

Bases: ABC

Abstract base class for a Ray actor edge used in a RayPipeline.

Parameters:
  • max_size (int) – The maximum size of the edge’s internal queue.

  • multi_reader (bool) – Whether the edge supports multiple concurrent readers.

  • multi_writer (bool) – Whether the edge supports multiple concurrent writers.

abstractmethod get_stats() → Dict[str, int]

Get current statistics for the edge.

Returns:

A dictionary containing statistics (e.g. write_count, read_count, queue_full_count, current_size).

Return type:

Dict[str, int]

abstractmethod read() → Any

Read an item from the edge.

Returns:

The next item in the edge.

Return type:

Any

abstractmethod write(item: Any) → bool

Write an item into the edge.

Parameters:

item (Any) – The item to enqueue.

Returns:

True if the item was enqueued successfully.

Return type:

bool
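As an illustration of the edge contract only, the following sketch implements a bounded, in-process edge. It re-declares a stand-in RayActorEdge ABC with the documented constructor and abstract methods so that it runs without nv_ingest or Ray installed. The InMemoryEdge class, the queue.Queue backing store, and the choice to make write() non-blocking and return False when the queue is full are all assumptions, not the library's implementation.

```python
import queue
import threading
from abc import ABC, abstractmethod
from typing import Any, Dict


class RayActorEdge(ABC):
    """Stand-in mirroring the documented interface (not imported from nv_ingest)."""

    def __init__(self, max_size: int, multi_reader: bool = False, multi_writer: bool = False):
        self.max_size = max_size
        self.multi_reader = multi_reader
        self.multi_writer = multi_writer

    @abstractmethod
    def write(self, item: Any) -> bool: ...

    @abstractmethod
    def read(self) -> Any: ...

    @abstractmethod
    def get_stats(self) -> Dict[str, int]: ...


class InMemoryEdge(RayActorEdge):
    """Hypothetical concrete edge backed by a bounded, thread-safe queue.Queue."""

    def __init__(self, max_size: int, multi_reader: bool = False, multi_writer: bool = False):
        super().__init__(max_size, multi_reader, multi_writer)
        self._queue: "queue.Queue[Any]" = queue.Queue(maxsize=max_size)
        self._stats_lock = threading.Lock()
        self._stats = {"write_count": 0, "read_count": 0, "queue_full_count": 0}

    def write(self, item: Any) -> bool:
        # Assumption: writes never block and report a full queue by returning False.
        try:
            self._queue.put_nowait(item)
        except queue.Full:
            with self._stats_lock:
                self._stats["queue_full_count"] += 1
            return False
        with self._stats_lock:
            self._stats["write_count"] += 1
        return True

    def read(self) -> Any:
        # Blocks until an item is available.
        item = self._queue.get()
        with self._stats_lock:
            self._stats["read_count"] += 1
        return item

    def get_stats(self) -> Dict[str, int]:
        with self._stats_lock:
            return {**self._stats, "current_size": self._queue.qsize()}
```

With max_size=2, a third write returns False and is counted in queue_full_count; after one read, get_stats() reports write_count 2, read_count 1, queue_full_count 1, and current_size 1.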

nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_sink_stage_base module

class nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_sink_stage_base.RayActorSinkStage(
config: Any,
log_to_stdout=False,
stage_name: str | None = None,
)

Bases: RayActorStage, ABC

Abstract base class for sink stages in a RayPipeline. Sink stages do not support an output queue; instead, they implement write_output to deliver their final processed messages.

set_output_queue(
queue_handle: any,
*,
_ray_trace_ctx=None,
) → bool

Sets the output queue handle for this actor stage.

Should be called before start().

Parameters:

queue_handle (Any) – The Ray queue handle (e.g., ray.util.queue.Queue) to which this actor should write output items.

Returns:

True indicating the queue was set.

Return type:

bool
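The class description says that sink stages deliver their final messages through write_output rather than an output queue. The sketch below models that split with plain Python stand-ins. SinkStageSketch and ListSink are hypothetical, and so is the single-argument write_output(control_message) signature, because this page does not document write_output's parameters; the real base class also runs inside a Ray actor.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Optional


class SinkStageSketch(ABC):
    """Hypothetical stand-in for a sink stage: consumes messages, emits nothing downstream."""

    def on_data(self, control_message: Any) -> Optional[Any]:
        # Deliver the final result externally instead of returning it,
        # so nothing is placed on an output queue.
        self.write_output(control_message)
        return None

    @abstractmethod
    def write_output(self, control_message: Any) -> None:  # hypothetical signature
        ...


class ListSink(SinkStageSketch):
    """Collects delivered messages in memory, for example in tests."""

    def __init__(self) -> None:
        self.delivered: List[Any] = []

    def write_output(self, control_message: Any) -> None:
        self.delivered.append(control_message)
```

Returning None from on_data keeps the sink consistent with the base-class rule that only non-None results are forwarded downstream.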

nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_source_stage_base module

class nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_source_stage_base.RayActorSourceStage(
config: Any,
log_to_stdout=False,
stage_name: str | None = None,
)

Bases: RayActorStage, ABC

Abstract base class for source stages in a RayPipeline. Source stages do not support an input queue. Instead, they must implement get_input() to fetch control messages from an external source.

on_data(
IngestControlMessage,
*,
_ray_trace_ctx=None,
)

Process a single data item (control message).

This is the core logic method that must be implemented by subclasses. It receives an item dequeued by read_input and performs the stage-specific processing.

Parameters:

control_message (Any) – The data item retrieved from the input queue.

Returns:

The result of the processing. If a result is returned (not None), it will be placed onto the output_queue. Return None if this stage does not produce output or if this specific message yields no result.

Return type:

Optional[Any]

pause() → bool

Pause the source stage so that it will not write to its output queue.

resume() → bool

Resume the source stage to allow writing to its output queue.

set_input_queue(
queue_handle: Any,
*,
_ray_trace_ctx=None,
) → bool

Sets the input queue handle for this actor stage.

Should be called before start().

Parameters:

queue_handle (Any) – The Ray queue handle (e.g., ray.util.queue.Queue) from which this actor should read input items.

Returns:

True indicating the queue was set.

Return type:

bool
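Source stages pull control messages from an external system through get_input() and can be paused so they stop writing downstream. The following single-process sketch illustrates that behavior. It substitutes queue.Queue for a Ray queue, uses a threading.Event as the pause gate, and replaces the real background loop with a hypothetical pump_once() helper. CountingSource and pump_once are illustrative names, not nv_ingest APIs.

```python
import queue
import threading
from typing import Any, Iterator, Optional


class CountingSource:
    """Hypothetical source stage that emits items from an iterator."""

    def __init__(self, items: Iterator[Any], output_queue: "queue.Queue[Any]") -> None:
        self._items = items
        self._output_queue = output_queue
        self._unpaused = threading.Event()
        self._unpaused.set()  # start in the running (unpaused) state

    def get_input(self) -> Optional[Any]:
        # Fetch the next message from the "external" source; None when exhausted.
        return next(self._items, None)

    def pause(self) -> bool:
        self._unpaused.clear()
        return True

    def resume(self) -> bool:
        self._unpaused.set()
        return True

    def pump_once(self) -> bool:
        """Move at most one item to the output queue; return True if one was written."""
        if not self._unpaused.is_set():
            return False  # paused: neither fetch nor write
        item = self.get_input()
        if item is None:
            return False
        self._output_queue.put(item)
        return True
```

In this sketch, the stage also stops fetching while paused, so no message is pulled from the external source and then held or dropped. Whether the real stage behaves the same way is not stated on this page.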

nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_stage_base module

class nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_stage_base.RayActorStage(
config: BaseModel,
stage_name: str | None = None,
log_to_stdout=False,
)

Bases: ABC

Abstract base class for a stateful Ray actor stage in a processing pipeline.

This class provides a common structure for actors that consume items from an input queue, process them, and may place results onto an output queue. It runs the main processing loop in a background thread so that the main Ray actor thread is not blocked, and it includes basic statistics tracking (processed count, elapsed time, processing rate) and mechanisms for graceful shutdown.

Subclasses must implement the on_data method to define the specific processing logic for each item.
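The consume-process-produce loop described above can be sketched with the standard library alone. StageSketch, the Doubler subclass, the 0.1 s polling timeout, and the use of queue.Queue in place of a Ray queue are assumptions for illustration, not the nv_ingest implementation. The sketch mirrors the documented contract: a non-None return from on_data is forwarded to the output queue, start() resets statistics and returns False if the stage is already running, and stop() only clears the running flag.

```python
import queue
import threading
from abc import ABC, abstractmethod
from typing import Any, Optional


class StageSketch(ABC):
    """Hypothetical stand-in for a stage with a background processing loop."""

    def __init__(self) -> None:
        self._input_queue: Optional["queue.Queue[Any]"] = None
        self._output_queue: Optional["queue.Queue[Any]"] = None
        self._running = False
        self._active_processing = False
        self._thread: Optional[threading.Thread] = None
        self.stats = {"processed": 0}

    def set_input_queue(self, q: "queue.Queue[Any]") -> bool:
        self._input_queue = q
        return True

    def set_output_queue(self, q: "queue.Queue[Any]") -> bool:
        self._output_queue = q
        return True

    @abstractmethod
    def on_data(self, control_message: Any) -> Optional[Any]: ...

    def start(self) -> bool:
        if self._running:
            return False  # already running (the real class also logs a warning)
        self._running = True
        self.stats = {"processed": 0}  # reset statistics on start
        self._thread = threading.Thread(target=self._processing_loop, daemon=True)
        self._thread.start()
        return True

    def stop(self) -> None:
        self._running = False  # the loop exits after its current poll or item

    def _processing_loop(self) -> None:
        while self._running:
            try:
                item = self._input_queue.get(timeout=0.1)
            except queue.Empty:
                continue  # re-check the running flag
            self._active_processing = True
            try:
                result = self.on_data(item)
                if result is not None:
                    self._output_queue.put(result)  # only non-None results go downstream
                self.stats["processed"] += 1
            finally:
                self._active_processing = False


class Doubler(StageSketch):
    def on_data(self, control_message: Any) -> Optional[Any]:
        # Drop odd numbers (return None) and double even ones.
        return control_message * 2 if control_message % 2 == 0 else None
```

Feeding 0, 1, 2, 3 through Doubler yields only 0 and 4 on the output queue, while the processed count still reaches 4 because dropped messages are processed too.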

config

Configuration object for the stage.

Type:

BaseModel

stage_name

Name of the stage from YAML pipeline configuration. Used by stage-aware decorators for consistent naming.

Type:

Optional[str]

_input_queue

Handle to the Ray queue from which input items are read. Expected to be set via set_input_queue.

Type:

Optional[Any]

_output_queue

Handle to the Ray queue where processed items are placed. Expected to be set via set_output_queue.

Type:

Optional[Any]

_running

Flag indicating if the processing loop should be actively running. Set to True by start() and False by stop(). Controls the main loop.

Type:

bool

_active_processing

Flag indicating if the on_data method is currently executing. Useful for understanding if the actor is busy at a given moment.

Type:

bool

stats

Dictionary to store basic operational statistics. Currently tracks ‘processed’.

Type:

Dict[str, int]

start_time

Timestamp (from time.time()) when the start() method was called. Used for calculating total elapsed time.

Type:

Optional[float]

_last_processed_count

Internal state variable storing the processed count at the last get_stats call. Used for calculating interval processing rate.

Type:

int

_last_stats_time

Internal state variable storing the timestamp of the last get_stats call. Used for calculating interval processing rate.

Type:

Optional[float]

_processing_thread

Handle to the background thread running the _processing_loop.

Type:

Optional[threading.Thread]

_shutting_down

Internal flag to prevent redundant shutdown actions, protected by _lock.

Type:

bool

_lock

Lock to protect access to shutdown-related state (_shutting_down).

Type:

threading.Lock

get_stats(
*,
_ray_trace_ctx=None,
) → Dict[str, Any]

Retrieves performance statistics for the actor.

Calculates the approximate processing rate since the last call to get_stats or since start().

Returns:

A dictionary containing statistics:
  • ‘processed’ (int): Total items processed since the actor started.

  • ‘elapsed’ (float): Total time in seconds since the actor started.

  • ‘active_processing’ (bool): Whether the actor was actively processing an item in on_data at the moment this method was called.

  • ‘processing_rate_cps’ (float): Calculated items processed per second during the last interval. Can be zero if no items were processed or the interval was too short.

  • ‘memory_mb’ (float): The total memory usage of the current actor process (RSS) in megabytes (MB).

Return type:

Dict[str, Any]
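The interval rate reported as processing_rate_cps is the change in the processed count divided by the wall-clock time since the previous get_stats call (or since start()). The sketch below shows that arithmetic. The function name interval_rate, its arguments, and returning 0.0 for a non-positive interval are assumptions; memory_mb is left out of the sketch.

```python
from typing import Tuple


def interval_rate(
    processed: int,
    last_processed: int,
    now: float,
    last_time: float,
) -> Tuple[float, int, float]:
    """Return (items per second, new last_processed, new last_time)."""
    elapsed = now - last_time
    delta = processed - last_processed
    # Guard against a zero or negative interval (e.g. two calls in the same clock tick).
    rate = delta / elapsed if elapsed > 0 else 0.0
    return rate, processed, now
```

For example, going from 100 to 150 processed items between t = 5.0 s and t = 10.0 s gives 50 / 5.0 = 10.0 items per second. The caller stores the returned count and timestamp as the baseline for the next interval.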

is_shutdown_complete(*, _ray_trace_ctx=None) → bool

Checks if the actor’s processing loop has finished and signaled completion. Raises RayActorError if the actor process has terminated.
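Because is_shutdown_complete can raise RayActorError once the actor process is gone, a caller waiting for shutdown may want to treat that exception as completion. The polling helper below is a hypothetical sketch: wait_for_shutdown, its default timeout and poll interval, and the generic check and terminated_exc parameters are assumptions. With Ray, check could be `lambda: ray.get(actor.is_shutdown_complete.remote())` and terminated_exc could be `(ray.exceptions.RayActorError,)`.

```python
import time
from typing import Callable, Tuple, Type


def wait_for_shutdown(
    check: Callable[[], bool],
    terminated_exc: Tuple[Type[BaseException], ...] = (),
    timeout: float = 30.0,
    poll_interval: float = 0.5,
) -> bool:
    """Poll check() until it returns True, the actor is gone, or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            if check():
                return True
        except terminated_exc:
            return True  # the actor process already exited, so shutdown is complete
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
```

An empty terminated_exc tuple catches nothing, so by default any exception from check() propagates to the caller.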

abstractmethod on_data(
control_message: Any,
) → Any | None

Process a single data item (control message).

This is the core logic method that must be implemented by subclasses. It receives an item dequeued by read_input and performs the stage-specific processing.

Parameters:

control_message (Any) – The data item retrieved from the input queue.

Returns:

The result of the processing. If a result is returned (not None), it will be placed onto the output_queue. Return None if this stage does not produce output or if this specific message yields no result.

Return type:

Optional[Any]

set_input_queue(
queue_handle: Any,
*,
_ray_trace_ctx=None,
) → bool

Sets the input queue handle for this actor stage.

Should be called before start().

Parameters:

queue_handle (Any) – The Ray queue handle (e.g., ray.util.queue.Queue) from which this actor should read input items.

Returns:

True indicating the queue was set.

Return type:

bool

set_output_queue(
queue_handle: Any,
*,
_ray_trace_ctx=None,
) → bool

Sets the output queue handle for this actor stage.

Should be called before start().

Parameters:

queue_handle (Any) – The Ray queue handle (e.g., ray.util.queue.Queue) to which this actor should write output items.

Returns:

True indicating the queue was set.

Return type:

bool

start(*, _ray_trace_ctx=None) → bool

Starts the actor’s processing loop in a background thread.

Initializes state, resets statistics, and launches the _processing_loop thread. Idempotent: if called while already running, it logs a warning and returns False.

Returns:

True if the actor was successfully started, False if it was already running.

Return type:

bool

stop(*, _ray_trace_ctx=None) → None

Stops the actor’s processing loop by setting the running flag to False.

nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_stage_base.setup_stdout_logging(
name: str = 'nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_stage_base',
level: int = 20,
) → Logger
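This page documents only the signature of setup_stdout_logging. A plausible stand-in with the same parameters is sketched below. It attaches a stdout StreamHandler to the named logger at the given level (20 is logging.INFO). The format string, the duplicate-handler check, and disabling propagation are assumptions, not the library's documented behavior.

```python
import logging
import sys


def setup_stdout_logging(
    name: str = "nv_ingest.framework.orchestration.ray.stages.meta.ray_actor_stage_base",
    level: int = logging.INFO,  # == 20, the documented default
) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Avoid stacking duplicate stdout handlers if this is called more than once.
    already_attached = any(
        isinstance(h, logging.StreamHandler) and getattr(h, "stream", None) is sys.stdout
        for h in logger.handlers
    )
    if not already_attached:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
    logger.propagate = False  # keep records from also reaching root-logger handlers
    return logger
```

Because logging.getLogger returns the same object for the same name, repeated calls reuse one logger, and the handler check keeps each record from being printed twice.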

nv_ingest.framework.orchestration.ray.stages.meta.udf_parallel_helper module

UDF Parallel Stage - A high-concurrency no-op stage for parallel UDF execution.

This stage does nothing except pass messages through, but with a high replica count it provides a parallel execution pool in which UDFs can achieve N-way concurrency.
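The N-way concurrency comes from running many replicas of a pass-through stage, so up to N UDF calls can be in flight at once. As a rough single-process analogy only, the sketch below uses a ThreadPoolExecutor whose max_workers stands in for the replica count. run_with_replicas and the example UDFs are hypothetical, and in the actual pipeline the replicas are separate Ray actors rather than threads.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def run_with_replicas(
    messages: Iterable[Any],
    udf: Callable[[Any], Any],
    replicas: int,
) -> List[Any]:
    """Apply udf to each message with up to `replicas` calls running concurrently."""

    def passthrough_stage(message: Any) -> Any:
        # The stage itself does no work; it only hosts the UDF call.
        return udf(message)

    with ThreadPoolExecutor(max_workers=replicas) as pool:
        # Executor.map returns results in input order, even if calls finish out of order.
        return list(pool.map(passthrough_stage, messages))
```

Ordered results are a property of this sketch, not a guarantee stated for the pipeline.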

Module contents