nv_ingest.framework.orchestration.ray.primitives package
Submodules
nv_ingest.framework.orchestration.ray.primitives.dataclasses module
nv_ingest.framework.orchestration.ray.primitives.pipeline_monitor module
- class nv_ingest.framework.orchestration.ray.primitives.pipeline_monitor.DisplayConfig(use_gui: bool = False)
Bases: object
Configuration for monitoring display.
- use_gui: bool = False
- class nv_ingest.framework.orchestration.ray.primitives.pipeline_monitor.MonitorConfig(use_gui: bool = False, poll_interval: float = 5.0, use_console: bool = True)
Bases: object
Configuration specific to the PipelineMonitor.
- poll_interval: float = 5.0
- use_console: bool = True
- use_gui: bool = False
- class nv_ingest.framework.orchestration.ray.primitives.pipeline_monitor.PipelineMonitor(pipeline: Any, config: MonitorConfig)
Bases: object
Monitors a RayPipeline instance and manages its display (GUI or console).
Runs in a separate thread, periodically fetching data from the pipeline and updating the display based on its own configuration. Decoupled from the RayPipeline lifecycle.
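A minimal, standard-library sketch of the polling pattern described above follows. `SketchMonitor`, its `render` callback, and the pipeline's `get_snapshot()` method are hypothetical stand-ins, not the nv_ingest API; only the poll interval corresponds to a documented field (MonitorConfig.poll_interval).

```python
import threading
from typing import Any, Callable, Optional


class SketchMonitor:
    """Hypothetical monitor that polls a pipeline on its own daemon thread."""

    def __init__(self, pipeline: Any, poll_interval: float,
                 render: Callable[[Any], None]) -> None:
        self._pipeline = pipeline
        self._poll_interval = poll_interval  # mirrors MonitorConfig.poll_interval
        self._render = render                # stands in for the GUI or console display
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        if self._thread is not None:
            return  # already running
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        # Only the monitor thread stops; the pipeline itself keeps running.
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None

    def _run(self) -> None:
        while not self._stop_event.is_set():
            # get_snapshot() is a hypothetical read-only accessor on the pipeline.
            self._render(self._pipeline.get_snapshot())
            # wait() returns early as soon as stop() sets the event.
            self._stop_event.wait(self._poll_interval)
```

Because stop() only ends the monitor thread, the pipeline's own start/stop lifecycle is untouched, which is one way to read "decoupled" here.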
nv_ingest.framework.orchestration.ray.primitives.pipeline_topology module
- class nv_ingest.framework.orchestration.ray.primitives.pipeline_topology.PipelineTopology
Bases: object
Holds the structural definition and runtime state of the pipeline.
Encapsulates stages, connections, actors, queues, and associated state with thread-safe access via internal locking.
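The locking-plus-copy discipline described above can be sketched as follows. `SketchTopology` is hypothetical and only reuses a few documented method names for readability; it is not the actual implementation. Getters hold the lock just long enough to copy state, so callers can iterate the result without blocking writers.

```python
import threading
from typing import Any, Dict, List, Tuple


class SketchTopology:
    """Hypothetical lock-protected topology; not the nv_ingest class."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._stage_actors: Dict[str, List[Any]] = {}
        self._connections: Dict[str, List[Tuple[str, int]]] = {}

    def add_actor_to_stage(self, stage_name: str, actor: Any) -> None:
        with self._lock:
            self._stage_actors.setdefault(stage_name, []).append(actor)

    def add_connection(self, from_stage: str, to_stage: str, queue_size: int) -> None:
        with self._lock:
            self._connections.setdefault(from_stage, []).append((to_stage, queue_size))

    def get_stage_actors(self) -> Dict[str, List[Any]]:
        # Copy the dict AND each actor list, so callers cannot mutate shared state.
        with self._lock:
            return {stage: list(actors) for stage, actors in self._stage_actors.items()}

    def get_connections(self) -> Dict[str, List[Tuple[str, int]]]:
        # Shallow copy: a new outer dict, but the inner lists are shared.
        with self._lock:
            return dict(self._connections)
```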
- add_actor_to_stage(stage_name: str, actor: Any)
Adds a single actor to a stage’s list.
- add_connection(from_stage: str, to_stage: str, queue_size: int)
Adds a connection definition between two stages.
- get_connections() → Dict[str, List[Tuple[str, int]]]
Returns a shallow copy of the connection dictionary.
- get_edge_queues() → Dict[str, Tuple[Any, int]]
Returns a shallow copy of the edge queues’ dictionary.
- get_stage_actors() → Dict[str, List[Any]]
Returns a copy of the stage actors dictionary (with copies of actor lists).
- get_stage_info(stage_name: str)
Returns the StageInfo for a specific stage, or None if not found.
- get_stage_memory_overhead() → Dict[str, float]
Returns a copy of the stage memory overhead dictionary.
- get_stages_info() → List[StageInfo]
Returns a copy of stage info with pending_shutdown flags updated.
- lock_context() → Iterator[PipelineTopology]
Context manager that yields the topology while holding its internal lock, giving safe access for complex multi-step operations.
- mark_actor_for_removal(stage_name: str, actor: Any)
Marks an actor as pending removal, to be cleaned up by the background thread.
- remove_actors_from_stage(stage_name: str, actors_to_remove: List[Any])
Removes specific actors from a stage’s list immediately. Called by the cleanup thread or potentially for forced removal.
- set_actors_for_stage(stage_name: str, actors: List[Any])
Sets the list of actors for a given stage, resetting scaling state.
- set_edge_queues(queues: Dict[str, Tuple[Any, int]])
Sets the dictionary of edge queues.
- set_stage_memory_overhead(overheads: Dict[str, float])
Sets the estimated memory overhead for stages.
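The deferred-removal flow (mark_actor_for_removal now, remove_actors_from_stage later from a cleanup thread) together with lock_context() can be sketched as below. `DeferredRemovalTopology` and `cleanup_pending()` are hypothetical; the sketch assumes lock_context() holds the internal lock for the duration of a with-block, and it omits the scaling state that set_actors_for_stage resets.

```python
import contextlib
import threading
from typing import Any, Dict, Iterator, List, Tuple


class DeferredRemovalTopology:
    """Hypothetical topology showing lock_context() and deferred actor removal."""

    def __init__(self) -> None:
        # Re-entrant lock: methods may be called from inside lock_context().
        self._lock = threading.RLock()
        self._stage_actors: Dict[str, List[Any]] = {}
        self._pending_removal: List[Tuple[str, Any]] = []

    @contextlib.contextmanager
    def lock_context(self) -> Iterator["DeferredRemovalTopology"]:
        # Hold the lock for the whole with-block so multi-step edits are atomic.
        with self._lock:
            yield self

    def set_actors_for_stage(self, stage_name: str, actors: List[Any]) -> None:
        with self._lock:
            self._stage_actors[stage_name] = list(actors)

    def mark_actor_for_removal(self, stage_name: str, actor: Any) -> None:
        # Only record the request; the actor stays listed until cleanup runs.
        with self._lock:
            self._pending_removal.append((stage_name, actor))

    def remove_actors_from_stage(self, stage_name: str, actors_to_remove: List[Any]) -> None:
        with self._lock:
            current = self._stage_actors.get(stage_name, [])
            self._stage_actors[stage_name] = [a for a in current if a not in actors_to_remove]

    def cleanup_pending(self) -> int:
        """One pass of the work a background cleanup thread could run periodically."""
        with self._lock:
            pending, self._pending_removal = self._pending_removal, []
            for stage_name, actor in pending:
                self.remove_actors_from_stage(stage_name, [actor])
            return len(pending)

    def get_stage_actors(self) -> Dict[str, List[Any]]:
        with self._lock:
            return {stage: list(actors) for stage, actors in self._stage_actors.items()}
```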
nv_ingest.framework.orchestration.ray.primitives.ray_pipeline module
- class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.FlushingConfig(queue_flush_interval_seconds: int = 600, queue_flush_drain_timeout_seconds: int = 300, quiet_period_threshold: int = 0, consecutive_quiet_cycles_for_flush: int = 3)
Bases: object
Configuration for queue flushing behavior.
- consecutive_quiet_cycles_for_flush: int = 3
- queue_flush_drain_timeout_seconds: int = 300
- queue_flush_interval_seconds: int = 600
- quiet_period_threshold: int = 0
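This section does not spell out how the four flushing fields interact. The sketch below assumes one plausible reading: a monitoring cycle counts as quiet when the total number of queued items is at or below quiet_period_threshold, and a flush is triggered once queue_flush_interval_seconds have elapsed since the last flush and consecutive_quiet_cycles_for_flush quiet cycles have been seen in a row. `FlushScheduler` is hypothetical, the local FlushingConfig dataclass merely copies the documented fields and defaults, and queue_flush_drain_timeout_seconds (which would bound the drain step) is not modeled.

```python
from dataclasses import dataclass


@dataclass
class FlushingConfig:
    # Local stand-in: field names and defaults copied from the documented signature.
    queue_flush_interval_seconds: int = 600
    queue_flush_drain_timeout_seconds: int = 300
    quiet_period_threshold: int = 0
    consecutive_quiet_cycles_for_flush: int = 3


class FlushScheduler:
    """Hypothetical flush trigger; the real RayPipeline logic may differ."""

    def __init__(self, config: FlushingConfig, now: float) -> None:
        self.config = config
        self.last_flush = now
        self.quiet_cycles = 0

    def observe(self, total_queued: int, now: float) -> bool:
        """Record one monitoring cycle; return True if a flush should start."""
        # Assumption: "quiet" means queued items <= quiet_period_threshold.
        if total_queued <= self.config.quiet_period_threshold:
            self.quiet_cycles += 1
        else:
            self.quiet_cycles = 0  # any busy cycle resets the streak
        interval_elapsed = now - self.last_flush >= self.config.queue_flush_interval_seconds
        quiet_enough = self.quiet_cycles >= self.config.consecutive_quiet_cycles_for_flush
        if interval_elapsed and quiet_enough:
            self.last_flush = now
            self.quiet_cycles = 0
            return True
        return False
```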
- class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.PipelineInterface
Bases: ABC
Abstract base class for pipeline implementations.
Any concrete pipeline must implement start and stop methods.
- abstractmethod start(monitor_poll_interval: float = 5.0, scaling_poll_interval: float = 30.0)
Start the pipeline.
- Parameters:
monitor_poll_interval (float) – Interval in seconds for the monitoring poll (default: 5.0).
scaling_poll_interval (float) – Interval in seconds for scaling decisions (default: 30.0).
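The requirement that concrete pipelines provide start() and stop() can be illustrated with Python's abc module. This local PipelineInterface only mirrors the documented start() signature; the stop() signature and `NoOpPipeline` are assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Optional


class PipelineInterface(ABC):
    """Local stand-in mirroring the documented abstract interface."""

    @abstractmethod
    def start(self, monitor_poll_interval: float = 5.0,
              scaling_poll_interval: float = 30.0) -> None:
        """Start the pipeline."""

    @abstractmethod
    def stop(self) -> None:  # signature assumed; not shown in this section
        """Stop the pipeline."""


class NoOpPipeline(PipelineInterface):
    """Hypothetical concrete pipeline that only records its state."""

    def __init__(self) -> None:
        self.running = False
        self.scaling_poll_interval: Optional[float] = None

    def start(self, monitor_poll_interval: float = 5.0,
              scaling_poll_interval: float = 30.0) -> None:
        self.scaling_poll_interval = scaling_poll_interval
        self.running = True

    def stop(self) -> None:
        self.running = False
```

A subclass that omits either method cannot be instantiated; Python raises TypeError at construction time.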
- class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.RayPipeline(
    scaling_config: ScalingConfig = ScalingConfig(dynamic_memory_scaling=True, dynamic_memory_threshold=0.75, pid_kp=0.2, pid_ki=0.01, pid_kd=0.0, pid_ema_alpha=0.1, pid_target_queue_depth=0, pid_penalty_factor=0.1, pid_error_boost_factor=1.5, rcm_memory_safety_buffer_fraction=0.15),
    flushing_config: FlushingConfig = FlushingConfig(queue_flush_interval_seconds=600, queue_flush_drain_timeout_seconds=300, quiet_period_threshold=0, consecutive_quiet_cycles_for_flush=3),
    stats_config: StatsConfig = StatsConfig(collection_interval_seconds=10.0, actor_timeout_seconds=5.0, queue_timeout_seconds=2.0),
)
Bases: PipelineInterface
A structured pipeline supporting dynamic scaling and queue flushing. It uses PIDController and ResourceConstraintManager for scaling, supports an optional GUI display, and delegates statistics collection to RayStatsCollector.
Configuration is managed via dedicated config objects (ScalingConfig, etc.).
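The add_source, add_stage, add_sink, and make_edge methods each return the pipeline instance, which allows a fluent, chained build. The sketch below mimics that call shape without Ray: `SketchPipeline` is hypothetical, stage "actors" are plain factory callables, configs are dicts rather than pydantic BaseModel instances, and build() is assumed to create min_replicas instances per stage (leaving further growth to an autoscaler).

```python
from typing import Any, Callable, Dict, List, Tuple

Factory = Callable[[dict], Any]


class SketchPipeline:
    """Hypothetical builder mirroring RayPipeline's method names, not its internals."""

    def __init__(self) -> None:
        self._stages: Dict[str, Tuple[Factory, dict, int, int]] = {}
        self._edges: List[Tuple[str, str, int]] = []

    def _add(self, name: str, factory: Factory, config: dict,
             min_replicas: int, max_replicas: int) -> "SketchPipeline":
        if name in self._stages:
            raise ValueError(f"duplicate stage name: {name}")
        if not 0 <= min_replicas <= max_replicas:
            raise ValueError("require 0 <= min_replicas <= max_replicas")
        self._stages[name] = (factory, config, min_replicas, max_replicas)
        return self  # returning self enables method chaining

    def add_source(self, *, name, source_actor, config, min_replicas=1, max_replicas=1):
        return self._add(name, source_actor, config, min_replicas, max_replicas)

    def add_stage(self, *, name, stage_actor, config, min_replicas=0, max_replicas=1):
        return self._add(name, stage_actor, config, min_replicas, max_replicas)

    def add_sink(self, *, name, sink_actor, config, min_replicas=1, max_replicas=1):
        return self._add(name, sink_actor, config, min_replicas, max_replicas)

    def make_edge(self, from_stage: str, to_stage: str, queue_size: int = 100):
        for stage in (from_stage, to_stage):
            if stage not in self._stages:
                raise KeyError(f"unknown stage: {stage}")
        self._edges.append((from_stage, to_stage, queue_size))
        return self

    def build(self) -> Dict[str, List[Any]]:
        # Assumption: each stage starts at min_replicas instances.
        return {
            name: [factory(config) for _ in range(min_r)]
            for name, (factory, config, min_r, _max_r) in self._stages.items()
        }
```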
- add_sink(*, name: str, sink_actor: Any, config: BaseModel, min_replicas: int = 1, max_replicas: int = 1)
Adds a sink stage to the pipeline.
- Parameters:
name (str) – The name of the sink stage.
sink_actor (Any) – The actor or callable for the sink stage.
config (BaseModel) – The configuration for the sink stage.
min_replicas (int, optional) – The minimum number of replicas for the sink stage, by default 1.
max_replicas (int, optional) – The maximum number of replicas for the sink stage, by default 1.
- Returns:
The pipeline instance.
- Return type:
RayPipeline
- add_source(*, name: str, source_actor: Any, config: BaseModel, min_replicas: int = 1, max_replicas: int = 1)
Adds a source stage to the pipeline.
- Parameters:
name (str) – The name of the source stage.
source_actor (Any) – The actor or callable for the source stage.
config (BaseModel) – The configuration for the source stage.
min_replicas (int, optional) – The minimum number of replicas for the source stage, by default 1.
max_replicas (int, optional) – The maximum number of replicas for the source stage, by default 1.
- Returns:
The pipeline instance.
- Return type:
RayPipeline
- add_stage(*, name: str, stage_actor: Any, config: BaseModel, min_replicas: int = 0, max_replicas: int = 1)
Adds a stage to the pipeline.
- Parameters:
name (str) – The name of the stage.
stage_actor (Any) – The actor or callable for the stage.
config (BaseModel) – The configuration for the stage.
min_replicas (int, optional) – The minimum number of replicas for the stage, by default 0.
max_replicas (int, optional) – The maximum number of replicas for the stage, by default 1.
- Returns:
The pipeline instance.
- Return type:
RayPipeline
- build() → Dict[str, List[Any]]
Builds the pipeline from its topology: configures the stages, instantiates their actors, and wires them together.
- Returns:
A dictionary mapping stage names to lists of actor handles.
- Return type:
Dict[str, List[Any]]
- get_edge_queues() → Dict[str, Tuple[Any, int]]
Returns a snapshot of the current edge queues.
- Returns:
A dictionary mapping queue names to tuples of (queue_handle, queue_size).
- Return type:
Dict[str, Tuple[Any, int]]
- get_stage_actors() → Dict[str, List[Any]]
Returns a snapshot of the current actors per stage.
- Returns:
A dictionary mapping stage names to lists of actor handles.
- Return type:
Dict[str, List[Any]]
- get_stages_info() → List[StageInfo]
Returns a snapshot of the current stage information.
- Returns:
A list of StageInfo objects from the topology.
- Return type:
List[StageInfo]
- make_edge(from_stage: str, to_stage: str, queue_size: int = 100)
Creates an edge between two stages in the pipeline.
- Parameters:
from_stage (str) – The name of the source stage.
to_stage (str) – The name of the destination stage.
queue_size (int, optional) – The size of the queue between the stages, by default 100.
- Returns:
The pipeline instance.
- Return type:
RayPipeline
- request_queue_flush(force: bool = False) → None
Requests a queue flush, checking topology state.
- Parameters:
force (bool, optional) – Whether to force the flush, by default False.
- start(monitor_poll_interval: float = 5.0, scaling_poll_interval: float = 30.0)
Starts the pipeline actors and background monitoring threads.
Assumes the pipeline has already been built via the build() method.
- Parameters:
monitor_poll_interval (float, optional) – This parameter is currently unused but is kept for interface compatibility.
scaling_poll_interval (float, optional) – The interval in seconds for the autoscaling and maintenance thread to run, by default 30.0.
- stop() → None
Stops the pipeline and all associated actors and threads.
This method performs a graceful shutdown by:
1. Stopping the autoscaling and statistics collection threads.
2. Signaling all actors to stop and waiting for confirmation.
3. Stopping the topology cleanup thread.
4. Clearing all runtime state from the topology.
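The four-step shutdown order above can be sketched with plain threads. `StoppableLoop`, `graceful_stop()`, and the actors' synchronous stop() method returning a confirmation are hypothetical; with Ray, step 2 would instead invoke a remote method on each actor handle and wait on the results with a timeout. Stopping the scaling and statistics threads first keeps them from acting on actors that are already shutting down.

```python
import threading
from typing import Any, Callable, Dict, List


class StoppableLoop:
    """A background thread that calls `tick` every `interval` seconds until stopped."""

    def __init__(self, name: str, tick: Callable[[], None], interval: float) -> None:
        self.name = name
        self._tick = tick
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, name=name, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        # wait() returns True once stop() sets the event, ending the loop.
        while not self._stop.wait(self._interval):
            self._tick()

    def stop(self, timeout: float = 5.0) -> None:
        self._stop.set()
        self._thread.join(timeout)


def graceful_stop(scaling: StoppableLoop, stats: StoppableLoop,
                  stage_actors: Dict[str, List[Any]], cleanup: StoppableLoop,
                  log: List[str]) -> None:
    # 1. Stop the threads that could otherwise rescale actors mid-shutdown.
    for loop in (scaling, stats):
        loop.stop()
        log.append(f"stopped {loop.name}")
    # 2. Ask every actor to stop and record each confirmation.
    for stage, actors in stage_actors.items():
        for actor in actors:
            confirmed = actor.stop()  # hypothetical: returns True once drained
            log.append(f"{stage}: {'confirmed' if confirmed else 'no confirmation'}")
    # 3. Stop the topology cleanup thread.
    cleanup.stop()
    log.append(f"stopped {cleanup.name}")
    # 4. Clear runtime state.
    stage_actors.clear()
    log.append("cleared state")
```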
- class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.RayPipelineInterface(pipeline: RayPipeline)
Bases: PipelineInterface
Pipeline interface for an in-process RayPipeline instance.
- class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.RayPipelineSubprocessInterface(process: Process)
Bases: PipelineInterface
Pipeline interface implementation for a subprocess-based Ray pipeline.
- class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.ScalingConfig(
    dynamic_memory_scaling: bool = True,
    dynamic_memory_threshold: float = 0.75,
    pid_kp: float = 0.2,
    pid_ki: float = 0.01,
    pid_kd: float = 0.0,
    pid_ema_alpha: float = 0.1,
    pid_target_queue_depth: int = 0,
    pid_penalty_factor: float = 0.1,
    pid_error_boost_factor: float = 1.5,
    rcm_memory_safety_buffer_fraction: float = 0.15,
)
Bases: object
Configuration for PID and Resource Constraint Manager based scaling.
- dynamic_memory_scaling: bool = True
- dynamic_memory_threshold: float = 0.75
- pid_ema_alpha: float = 0.1
- pid_error_boost_factor: float = 1.5
- pid_kd: float = 0.0
- pid_ki: float = 0.01
- pid_kp: float = 0.2
- pid_penalty_factor: float = 0.1
- pid_target_queue_depth: int = 0
- rcm_memory_safety_buffer_fraction: float = 0.15
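The ScalingConfig field names suggest a PID loop driven by smoothed queue depth. A textbook discrete PID, with an exponential moving average applied to the measurement, is sketched below. How nv_ingest's PIDController actually uses these fields is not documented in this section, so the field mapping in the comments is an assumption, and pid_penalty_factor, pid_error_boost_factor, and the memory-related fields are not modeled.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class QueuePID:
    """Hypothetical PID on smoothed queue depth; not nv_ingest's PIDController."""

    kp: float = 0.2          # assumed to play the role of pid_kp
    ki: float = 0.01         # assumed to play the role of pid_ki
    kd: float = 0.0          # assumed to play the role of pid_kd
    ema_alpha: float = 0.1   # assumed to play the role of pid_ema_alpha
    target_depth: int = 0    # assumed to play the role of pid_target_queue_depth
    _ema: Optional[float] = field(default=None, init=False, repr=False)
    _integral: float = field(default=0.0, init=False, repr=False)
    _prev_error: float = field(default=0.0, init=False, repr=False)

    def update(self, queue_depth: int, dt: float) -> float:
        """Return a control signal; a positive value suggests adding replicas."""
        # Exponential moving average damps bursty queue-depth readings.
        if self._ema is None:
            self._ema = float(queue_depth)
        else:
            self._ema = self.ema_alpha * queue_depth + (1.0 - self.ema_alpha) * self._ema
        error = self._ema - self.target_depth  # backlog above target -> positive error
        self._integral += error * dt
        derivative = (error - self._prev_error) / dt if dt > 0 else 0.0
        self._prev_error = error
        return self.kp * error + self.ki * self._integral + self.kd * derivative
```

Turning the signal into a replica count, clamped to each stage's min_replicas/max_replicas and to memory limits, is deliberately left out of the sketch.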
- class nv_ingest.framework.orchestration.ray.primitives.ray_pipeline.StatsConfig(collection_interval_seconds: float = 10.0, actor_timeout_seconds: float = 5.0, queue_timeout_seconds: float = 2.0)
Bases: object
Configuration for the RayStatsCollector.
- actor_timeout_seconds: float = 5.0
- collection_interval_seconds: float = 10.0
- queue_timeout_seconds: float = 2.0
nv_ingest.framework.orchestration.ray.primitives.ray_stat_collector module
- class nv_ingest.framework.orchestration.ray.primitives.ray_stat_collector.RayStatsCollector(pipeline_accessor: Any, interval: float = 30.0, actor_timeout: float = 5.0, queue_timeout: float = 2.0, ema_alpha: float = 0.1)
Bases: object
Collects statistics from a RayPipeline’s actors and queues in parallel using a dedicated background thread.
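Parallel collection with a per-cycle deadline can be sketched with concurrent.futures. In this hypothetical `collect_once()`, plain callables stand in for Ray actor and queue queries, and the single `timeout` stands in for actor_timeout/queue_timeout. A source that raises or misses the deadline marks the whole cycle as unsuccessful, matching the success flag described for collect_stats_now().

```python
import concurrent.futures as cf
from typing import Callable, Dict, Tuple


def collect_once(
    sources: Dict[str, Callable[[], Dict[str, int]]], timeout: float
) -> Tuple[Dict[str, Dict[str, int]], bool]:
    """Query every source concurrently; a hypothetical single collection cycle."""
    stats: Dict[str, Dict[str, int]] = {}
    success = True
    pool = cf.ThreadPoolExecutor(max_workers=max(1, len(sources)))
    try:
        futures = {pool.submit(fn): name for name, fn in sources.items()}
        done, not_done = cf.wait(futures, timeout=timeout)
        for fut in done:
            try:
                stats[futures[fut]] = fut.result()
            except Exception:
                success = False  # a failing source makes the cycle unsuccessful
        if not_done:
            success = False  # at least one source missed the deadline
    finally:
        # Do not block on stragglers; they finish in the background.
        pool.shutdown(wait=False)
    return stats, success
```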
- collect_stats_now() → Tuple[Dict[str, Dict[str, int]], int, bool]
Performs a single collection cycle of statistics from pipeline actors/queues.
- Returns:
A tuple of a dictionary mapping stage names to their collected statistics, an integer value, and a boolean indicating whether the overall collection was successful.
- Return type:
Tuple[Dict[str, Dict[str, int]], int, bool]
- get_latest_stats() → Tuple[Dict[str, Dict[str, int]], int, float, bool]
Returns the most recently collected statistics, update time, and success status.
- Returns:
A tuple containing:
- A dictionary mapping stage names to their statistics (or an empty dictionary if none have been collected).
- An integer value.
- The timestamp (time.time()) of the last update attempt.
- A boolean indicating whether the last collection was successful.
- Return type:
Tuple[Dict[str, Dict[str, int]], int, float, bool]
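A background loop that caches the latest result under a lock, as get_latest_stats() describes, might look like the following. `BackgroundStats` is hypothetical; for brevity it returns a three-element tuple (stats, timestamp, success) and omits the int element of the documented return type. Keeping the previous snapshot when a cycle fails is also an assumption.

```python
import threading
import time
from typing import Callable, Dict, Optional, Tuple

Stats = Dict[str, Dict[str, int]]


class BackgroundStats:
    """Hypothetical collector loop that caches its latest result under a lock."""

    def __init__(self, collect: Callable[[], Tuple[Stats, bool]], interval: float) -> None:
        self._collect = collect
        self._interval = interval
        self._lock = threading.Lock()
        self._stats: Stats = {}
        self._last_update = 0.0
        self._last_success = False
        self._stop = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def start(self) -> None:
        self._stop.clear()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join()

    def _loop(self) -> None:
        while not self._stop.is_set():
            try:
                stats, ok = self._collect()
            except Exception:
                stats, ok = {}, False
            with self._lock:
                if ok:
                    self._stats = stats  # assumption: keep last good snapshot on failure
                self._last_update = time.time()
                self._last_success = ok
            self._stop.wait(self._interval)

    def get_latest_stats(self) -> Tuple[Stats, float, bool]:
        with self._lock:
            # Copy so callers cannot mutate the cached snapshot.
            snapshot = {stage: dict(s) for stage, s in self._stats.items()}
            return snapshot, self._last_update, self._last_success
```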