nv_ingest.framework.orchestration.ray.primitives package#

Submodules#

nv_ingest.framework.orchestration.ray.primitives.dataclasses module#

nv_ingest.framework.orchestration.ray.primitives.pipeline_monitor module#

class nv_ingest.framework.orchestration.ray.primitives.pipeline_monitor.DisplayConfig(use_gui: bool = False)[source]#

Bases: object

Configuration for the monitoring display.

use_gui: bool = False#
class nv_ingest.framework.orchestration.ray.primitives.pipeline_monitor.MonitorConfig(
use_gui: bool = False,
poll_interval: float = 5.0,
use_console: bool = True,
)[source]#

Bases: object

Configuration specific to the PipelineMonitor.

poll_interval: float = 5.0#
use_console: bool = True#
use_gui: bool = False#
class nv_ingest.framework.orchestration.ray.primitives.pipeline_monitor.PipelineMonitor(
pipeline: Any,
config: MonitorConfig,
)[source]#

Bases: object

Monitors a RayPipeline instance and manages its display (GUI or Console).

Runs in a separate thread, periodically fetching data from the pipeline and updating the display based on its own configuration. Decoupled from the RayPipeline lifecycle.

start() None[source]#

Starts the monitoring thread and display.

stop() None[source]#

Stops the monitoring thread and cleans up the display.

nv_ingest.framework.orchestration.ray.primitives.pipeline_topology module#

class nv_ingest.framework.orchestration.ray.primitives.pipeline_topology.PipelineTopology[source]#

Bases: object

Holds the structural definition and runtime state of the pipeline.

Encapsulates stages, connections, actors, queues, and associated state with thread-safe access via internal locking.

add_actor_to_stage(
stage_name: str,
actor: Any,
) None[source]#

Adds a single actor to a stage’s actor list.

add_connection(
from_stage: str,
to_stage: str,
queue_size: int,
) None[source]#

Adds a connection definition between two stages.

add_stage(
stage_info: StageInfo,
) None[source]#

Adds a stage definition.

clear_runtime_state() None[source]#

Clears actors, queues, and scaling state. Keeps definitions.

get_actor_count(stage_name: str) int[source]#

Returns the number of actors for a specific stage.

get_all_actors() List[Any][source]#

Returns a list of all actors across all stages.

get_connections() Dict[str, List[Tuple[str, int]]][source]#

Returns a shallow copy of the connection dictionary.

get_edge_queues() Dict[str, Tuple[Any, int]][source]#

Returns a shallow copy of the edge queues’ dictionary.

get_is_flushing() bool[source]#

Returns the current flushing state.

get_scaling_state() Dict[str, str][source]#

Returns a copy of the scaling state dictionary.

get_stage_actors() Dict[str, List[Any]][source]#

Returns a copy of the stage actors dictionary (with copies of actor lists).

get_stage_info(
stage_name: str,
) StageInfo | None[source]#

Returns the StageInfo for a specific stage, or None if not found.

get_stage_memory_overhead() Dict[str, float][source]#

Returns a copy of the stage memory overhead dictionary.

get_stages_info() List[StageInfo][source]#

Returns a copy of stage info with pending_shutdown flags updated.

lock_context() Iterator[PipelineTopology][source]#

Provides safe access to the topology under lock for complex operations.

mark_actor_for_removal(
stage_name: str,
actor: Any,
) None[source]#

Marks an actor as pending removal; it is cleaned up later by the background cleanup thread.

remove_actors_from_stage(
stage_name: str,
actors_to_remove: List[Any],
) List[Any][source]#

Removes specific actors from a stage’s list immediately. Called by the cleanup thread or potentially for forced removal.

set_actors_for_stage(
stage_name: str,
actors: List[Any],
) None[source]#

Sets the list of actors for a given stage, resetting scaling state.

set_edge_queues(
queues: Dict[str, Tuple[Any, int]],
) None[source]#

Sets the dictionary of edge queues.

set_flushing(is_flushing: bool) None[source]#

Sets the pipeline flushing state.

set_stage_memory_overhead(
overheads: Dict[str, float],
) None[source]#

Sets the estimated memory overhead for stages.

start_cleanup_thread(interval: int = 5) None[source]#

Starts the background thread for periodic cleanup tasks.

stop_cleanup_thread() None[source]#

Stops the background cleanup thread.

update_scaling_state(
stage_name: str,
state: str,
) None[source]#

Updates the scaling state for a stage.

class nv_ingest.framework.orchestration.ray.primitives.pipeline_topology.StageInfo(
name,
callable,
config,
is_source=False,
is_sink=False,
min_replicas=0,
max_replicas=1,
pending_shutdown=False,
)[source]#

Bases: object

nv_ingest.framework.orchestration.ray.primitives.ray_pipeline module#

class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.FlushingConfig(
queue_flush_interval_seconds: int = 600,
queue_flush_drain_timeout_seconds: int = 300,
quiet_period_threshold: int = 0,
consecutive_quiet_cycles_for_flush: int = 3,
)[source]#

Bases: object

Configuration for queue flushing behavior.

consecutive_quiet_cycles_for_flush: int = 3#
queue_flush_drain_timeout_seconds: int = 300#
queue_flush_interval_seconds: int = 600#
quiet_period_threshold: int = 0#
class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.PipelineInterface[source]#

Bases: ABC

Abstract base class for pipeline implementations.

Any concrete pipeline must implement start and stop methods.

abstractmethod start(
monitor_poll_interval: float = 5.0,
scaling_poll_interval: float = 30.0,
) None[source]#

Start the pipeline.

Parameters:
  • monitor_poll_interval (float) – Interval in seconds for the monitoring poll (default: 5.0).

  • scaling_poll_interval (float) – Interval in seconds for scaling decisions (default: 30.0).

abstractmethod stop() None[source]#

Stop the pipeline and perform any necessary cleanup.

class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.RayPipeline(
scaling_config: ScalingConfig = ScalingConfig(dynamic_memory_scaling=True, dynamic_memory_threshold=0.75, pid_kp=0.2, pid_ki=0.01, pid_kd=0.0, pid_ema_alpha=0.1, pid_target_queue_depth=0, pid_penalty_factor=0.1, pid_error_boost_factor=1.5, rcm_memory_safety_buffer_fraction=0.15),
flushing_config: FlushingConfig = FlushingConfig(queue_flush_interval_seconds=600, queue_flush_drain_timeout_seconds=300, quiet_period_threshold=0, consecutive_quiet_cycles_for_flush=3),
stats_config: StatsConfig = StatsConfig(collection_interval_seconds=10.0, actor_timeout_seconds=5.0, queue_timeout_seconds=2.0),
)[source]#

Bases: PipelineInterface

A structured pipeline supporting dynamic scaling and queue flushing. Uses PIDController and ResourceConstraintManager. Supports optional GUI display. Delegates statistics collection to RayStatsCollector.

Configuration is managed via dedicated config objects (ScalingConfig, etc.).

add_sink(
*,
name: str,
sink_actor: Any,
config: BaseModel,
min_replicas: int = 1,
max_replicas: int = 1,
) RayPipeline[source]#

Adds a sink stage to the pipeline.

Parameters:
  • name (str) – The name of the sink stage.

  • sink_actor (Any) – The actor or callable for the sink stage.

  • config (BaseModel) – The configuration for the sink stage.

  • min_replicas (int, optional) – The minimum number of replicas for the sink stage, by default 1.

  • max_replicas (int, optional) – The maximum number of replicas for the sink stage, by default 1.

Returns:

The pipeline instance.

Return type:

RayPipeline

add_source(
*,
name: str,
source_actor: Any,
config: BaseModel,
min_replicas: int = 1,
max_replicas: int = 1,
) RayPipeline[source]#

Adds a source stage to the pipeline.

Parameters:
  • name (str) – The name of the source stage.

  • source_actor (Any) – The actor or callable for the source stage.

  • config (BaseModel) – The configuration for the source stage.

  • min_replicas (int, optional) – The minimum number of replicas for the source stage, by default 1.

  • max_replicas (int, optional) – The maximum number of replicas for the source stage, by default 1.

Returns:

The pipeline instance.

Return type:

RayPipeline

add_stage(
*,
name: str,
stage_actor: Any,
config: BaseModel,
min_replicas: int = 0,
max_replicas: int = 1,
) RayPipeline[source]#

Adds a stage to the pipeline.

Parameters:
  • name (str) – The name of the stage.

  • stage_actor (Any) – The actor or callable for the stage.

  • config (BaseModel) – The configuration for the stage.

  • min_replicas (int, optional) – The minimum number of replicas for the stage, by default 0.

  • max_replicas (int, optional) – The maximum number of replicas for the stage, by default 1.

Returns:

The pipeline instance.

Return type:

RayPipeline

build() Dict[str, List[Any]][source]#

Builds the pipeline from its topology: configures the stages, instantiates their actors, and wires the stages together.

Returns:

A dictionary mapping stage names to lists of actor handles.

Return type:

Dict[str, List[Any]]

get_edge_queues() Dict[str, Tuple[Any, int]][source]#

Returns a snapshot of the current edge queues.

Returns:

A dictionary mapping queue names to tuples of (queue_handle, queue_size).

Return type:

Dict[str, Tuple[Any, int]]

get_stage_actors() Dict[str, List[Any]][source]#

Returns a snapshot of the current actors per stage.

Returns:

A dictionary mapping stage names to lists of actor handles.

Return type:

Dict[str, List[Any]]

get_stages_info() List[StageInfo][source]#

Returns a snapshot of the current stage information.

Returns:

A list of StageInfo objects from the topology.

Return type:

List[StageInfo]

make_edge(
from_stage: str,
to_stage: str,
queue_size: int = 100,
) RayPipeline[source]#

Creates an edge between two stages in the pipeline.

Parameters:
  • from_stage (str) – The name of the source stage.

  • to_stage (str) – The name of the destination stage.

  • queue_size (int, optional) – The size of the queue between the stages, by default 100.

Returns:

The pipeline instance.

Return type:

RayPipeline

request_queue_flush(force: bool = False) None[source]#

Requests a queue flush, checking topology state.

Parameters:

force (bool, optional) – Whether to force the flush, by default False.

start(
monitor_poll_interval: float = 5.0,
scaling_poll_interval: float = 30.0,
) None[source]#

Starts the pipeline actors and background monitoring threads.

Assumes the pipeline has already been built via the build() method.

Parameters:
  • monitor_poll_interval (float, optional) – This parameter is currently unused but is kept for interface compatibility.

  • scaling_poll_interval (float, optional) – The interval in seconds for the autoscaling and maintenance thread to run, by default 30.0.

stop() None[source]#

Stops the pipeline and all associated actors and threads.

This method performs a graceful shutdown by:

  1. Stopping the autoscaling and statistics collection threads.
  2. Signaling all actors to stop and waiting for confirmation.
  3. Stopping the topology cleanup thread.
  4. Clearing all runtime state from the topology.

class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.RayPipelineInterface(
pipeline: RayPipeline,
)[source]#

Bases: PipelineInterface

Pipeline interface for an in-process RayPipeline instance.

start(
monitor_poll_interval: float = 5.0,
scaling_poll_interval: float = 30.0,
) None[source]#

Starts the RayPipeline.

Parameters:
  • monitor_poll_interval (float) – Unused here; provided for interface compatibility.

  • scaling_poll_interval (float) – Unused here; provided for interface compatibility.

stop() None[source]#

Stops the RayPipeline and shuts down Ray.

class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.RayPipelineSubprocessInterface(process: Process)[source]#

Bases: PipelineInterface

Pipeline interface implementation for a subprocess-based Ray pipeline.

start(
monitor_poll_interval: float = 5.0,
scaling_poll_interval: float = 30.0,
) None[source]#

Start is not supported because the subprocess is assumed to already be running.

stop() None[source]#

Stops the subprocess pipeline and its entire process group to ensure any child processes (e.g., the simple message broker) are terminated.

class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.ScalingConfig(
dynamic_memory_scaling: bool = True,
dynamic_memory_threshold: float = 0.75,
pid_kp: float = 0.2,
pid_ki: float = 0.01,
pid_kd: float = 0.0,
pid_ema_alpha: float = 0.1,
pid_target_queue_depth: int = 0,
pid_penalty_factor: float = 0.1,
pid_error_boost_factor: float = 1.5,
rcm_memory_safety_buffer_fraction: float = 0.15,
)[source]#

Bases: object

Configuration for PID and Resource Constraint Manager based scaling.

dynamic_memory_scaling: bool = True#
dynamic_memory_threshold: float = 0.75#
pid_ema_alpha: float = 0.1#
pid_error_boost_factor: float = 1.5#
pid_kd: float = 0.0#
pid_ki: float = 0.01#
pid_kp: float = 0.2#
pid_penalty_factor: float = 0.1#
pid_target_queue_depth: int = 0#
rcm_memory_safety_buffer_fraction: float = 0.15#
class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.StatsConfig(
collection_interval_seconds: float = 10.0,
actor_timeout_seconds: float = 5.0,
queue_timeout_seconds: float = 2.0,
)[source]#

Bases: object

Configuration for the RayStatsCollector.

actor_timeout_seconds: float = 5.0#
collection_interval_seconds: float = 10.0#
queue_timeout_seconds: float = 2.0#

nv_ingest.framework.orchestration.ray.primitives.ray_stat_collector module#

class nv_ingest.framework.orchestration.ray.primitives.ray_stat_collector.RayStatsCollector(
pipeline_accessor: Any,
interval: float = 30.0,
actor_timeout: float = 5.0,
queue_timeout: float = 2.0,
ema_alpha: float = 0.1,
)[source]#

Bases: object

Collects statistics from a RayPipeline’s actors and queues in parallel using a dedicated background thread.

collect_stats_now() Tuple[Dict[str, Dict[str, int]], int, bool][source]#

Performs a single collection cycle of statistics from pipeline actors/queues.

Returns:

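A tuple containing: (1) a dictionary mapping stage names to their collected statistics; (2) an integer value (presumably the global in-flight count — confirm against the implementation); and (3) a boolean indicating whether the overall collection was successful.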

Return type:

Tuple[Dict[str, Dict[str, int]], int, bool]

get_latest_stats() Tuple[Dict[str, Dict[str, int]], int, float, bool][source]#

Returns the most recently collected statistics, an accompanying integer value, the last update time, and the success status.

Returns:

A tuple containing:

  - A dictionary mapping stage names to their statistics (empty if none have been collected).
  - An integer value (presumably the global in-flight count — confirm against the implementation).
  - The timestamp (time.time()) of the last update attempt.
  - A boolean indicating whether the last collection was successful.

Return type:

Tuple[Dict[str, Dict[str, int]], int, float, bool]

start() None[source]#

Starts the dedicated background statistics collection thread.

stop() None[source]#

Signals the background stats collection thread to stop and waits for it.

Module contents#