nv_ingest.pipeline package
Subpackages
Submodules
nv_ingest.pipeline.default_libmode_pipeline_impl module
Default pipeline implementation for libmode.
This module contains the default libmode pipeline configuration as a string, allowing the pipeline to be loaded without requiring external YAML files.
nv_ingest.pipeline.default_pipeline_impl module
Default pipeline implementation (runtime default).
This module embeds the exact contents of config/default_pipeline.yaml so code can load the default pipeline without reading the YAML file at runtime.
nv_ingest.pipeline.ingest_pipeline module
- class nv_ingest.pipeline.ingest_pipeline.IngestPipelineBuilder(config: PipelineConfigSchema, system_resource_probe: SystemResourceProbe | None = None)
Bases:
object
A high-level builder for creating and configuring an ingestion pipeline.
This class translates a PipelineConfigSchema object into a runnable RayPipeline. Along the way it resolves stage classes, validates the configuration, calculates replica counts, and constructs the stages and edges.
- _config
The declarative configuration for the pipeline.
- Type:
PipelineConfigSchema
- _pipeline
The underlying RayPipeline instance being constructed.
- Type:
RayPipeline
- _system_resource_probe
A utility that probes for available system resources, such as CPU cores.
- Type:
SystemResourceProbe
- build() -> None
Builds the ingestion pipeline from the configuration.
This method constructs the RayPipeline by adding stages and edges as defined in the pipeline configuration. It also validates dependencies and ensures the pipeline is ready to be started.
- Raises:
ValueError – If the pipeline configuration is invalid, such as containing circular dependencies or references to non-existent stages.
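build() is documented to fail on two kinds of problems: circular dependencies and references to non-existent stages. Both correspond to standard graph checks. The sketch below uses only the standard library. It assumes stages are identified by name and edges are (from, to) pairs. validate_topology is a hypothetical helper, not part of nv_ingest, and the real builder may perform these checks differently.

```python
from collections import deque


def validate_topology(stage_names, edges):
    """Hypothetical check: raise ValueError on unknown stages or cycles.

    Returns one valid topological order of the stages (Kahn's algorithm).
    """
    known = set(stage_names)
    # Every edge endpoint must name a declared stage.
    for src, dst in edges:
        for name in (src, dst):
            if name not in known:
                raise ValueError(f"Edge references non-existent stage: {name!r}")

    # Count incoming edges and record successors for each stage.
    indegree = {name: 0 for name in known}
    successors = {name: [] for name in known}
    for src, dst in edges:
        successors[src].append(dst)
        indegree[dst] += 1

    # Repeatedly emit stages that have no remaining incoming edges.
    ready = deque(name for name in stage_names if indegree[name] == 0)
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for nxt in successors[name]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Any stage never emitted sits on a cycle.
    if len(order) != len(known):
        raise ValueError("Pipeline configuration contains a circular dependency")
    return order


print(validate_topology(["source", "extract", "sink"],
                        [("source", "extract"), ("extract", "sink")]))
```

With the package installed, the documented entry points would be used roughly as follows:

- builder = IngestPipelineBuilder(config)
- builder.build()
- pipeline = builder.get_pipeline()

Here config is a validated PipelineConfigSchema.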
- get_pipeline() -> RayPipeline
Returns the underlying RayPipeline instance.
- Returns:
The raw RayPipeline object.
- Return type:
RayPipeline
nv_ingest.pipeline.pipeline_schema module
- pydantic model nv_ingest.pipeline.pipeline_schema.EdgeConfig
Bases:
BaseModel
Configuration for an edge between two stages.
Defines a connection from a source stage to a destination stage, including the size of the intermediate queue.
- from_stage
The name of the source stage for the edge.
- Type:
str
- to_stage
The name of the destination stage for the edge.
- Type:
str
- queue_size
The maximum number of items in the queue between the two stages.
- Type:
int
JSON schema:
{ "title": "EdgeConfig", "description": "Configuration for an edge between two stages.\n\nDefines a connection from a source stage to a destination stage, including\nthe size of the intermediate queue.\n\nAttributes\n----------\nfrom_stage : str\n The name of the source stage for the edge.\nto_stage : str\n The name of the destination stage for the edge.\nqueue_size : int\n The maximum number of items in the queue between the two stages.", "type": "object", "properties": { "from": { "description": "The name of the source stage.", "title": "From", "type": "string" }, "to": { "description": "The name of the destination stage.", "title": "To", "type": "string" }, "queue_size": { "default": 100, "description": "The size of the queue between stages.", "exclusiveMinimum": 0, "title": "Queue Size", "type": "integer" } }, "additionalProperties": false, "required": [ "from", "to" ] }
- Config:
extra: str = forbid
- Fields:
- field from_stage: str [Required] (alias 'from')
The name of the source stage.
- field queue_size: int = 100
The size of the queue between stages.
- Constraints:
gt = 0
- field to_stage: str [Required] (alias 'to')
The name of the destination stage.
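from_stage and to_stage declare the aliases from and to, so configuration data spells edges with those keys. The parse_edge helper below is a hypothetical stdlib stand-in, not nv_ingest code. It mirrors the documented rules:

- both aliases are required;
- queue_size defaults to 100 and must be greater than 0;
- unknown keys are rejected, as with extra = forbid.

```python
def parse_edge(raw: dict) -> dict:
    """Hypothetical stdlib stand-in for EdgeConfig validation."""
    allowed = {"from", "to", "queue_size"}
    unexpected = set(raw) - allowed
    if unexpected:  # mirrors extra = forbid
        raise ValueError(f"Unexpected keys: {sorted(unexpected)}")
    for key in ("from", "to"):  # required fields, supplied under their aliases
        if key not in raw:
            raise ValueError(f"Missing required key: {key!r}")
    queue_size = raw.get("queue_size", 100)  # documented default
    if not isinstance(queue_size, int) or queue_size <= 0:  # constraint gt = 0
        raise ValueError("queue_size must be a positive integer")
    # Return the Python-side attribute names used by the model.
    return {"from_stage": raw["from"], "to_stage": raw["to"], "queue_size": queue_size}


print(parse_edge({"from": "source_stage", "to": "sink_stage"}))
```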
- pydantic model nv_ingest.pipeline.pipeline_schema.PIDControllerConfig
Bases:
BaseModel
Configuration for the PID controller used in dynamic scaling.
- kp
Proportional gain for the PID controller.
- Type:
float
- ki
Integral gain for the PID controller.
- Type:
float
- ema_alpha
Exponential moving average alpha for the PID controller.
- Type:
float
- target_queue_depth
Target queue depth for the PID controller.
- Type:
int
- penalty_factor
Penalty factor for the PID controller.
- Type:
float
- error_boost_factor
Error boost factor for the PID controller.
- Type:
float
- rcm_memory_safety_buffer_fraction
Resource constraint manager memory safety buffer fraction.
- Type:
float
JSON schema:
{ "title": "PIDControllerConfig", "description": "Configuration for the PID controller used in dynamic scaling.\n\nAttributes\n----------\nkp : float\n Proportional gain for the PID controller.\nki : float\n Integral gain for the PID controller.\nema_alpha : float\n Exponential moving average alpha for the PID controller.\ntarget_queue_depth : int\n Target queue depth for the PID controller.\npenalty_factor : float\n Penalty factor for the PID controller.\nerror_boost_factor : float\n Error boost factor for the PID controller.\nrcm_memory_safety_buffer_fraction : float\n Resource constraint manager memory safety buffer fraction.", "type": "object", "properties": { "kp": { "default": 0.2, "description": "Proportional gain for the PID controller.", "exclusiveMinimum": 0.0, "title": "Kp", "type": "number" }, "ki": { "default": 0.01, "description": "Integral gain for the PID controller.", "minimum": 0.0, "title": "Ki", "type": "number" }, "ema_alpha": { "default": 0.1, "description": "Exponential moving average alpha for the PID controller.", "maximum": 1.0, "minimum": 0.0, "title": "Ema Alpha", "type": "number" }, "target_queue_depth": { "default": 0, "description": "Target queue depth for the PID controller.", "minimum": 0, "title": "Target Queue Depth", "type": "integer" }, "penalty_factor": { "default": 0.1, "description": "Penalty factor for the PID controller.", "minimum": 0.0, "title": "Penalty Factor", "type": "number" }, "error_boost_factor": { "default": 1.5, "description": "Error boost factor for the PID controller.", "exclusiveMinimum": 0.0, "title": "Error Boost Factor", "type": "number" }, "rcm_memory_safety_buffer_fraction": { "default": 0.15, "description": "Resource constraint manager memory safety buffer fraction.", "maximum": 1.0, "minimum": 0.0, "title": "Rcm Memory Safety Buffer Fraction", "type": "number" } }, "additionalProperties": false }
- Config:
extra: str = forbid
- Fields:
- field ema_alpha: float = 0.1
Exponential moving average alpha for the PID controller.
- Constraints:
ge = 0.0
le = 1.0
- field error_boost_factor: float = 1.5
Error boost factor for the PID controller.
- Constraints:
gt = 0.0
- field ki: float = 0.01
Integral gain for the PID controller.
- Constraints:
ge = 0.0
- field kp: float = 0.2
Proportional gain for the PID controller.
- Constraints:
gt = 0.0
- field penalty_factor: float = 0.1
Penalty factor for the PID controller.
- Constraints:
ge = 0.0
- field rcm_memory_safety_buffer_fraction: float = 0.15
Resource constraint manager memory safety buffer fraction.
- Constraints:
ge = 0.0
le = 1.0
- field target_queue_depth: int = 0
Target queue depth for the PID controller.
- Constraints:
ge = 0
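The schema does not say exactly how nv_ingest combines these parameters. For intuition only, here is a textbook PI controller acting on an EMA-smoothed queue depth, using the same defaults for kp, ki, ema_alpha, and target_queue_depth.

Several parts are assumptions, not nv_ingest's scaling logic:

- the class SimplePIController;
- the sign convention (positive output suggests adding capacity);
- seeding the average with the first sample.

penalty_factor, error_boost_factor, and rcm_memory_safety_buffer_fraction are omitted because their semantics are not documented here.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SimplePIController:
    """Generic PI controller on an EMA-smoothed queue depth (illustrative only)."""

    kp: float = 0.2              # same default as PIDControllerConfig.kp
    ki: float = 0.01             # same default as PIDControllerConfig.ki
    ema_alpha: float = 0.1       # same default as PIDControllerConfig.ema_alpha
    target_queue_depth: int = 0  # same default as PIDControllerConfig.target_queue_depth
    _ema: Optional[float] = field(default=None, init=False, repr=False)
    _integral: float = field(default=0.0, init=False, repr=False)

    def update(self, queue_depth: float) -> float:
        """Return a control signal; positive suggests adding capacity (assumed)."""
        if self._ema is None:
            self._ema = float(queue_depth)  # seed the average with the first sample
        else:
            # ema = alpha * sample + (1 - alpha) * previous ema
            self._ema = self.ema_alpha * queue_depth + (1 - self.ema_alpha) * self._ema
        error = self._ema - self.target_queue_depth  # > 0 means the queue is too deep
        self._integral += error  # accumulate error for the integral term
        return self.kp * error + self.ki * self._integral


controller = SimplePIController()
for depth in (10, 0, 0):
    print(round(controller.update(depth), 4))
```

A small ema_alpha such as the default 0.1 makes the smoothed depth react slowly, which damps scaling decisions against short bursts.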
- pydantic model nv_ingest.pipeline.pipeline_schema.PipelineConfigSchema
Bases:
BaseModel
Root configuration model for an ingestion pipeline.
This model represents the entire declarative configuration for an ingestion pipeline, including all stages and the edges that connect them.
- name
The name of the pipeline.
- Type:
str
- description
A description of the pipeline.
- Type:
str
- stages
A list of all stage configurations in the pipeline.
- Type:
List[StageConfig]
- edges
A list of all edge configurations that define the pipeline’s topology.
- Type:
List[EdgeConfig]
- pipeline
Runtime configuration for the pipeline. Defaults to a new PipelineRuntimeConfig instance (default_factory=PipelineRuntimeConfig).
- Type:
Optional[PipelineRuntimeConfig]
JSON schema:
{ "title": "PipelineConfigSchema", "description": "Root configuration model for an ingestion pipeline.\n\nThis model represents the entire declarative configuration for an ingestion\npipeline, including all stages and the edges that connect them.\n\nAttributes\n----------\nname : str\n The name of the pipeline.\ndescription : str\n A description of the pipeline.\nstages : List[StageConfig]\n A list of all stage configurations in the pipeline.\nedges : List[EdgeConfig]\n A list of all edge configurations that define the pipeline's topology.\npipeline: Optional[PipelineRuntimeConfig] = Field(default_factory=PipelineRuntimeConfig,\n description=\"Runtime configuration for the pipeline.\")", "type": "object", "properties": { "name": { "description": "The name of the pipeline.", "title": "Name", "type": "string" }, "description": { "description": "A description of the pipeline.", "title": "Description", "type": "string" }, "stages": { "description": "List of all stages in the pipeline.", "items": { "$ref": "#/$defs/StageConfig" }, "title": "Stages", "type": "array" }, "edges": { "description": "List of all edges connecting the stages.", "items": { "$ref": "#/$defs/EdgeConfig" }, "title": "Edges", "type": "array" }, "pipeline": { "anyOf": [ { "$ref": "#/$defs/PipelineRuntimeConfig" }, { "type": "null" } ], "description": "Runtime configuration for the pipeline." 
} }, "$defs": { "EdgeConfig": { "additionalProperties": false, "description": "Configuration for an edge between two stages.\n\nDefines a connection from a source stage to a destination stage, including\nthe size of the intermediate queue.\n\nAttributes\n----------\nfrom_stage : str\n The name of the source stage for the edge.\nto_stage : str\n The name of the destination stage for the edge.\nqueue_size : int\n The maximum number of items in the queue between the two stages.", "properties": { "from": { "description": "The name of the source stage.", "title": "From", "type": "string" }, "to": { "description": "The name of the destination stage.", "title": "To", "type": "string" }, "queue_size": { "default": 100, "description": "The size of the queue between stages.", "exclusiveMinimum": 0, "title": "Queue Size", "type": "integer" } }, "required": [ "from", "to" ], "title": "EdgeConfig", "type": "object" }, "PIDControllerConfig": { "additionalProperties": false, "description": "Configuration for the PID controller used in dynamic scaling.\n\nAttributes\n----------\nkp : float\n Proportional gain for the PID controller.\nki : float\n Integral gain for the PID controller.\nema_alpha : float\n Exponential moving average alpha for the PID controller.\ntarget_queue_depth : int\n Target queue depth for the PID controller.\npenalty_factor : float\n Penalty factor for the PID controller.\nerror_boost_factor : float\n Error boost factor for the PID controller.\nrcm_memory_safety_buffer_fraction : float\n Resource constraint manager memory safety buffer fraction.", "properties": { "kp": { "default": 0.2, "description": "Proportional gain for the PID controller.", "exclusiveMinimum": 0.0, "title": "Kp", "type": "number" }, "ki": { "default": 0.01, "description": "Integral gain for the PID controller.", "minimum": 0.0, "title": "Ki", "type": "number" }, "ema_alpha": { "default": 0.1, "description": "Exponential moving average alpha for the PID controller.", "maximum": 1.0, 
"minimum": 0.0, "title": "Ema Alpha", "type": "number" }, "target_queue_depth": { "default": 0, "description": "Target queue depth for the PID controller.", "minimum": 0, "title": "Target Queue Depth", "type": "integer" }, "penalty_factor": { "default": 0.1, "description": "Penalty factor for the PID controller.", "minimum": 0.0, "title": "Penalty Factor", "type": "number" }, "error_boost_factor": { "default": 1.5, "description": "Error boost factor for the PID controller.", "exclusiveMinimum": 0.0, "title": "Error Boost Factor", "type": "number" }, "rcm_memory_safety_buffer_fraction": { "default": 0.15, "description": "Resource constraint manager memory safety buffer fraction.", "maximum": 1.0, "minimum": 0.0, "title": "Rcm Memory Safety Buffer Fraction", "type": "number" } }, "title": "PIDControllerConfig", "type": "object" }, "PipelinePhase": { "description": "The logical phase of a pipeline stage.\n\nAttributes\n----------\nPRE_PROCESSING : int\n Pre-processing phase.\nEXTRACTION : int\n Extraction phase.\nPOST_PROCESSING : int\n Post-processing phase.\nMUTATION : int\n Mutation phase.\nTRANSFORM : int\n Transform phase.\nRESPONSE : int\n Response phase.\nTELEMETRY : int\n Telemetry phase.\nDRAIN : int\n Drain phase.", "enum": [ 0, 1, 2, 3, 4, 5, 6, 7 ], "title": "PipelinePhase", "type": "integer" }, "PipelineRuntimeConfig": { "additionalProperties": false, "description": "Configuration for pipeline runtime behavior.\n\nParameters\n----------\ndisable_dynamic_scaling : bool\n Whether to disable dynamic scaling of replicas (default: False).\ndynamic_memory_threshold : float\n The memory utilization threshold (0.0 to 1.0) for dynamic scaling decisions.\nstatic_memory_threshold : float\n Global memory threshold for static scaling mode (default: 0.75).\npid_controller : PIDControllerConfig\n PID controller configuration for dynamic scaling.\nlaunch_simple_broker : bool\n If True, launches a simple message broker for the pipeline.", "properties": { 
"disable_dynamic_scaling": { "default": false, "description": "Disable dynamic scaling of stage replicas.", "title": "Disable Dynamic Scaling", "type": "boolean" }, "dynamic_memory_threshold": { "default": 0.75, "description": "Memory utilization threshold for dynamic scaling.", "maximum": 0.95, "minimum": 0.0, "title": "Dynamic Memory Threshold", "type": "number" }, "static_memory_threshold": { "default": 0.75, "description": "Global memory threshold for static scaling mode.", "maximum": 1.0, "minimum": 0.0, "title": "Static Memory Threshold", "type": "number" }, "pid_controller": { "$ref": "#/$defs/PIDControllerConfig", "description": "PID controller configuration for dynamic scaling." }, "launch_simple_broker": { "default": false, "description": "Launch a simple message broker for the pipeline.", "title": "Launch Simple Broker", "type": "boolean" } }, "title": "PipelineRuntimeConfig", "type": "object" }, "ReplicaCalculationStrategy": { "description": "Strategy for calculating replica counts at runtime.", "enum": [ "static", "cpu_percentage", "memory_thresholding", "memory_static_global_percent" ], "title": "ReplicaCalculationStrategy", "type": "string" }, "ReplicaConfig": { "additionalProperties": false, "description": "Configuration for stage replicas supporting both dynamic and static scaling modes.\n\nDefines the min/max number of replicas for a stage, either as absolute counts,\npercentages of total CPU cores, or resource-based calculations. Supports different\nconfigurations for dynamic vs static scaling modes.\n\nAttributes\n----------\ncpu_count_min : Optional[int]\n Absolute minimum number of replicas. Must be >= 0. (Legacy support)\ncpu_count_max : Optional[int]\n Absolute maximum number of replicas. Must be >= 1. (Legacy support)\ncpu_percent_min : Optional[float]\n Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. 
(Legacy support)\ncpu_percent_max : Optional[float]\n Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\nmin_replicas : Optional[int]\n Minimum number of replicas for both scaling modes. Must be >= 0.\nmax_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Maximum replicas for dynamic scaling mode. Can be static int or strategy config.\nstatic_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Replica configuration for static scaling mode. Can be static int or strategy config.", "properties": { "cpu_count_min": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute minimum number of replicas.", "title": "Cpu Count Min" }, "cpu_count_max": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute maximum number of replicas.", "title": "Cpu Count Max" }, "cpu_percent_min": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas as a percentage of total cores.", "title": "Cpu Percent Min" }, "cpu_percent_max": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Maximum number of replicas as a percentage of total cores.", "title": "Cpu Percent Max" }, "min_replicas": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas.", "title": "Min Replicas" }, "max_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Maximum replicas for dynamic scaling mode.", "title": "Max Replicas" }, "static_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Replica configuration for static scaling mode.", 
"title": "Static Replicas" } }, "title": "ReplicaConfig", "type": "object" }, "ReplicaStrategyConfig": { "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "properties": { "strategy": { "$ref": "#/$defs/ReplicaCalculationStrategy", "description": "The calculation strategy to use." }, "value": { "anyOf": [ { "type": "integer" }, { "type": "number" }, { "type": "null" } ], "default": null, "description": "Primary value for the strategy.", "title": "Value" }, "limit": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Optional upper limit for calculated replicas.", "title": "Limit" }, "cpu_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "CPU percentage for CPU_PERCENTAGE strategy.", "title": "Cpu Percent" }, "memory_per_replica_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Expected memory usage per replica in MB.", "title": "Memory Per Replica Mb" }, "memory_threshold_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.", 
"title": "Memory Threshold Percent" }, "max_memory_budget_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "title": "Max Memory Budget Mb" } }, "required": [ "strategy" ], "title": "ReplicaStrategyConfig", "type": "object" }, "StageConfig": { "additionalProperties": false, "description": "Configuration for a single pipeline stage.\n\nDescribes a single component in the ingestion pipeline, including its name,\ntype, actor implementation, and specific configuration.\n\nAttributes\n----------\nname : str\n A unique name to identify the stage within the pipeline.\ntype : StageType\n The type of the stage, which determines how it's added to the RayPipeline.\nphase: PipelinePhase\n The logical phase of the stage in the pipeline.\nactor : Optional[str]\n The fully qualified import path to the actor class or function that\n implements the stage's logic. Mutually exclusive with 'callable'.\ncallable : Optional[str]\n The fully qualified import path to a callable function that\n implements the stage's logic. Mutually exclusive with 'actor'.\ntask_filters: Optional[List[Any]]\n List of task types this callable stage should filter for. 
Only applies to callable stages.\n Supports both simple strings (e.g., \"udf\") and complex filters (e.g., [\"udf\", {\"phase\": 5}]).\nenabled : bool\n A flag to indicate whether the stage should be included in the pipeline.\n If False, the stage and its connected edges are ignored.\nconfig : Dict[str, Any]\n A dictionary of configuration parameters passed to the stage's actor.\nreplicas : ReplicaConfig\n The replica configuration for the stage.\nruns_after: List[str]\n A list of stage names that this stage must be downstream of.", "properties": { "name": { "description": "Unique name for the stage.", "title": "Name", "type": "string" }, "type": { "$ref": "#/$defs/StageType", "default": "stage", "description": "Type of the stage." }, "phase": { "$ref": "#/$defs/PipelinePhase", "description": "The logical phase of the stage." }, "actor": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Full import path to the stage's actor class or function.", "title": "Actor" }, "callable": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Full import path to a callable function for the stage.", "title": "Callable" }, "task_filters": { "anyOf": [ { "items": {}, "type": "array" }, { "type": "null" } ], "default": null, "description": "List of task types this callable stage should filter for. Only applies to callable stages.", "title": "Task Filters" }, "enabled": { "default": true, "description": "Whether the stage is enabled.", "title": "Enabled", "type": "boolean" }, "config": { "additionalProperties": true, "default": {}, "description": "Configuration dictionary for the stage.", "title": "Config", "type": "object" }, "replicas": { "$ref": "#/$defs/ReplicaConfig", "description": "Replica configuration." 
}, "runs_after": { "description": "List of stages this stage must run after.", "items": { "type": "string" }, "title": "Runs After", "type": "array" } }, "required": [ "name", "phase" ], "title": "StageConfig", "type": "object" }, "StageType": { "description": "The type of a pipeline stage.", "enum": [ "source", "stage", "sink" ], "title": "StageType", "type": "string" } }, "additionalProperties": false, "required": [ "name", "description", "stages", "edges" ] }
- Config:
extra: str = forbid
- Fields:
- Validators:
- field description: str [Required]
A description of the pipeline.
- field edges: List[EdgeConfig] [Required]
List of all edges connecting the stages.
- Validated by:
- field name: str [Required]
The name of the pipeline.
- field pipeline: PipelineRuntimeConfig | None [Optional]
Runtime configuration for the pipeline.
- field stages: List[StageConfig] [Required]
List of all stages in the pipeline.
- Validated by:
- get_phases() -> Set[PipelinePhase]
Returns a set of all unique phases in the pipeline.
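According to the JSON schema above, a pipeline configuration requires name, description, stages, and edges. Each stage requires name and phase, and phase is an integer from the PipelinePhase enum (0 to 7). The dictionary below is a hypothetical minimal configuration shaped after that schema; its actor import paths are placeholders, not real nv_ingest classes. unique_phases is a stdlib stand-in for what get_phases() is documented to return.

```python
# Hypothetical minimal configuration; actor paths are placeholders.
# Phase values are integers from the PipelinePhase enum (0-7 in the schema).
minimal_config = {
    "name": "example_pipeline",
    "description": "Two-stage illustration",
    "stages": [
        {"name": "extractor", "phase": 1, "actor": "my_package.stages.Extractor"},
        {"name": "responder", "phase": 5, "actor": "my_package.stages.Responder"},
    ],
    # Edges use the aliases "from" and "to".
    "edges": [{"from": "extractor", "to": "responder", "queue_size": 32}],
}

REQUIRED_TOP_LEVEL = {"name", "description", "stages", "edges"}
REQUIRED_PER_STAGE = {"name", "phase"}


def check_required_keys(config: dict) -> None:
    """Raise ValueError if keys the schema marks as required are missing."""
    missing = REQUIRED_TOP_LEVEL - set(config)
    if missing:
        raise ValueError(f"Missing top-level keys: {sorted(missing)}")
    for stage in config["stages"]:
        missing = REQUIRED_PER_STAGE - set(stage)
        if missing:
            raise ValueError(f"Stage {stage.get('name')!r} is missing: {sorted(missing)}")


def unique_phases(config: dict) -> set:
    """Stdlib stand-in for get_phases(): the set of distinct stage phases."""
    return {stage["phase"] for stage in config["stages"]}


check_required_keys(minimal_config)
print(sorted(unique_phases(minimal_config)))
```

With the package installed, the same dictionary could presumably be passed to PipelineConfigSchema(**minimal_config) for full validation. Note that the model's field validators, which are not listed here, may impose additional rules.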
- pydantic model nv_ingest.pipeline.pipeline_schema.PipelineRuntimeConfig
Bases:
BaseModel
Configuration for pipeline runtime behavior.
- Parameters:
disable_dynamic_scaling (bool) – Whether to disable dynamic scaling of replicas (default: False).
dynamic_memory_threshold (float) – The memory utilization threshold (0.0 to 0.95) for dynamic scaling decisions (default: 0.75).
static_memory_threshold (float) – Global memory threshold (0.0 to 1.0) for static scaling mode (default: 0.75).
pid_controller (PIDControllerConfig) – PID controller configuration for dynamic scaling.
launch_simple_broker (bool) – If True, launches a simple message broker for the pipeline (default: False).
JSON schema:
{ "title": "PipelineRuntimeConfig", "description": "Configuration for pipeline runtime behavior.\n\nParameters\n----------\ndisable_dynamic_scaling : bool\n Whether to disable dynamic scaling of replicas (default: False).\ndynamic_memory_threshold : float\n The memory utilization threshold (0.0 to 1.0) for dynamic scaling decisions.\nstatic_memory_threshold : float\n Global memory threshold for static scaling mode (default: 0.75).\npid_controller : PIDControllerConfig\n PID controller configuration for dynamic scaling.\nlaunch_simple_broker : bool\n If True, launches a simple message broker for the pipeline.", "type": "object", "properties": { "disable_dynamic_scaling": { "default": false, "description": "Disable dynamic scaling of stage replicas.", "title": "Disable Dynamic Scaling", "type": "boolean" }, "dynamic_memory_threshold": { "default": 0.75, "description": "Memory utilization threshold for dynamic scaling.", "maximum": 0.95, "minimum": 0.0, "title": "Dynamic Memory Threshold", "type": "number" }, "static_memory_threshold": { "default": 0.75, "description": "Global memory threshold for static scaling mode.", "maximum": 1.0, "minimum": 0.0, "title": "Static Memory Threshold", "type": "number" }, "pid_controller": { "$ref": "#/$defs/PIDControllerConfig", "description": "PID controller configuration for dynamic scaling." 
}, "launch_simple_broker": { "default": false, "description": "Launch a simple message broker for the pipeline.", "title": "Launch Simple Broker", "type": "boolean" } }, "$defs": { "PIDControllerConfig": { "additionalProperties": false, "description": "Configuration for the PID controller used in dynamic scaling.\n\nAttributes\n----------\nkp : float\n Proportional gain for the PID controller.\nki : float\n Integral gain for the PID controller.\nema_alpha : float\n Exponential moving average alpha for the PID controller.\ntarget_queue_depth : int\n Target queue depth for the PID controller.\npenalty_factor : float\n Penalty factor for the PID controller.\nerror_boost_factor : float\n Error boost factor for the PID controller.\nrcm_memory_safety_buffer_fraction : float\n Resource constraint manager memory safety buffer fraction.", "properties": { "kp": { "default": 0.2, "description": "Proportional gain for the PID controller.", "exclusiveMinimum": 0.0, "title": "Kp", "type": "number" }, "ki": { "default": 0.01, "description": "Integral gain for the PID controller.", "minimum": 0.0, "title": "Ki", "type": "number" }, "ema_alpha": { "default": 0.1, "description": "Exponential moving average alpha for the PID controller.", "maximum": 1.0, "minimum": 0.0, "title": "Ema Alpha", "type": "number" }, "target_queue_depth": { "default": 0, "description": "Target queue depth for the PID controller.", "minimum": 0, "title": "Target Queue Depth", "type": "integer" }, "penalty_factor": { "default": 0.1, "description": "Penalty factor for the PID controller.", "minimum": 0.0, "title": "Penalty Factor", "type": "number" }, "error_boost_factor": { "default": 1.5, "description": "Error boost factor for the PID controller.", "exclusiveMinimum": 0.0, "title": "Error Boost Factor", "type": "number" }, "rcm_memory_safety_buffer_fraction": { "default": 0.15, "description": "Resource constraint manager memory safety buffer fraction.", "maximum": 1.0, "minimum": 0.0, "title": "Rcm Memory 
Safety Buffer Fraction", "type": "number" } }, "title": "PIDControllerConfig", "type": "object" } }, "additionalProperties": false }
- Config:
extra: str = forbid
- Fields:
- field disable_dynamic_scaling: bool = False
Disable dynamic scaling of stage replicas.
- field dynamic_memory_threshold: float = 0.75
Memory utilization threshold for dynamic scaling.
- Constraints:
ge = 0.0
le = 0.95
- field launch_simple_broker: bool = False
Launch a simple message broker for the pipeline.
- field pid_controller: PIDControllerConfig [Optional]
PID controller configuration for dynamic scaling.
- field static_memory_threshold: float = 0.75
Global memory threshold for static scaling mode.
- Constraints:
ge = 0.0
le = 1.0
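The two memory thresholds have different upper bounds: dynamic_memory_threshold is capped at 0.95, while static_memory_threshold may go up to 1.0. The dataclass below is a hypothetical stdlib mirror of the scalar fields, with their documented defaults and bounds; pid_controller is omitted. It helps when reasoning about which values are valid, but it is not the pydantic model.

```python
from dataclasses import dataclass


@dataclass
class RuntimeSettings:
    """Hypothetical stdlib mirror of PipelineRuntimeConfig's scalar fields."""

    disable_dynamic_scaling: bool = False
    dynamic_memory_threshold: float = 0.75
    static_memory_threshold: float = 0.75
    launch_simple_broker: bool = False

    def __post_init__(self):
        # Same bounds as the documented constraints (ge/le).
        if not 0.0 <= self.dynamic_memory_threshold <= 0.95:
            raise ValueError("dynamic_memory_threshold must be in [0.0, 0.95]")
        if not 0.0 <= self.static_memory_threshold <= 1.0:
            raise ValueError("static_memory_threshold must be in [0.0, 1.0]")


print(RuntimeSettings(dynamic_memory_threshold=0.9))
```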
- class nv_ingest.pipeline.pipeline_schema.ReplicaCalculationStrategy(*values)
Bases:
str, Enum
Strategy for calculating replica counts at runtime.
- CPU_PERCENTAGE = 'cpu_percentage'
- MEMORY_STATIC_GLOBAL_PERCENT = 'memory_static_global_percent'
- MEMORY_THRESHOLDING = 'memory_thresholding'
- STATIC = 'static'
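Because ReplicaCalculationStrategy subclasses both str and Enum, string values read from a configuration file convert directly to members, and members compare equal to their string values. The class below is a stdlib mirror of the documented members, so this behavior can be tried without installing nv_ingest. It is not an import of the real class.

```python
from enum import Enum


class ReplicaCalculationStrategy(str, Enum):
    """Stdlib mirror of the documented members (for illustration)."""

    STATIC = "static"
    CPU_PERCENTAGE = "cpu_percentage"
    MEMORY_THRESHOLDING = "memory_thresholding"
    MEMORY_STATIC_GLOBAL_PERCENT = "memory_static_global_percent"


# A plain string from a YAML or JSON config maps to a member by value.
strategy = ReplicaCalculationStrategy("cpu_percentage")
print(strategy is ReplicaCalculationStrategy.CPU_PERCENTAGE)  # True
# Because the enum also subclasses str, members compare equal to raw strings.
print(strategy == "cpu_percentage")  # True

# Values outside the enum are rejected with ValueError.
try:
    ReplicaCalculationStrategy("autoscale")
except ValueError as exc:
    print("rejected:", exc)
```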
- pydantic model nv_ingest.pipeline.pipeline_schema.ReplicaConfig
Bases:
BaseModel
Configuration for stage replicas supporting both dynamic and static scaling modes.
Defines the min/max number of replicas for a stage, either as absolute counts, percentages of total CPU cores, or resource-based calculations. Supports different configurations for dynamic vs static scaling modes.
- cpu_count_min
Absolute minimum number of replicas. Must be >= 0. (Legacy support)
- Type:
Optional[int]
- cpu_count_max
Absolute maximum number of replicas. Must be >= 1. (Legacy support)
- Type:
Optional[int]
- cpu_percent_min
Minimum number of replicas as a fraction (0.0 to 1.0) of total CPU cores. (Legacy support)
- Type:
Optional[float]
- cpu_percent_max
Maximum number of replicas as a fraction (0.0 to 1.0) of total CPU cores. (Legacy support)
- Type:
Optional[float]
- min_replicas
Minimum number of replicas for both scaling modes. Must be >= 0.
- Type:
Optional[int]
- max_replicas
Maximum replicas for dynamic scaling mode. Either a fixed integer or a ReplicaStrategyConfig.
- Type:
Optional[Union[int, ReplicaStrategyConfig]]
- static_replicas
Replica configuration for static scaling mode. Either a fixed integer or a ReplicaStrategyConfig.
- Type:
Optional[Union[int, ReplicaStrategyConfig]]
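max_replicas and static_replicas accept either an integer or a ReplicaStrategyConfig. Per the JSON schema, a ReplicaStrategyConfig has the fields strategy, value, limit, and cpu_percent, plus memory-related fields. resolve_replicas below is a hypothetical sketch of turning such a value into a count for the static and cpu_percentage strategies.

The following behaviors are assumptions, not nv_ingest's documented logic:

- rounding down, with a minimum of one replica;
- falling back from cpu_percent to value.

The memory-based strategies are deliberately left unimplemented. Only the cap applied by limit follows the documented "upper limit for calculated replicas".

```python
import math
from typing import Optional, Union


def resolve_replicas(spec: Union[int, dict, None], cpu_count: int) -> Optional[int]:
    """Hypothetical resolver for a max_replicas or static_replicas value."""
    if spec is None or isinstance(spec, int):
        # A bare integer is used as-is; None means "not configured".
        return spec
    strategy = spec["strategy"]
    if strategy == "static":
        count = int(spec["value"])
    elif strategy == "cpu_percentage":
        # Assumption: prefer cpu_percent, fall back to value.
        fraction = spec.get("cpu_percent")
        if fraction is None:
            fraction = spec["value"]
        # Assumption: round down, but never return fewer than one replica.
        count = max(1, math.floor(fraction * cpu_count))
    else:
        # memory_thresholding / memory_static_global_percent need memory probes.
        raise NotImplementedError(f"strategy {strategy!r} is not covered by this sketch")
    # limit is documented as an optional upper limit on the calculated count.
    limit = spec.get("limit")
    return min(count, limit) if limit is not None else count


print(resolve_replicas({"strategy": "cpu_percentage", "cpu_percent": 0.25, "limit": 3}, cpu_count=16))  # 3
```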
JSON schema:
{ "title": "ReplicaConfig", "description": "Configuration for stage replicas supporting both dynamic and static scaling modes.\n\nDefines the min/max number of replicas for a stage, either as absolute counts,\npercentages of total CPU cores, or resource-based calculations. Supports different\nconfigurations for dynamic vs static scaling modes.\n\nAttributes\n----------\ncpu_count_min : Optional[int]\n Absolute minimum number of replicas. Must be >= 0. (Legacy support)\ncpu_count_max : Optional[int]\n Absolute maximum number of replicas. Must be >= 1. (Legacy support)\ncpu_percent_min : Optional[float]\n Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\ncpu_percent_max : Optional[float]\n Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\nmin_replicas : Optional[int]\n Minimum number of replicas for both scaling modes. Must be >= 0.\nmax_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Maximum replicas for dynamic scaling mode. Can be static int or strategy config.\nstatic_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Replica configuration for static scaling mode. 
Can be static int or strategy config.", "type": "object", "properties": { "cpu_count_min": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute minimum number of replicas.", "title": "Cpu Count Min" }, "cpu_count_max": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute maximum number of replicas.", "title": "Cpu Count Max" }, "cpu_percent_min": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas as a percentage of total cores.", "title": "Cpu Percent Min" }, "cpu_percent_max": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Maximum number of replicas as a percentage of total cores.", "title": "Cpu Percent Max" }, "min_replicas": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas.", "title": "Min Replicas" }, "max_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Maximum replicas for dynamic scaling mode.", "title": "Max Replicas" }, "static_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Replica configuration for static scaling mode.", "title": "Static Replicas" } }, "$defs": { "ReplicaCalculationStrategy": { "description": "Strategy for calculating replica counts at runtime.", "enum": [ "static", "cpu_percentage", "memory_thresholding", "memory_static_global_percent" ], "title": "ReplicaCalculationStrategy", "type": "string" }, "ReplicaStrategyConfig": { "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n The 
calculation strategy to use.\nvalue : Optional[Union[int, float]]\n The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "properties": { "strategy": { "$ref": "#/$defs/ReplicaCalculationStrategy", "description": "The calculation strategy to use." }, "value": { "anyOf": [ { "type": "integer" }, { "type": "number" }, { "type": "null" } ], "default": null, "description": "Primary value for the strategy.", "title": "Value" }, "limit": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Optional upper limit for calculated replicas.", "title": "Limit" }, "cpu_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "CPU percentage for CPU_PERCENTAGE strategy.", "title": "Cpu Percent" }, "memory_per_replica_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Expected memory usage per replica in MB.", "title": "Memory Per Replica Mb" }, "memory_threshold_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.", "title": "Memory Threshold Percent" }, "max_memory_budget_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", 
"title": "Max Memory Budget Mb" } }, "required": [ "strategy" ], "title": "ReplicaStrategyConfig", "type": "object" } }, "additionalProperties": false }
- Config:
extra: str = forbid
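- Fields:
cpu_count_max (int | None)
cpu_count_min (int | None)
cpu_percent_max (float | None)
cpu_percent_min (float | None)
max_replicas (int | ReplicaStrategyConfig | None)
min_replicas (int | None)
static_replicas (int | ReplicaStrategyConfig | None)
- Validators:
check_exclusive_min_max » all fields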
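The check_exclusive_min_max validator runs over all fields, but this page does not spell out its rules. The sketch below is a hypothetical stand-in that works on a plain dict rather than the pydantic model. Its rules are assumptions drawn from the validator's name and the paired legacy fields: an absolute count and a percentage may not both set the same bound, and an integer min_replicas may not exceed an integer max_replicas.

```python
from typing import Any, Dict


def check_exclusive_min_max(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical re-creation of ReplicaConfig's model-level check.

    Assumed rules (not confirmed by the nv_ingest source):
    1. An absolute count and a percentage may not both set the same bound.
    2. An integer min_replicas may not exceed an integer max_replicas.
    """
    for count_key, pct_key in (
        ("cpu_count_min", "cpu_percent_min"),
        ("cpu_count_max", "cpu_percent_max"),
    ):
        if cfg.get(count_key) is not None and cfg.get(pct_key) is not None:
            raise ValueError(f"Specify only one of {count_key!r} and {pct_key!r}")

    lo, hi = cfg.get("min_replicas"), cfg.get("max_replicas")
    # max_replicas may also be a strategy config (a dict here), so only
    # compare when both bounds are plain integers.
    if isinstance(lo, int) and isinstance(hi, int) and lo > hi:
        raise ValueError("min_replicas must not exceed max_replicas")
    return cfg


# Accepted: dynamic mode with an integer ceiling.
check_exclusive_min_max({"min_replicas": 1, "max_replicas": 4})

# Rejected: an absolute and a percentage lower bound at the same time.
try:
    check_exclusive_min_max({"cpu_count_min": 2, "cpu_percent_min": 0.1})
except ValueError as err:
    print(err)
```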
- field cpu_count_max: int | None = None
Absolute maximum number of replicas (legacy).
- Constraints:
ge = 1
- Validated by:
check_exclusive_min_max
- field cpu_count_min: int | None = None
Absolute minimum number of replicas (legacy).
- Constraints:
ge = 0
- Validated by:
check_exclusive_min_max
- field cpu_percent_max: float | None = None
Maximum number of replicas as a fraction (0.0 to 1.0) of total CPU cores (legacy).
- Constraints:
ge = 0.0
le = 1.0
- Validated by:
check_exclusive_min_max
- field cpu_percent_min: float | None = None
Minimum number of replicas as a fraction (0.0 to 1.0) of total CPU cores (legacy).
- Constraints:
ge = 0.0
le = 1.0
- Validated by:
check_exclusive_min_max
- field max_replicas: int | ReplicaStrategyConfig | None = None
Maximum number of replicas in dynamic scaling mode, given either as a fixed integer or as a ReplicaStrategyConfig.
- Validated by:
check_exclusive_min_max
- field min_replicas: int | None = None
Minimum number of replicas, used in both scaling modes.
- Constraints:
ge = 0
- Validated by:
check_exclusive_min_max
- field static_replicas: int | ReplicaStrategyConfig | None = None
Number of replicas in static scaling mode, given either as a fixed integer or as a ReplicaStrategyConfig.
- Validated by:
check_exclusive_min_max
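The legacy cpu_percent_min and cpu_percent_max fields express bounds as fractions of total CPU cores. A minimal sketch of resolving them into absolute replica counts follows. The helper name legacy_bounds is hypothetical, and so are its rounding rules (round down, and never resolve the maximum below 1). They illustrate the idea and are not the pipeline's documented behavior.

```python
import math
from typing import Any, Dict, Optional, Tuple


def legacy_bounds(
    cfg: Dict[str, Any], total_cores: int
) -> Tuple[Optional[int], Optional[int]]:
    """Hypothetical helper: resolve legacy min/max replica bounds to counts.

    Absolute counts are used when present. Otherwise percentages (0.0-1.0)
    are scaled by total_cores. Rounding down, with a floor of 1 on the
    maximum, is an assumption made for this sketch.
    """
    lo = cfg.get("cpu_count_min")
    if lo is None and cfg.get("cpu_percent_min") is not None:
        lo = math.floor(cfg["cpu_percent_min"] * total_cores)

    hi = cfg.get("cpu_count_max")
    if hi is None and cfg.get("cpu_percent_max") is not None:
        hi = max(1, math.floor(cfg["cpu_percent_max"] * total_cores))
    return lo, hi


# 10% and 50% of a 32-core host.
print(legacy_bounds({"cpu_percent_min": 0.1, "cpu_percent_max": 0.5}, total_cores=32))
```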
- pydantic model nv_ingest.pipeline.pipeline_schema.ReplicaStrategyConfig
Bases: BaseModel
Configuration for a specific replica calculation strategy.
- strategy
The calculation strategy to use.
- Type:
ReplicaCalculationStrategy
- value
The primary value for the strategy (e.g., a static count or a CPU percentage).
- Type:
Optional[Union[int, float]]
- limit
Optional upper limit on the calculated number of replicas.
- Type:
Optional[int]
- cpu_percent
CPU percentage, as a fraction from 0.0 to 1.0, for the CPU_PERCENTAGE strategy.
- Type:
Optional[float]
- memory_per_replica_mb
Expected memory usage per replica, in MB.
- Type:
Optional[int]
- memory_threshold_percent
Memory threshold, as a fraction from 0.0 to 1.0, for the MEMORY_THRESHOLDING strategy.
- Type:
Optional[float]
- max_memory_budget_mb
Maximum memory budget, in MB, for the MEMORY_STATIC_GLOBAL_PERCENT strategy.
- Type:
Optional[int]
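The following hypothetical resolver makes the strategy parameters concrete by turning ReplicaStrategyConfig-style values into a replica count. The formulas are assumptions inferred from the field descriptions. For example, it assumes memory_thresholding divides the allowed share of total memory by memory_per_replica_mb, and that memory_static_global_percent divides max_memory_budget_mb by memory_per_replica_mb. The real calculation in nv_ingest may round, clamp, or combine values differently.

```python
import math
from typing import Optional, Union


def resolve_replicas(
    strategy: str,
    *,
    total_cores: int,
    total_memory_mb: int,
    value: Optional[Union[int, float]] = None,
    limit: Optional[int] = None,
    cpu_percent: Optional[float] = None,
    memory_per_replica_mb: Optional[int] = None,
    memory_threshold_percent: Optional[float] = None,
    max_memory_budget_mb: Optional[int] = None,
) -> int:
    """Hypothetical resolver for a ReplicaStrategyConfig-shaped input.

    The formulas below are illustrative assumptions, not nv_ingest's code.
    """
    if strategy == "static":
        count = int(value)
    elif strategy == "cpu_percentage":
        # Fraction of cores; fall back to `value` when cpu_percent is unset.
        fraction = cpu_percent if cpu_percent is not None else value
        count = math.floor(total_cores * fraction)
    elif strategy == "memory_thresholding":
        # Replicas that fit in the allowed share of total memory.
        usable_mb = total_memory_mb * memory_threshold_percent
        count = math.floor(usable_mb / memory_per_replica_mb)
    elif strategy == "memory_static_global_percent":
        # Replicas that fit in a fixed global memory budget.
        count = max_memory_budget_mb // memory_per_replica_mb
    else:
        raise ValueError(f"Unknown strategy: {strategy!r}")

    count = max(1, count)  # assumption: always keep at least one replica
    if limit is not None:
        count = min(count, limit)  # `limit` caps the calculated value
    return count


# Half of 16 cores would give 8 replicas; `limit` caps the result at 6.
print(resolve_replicas("cpu_percentage", total_cores=16, total_memory_mb=65536,
                       cpu_percent=0.5, limit=6))
```

Passing total_cores and total_memory_mb explicitly keeps the sketch deterministic. A real implementation would presumably probe the host instead, in the way IngestPipelineBuilder uses a SystemResourceProbe.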
JSON schema:
{ "title": "ReplicaStrategyConfig", "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "type": "object", "properties": { "strategy": { "$ref": "#/$defs/ReplicaCalculationStrategy", "description": "The calculation strategy to use." }, "value": { "anyOf": [ { "type": "integer" }, { "type": "number" }, { "type": "null" } ], "default": null, "description": "Primary value for the strategy.", "title": "Value" }, "limit": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Optional upper limit for calculated replicas.", "title": "Limit" }, "cpu_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "CPU percentage for CPU_PERCENTAGE strategy.", "title": "Cpu Percent" }, "memory_per_replica_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Expected memory usage per replica in MB.", "title": "Memory Per Replica Mb" }, "memory_threshold_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.", "title": "Memory Threshold Percent" }, 
"max_memory_budget_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "title": "Max Memory Budget Mb" } }, "$defs": { "ReplicaCalculationStrategy": { "description": "Strategy for calculating replica counts at runtime.", "enum": [ "static", "cpu_percentage", "memory_thresholding", "memory_static_global_percent" ], "title": "ReplicaCalculationStrategy", "type": "string" } }, "required": [ "strategy" ] }
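- Fields:
cpu_percent (float | None)
limit (int | None)
max_memory_budget_mb (int | None)
memory_per_replica_mb (int | None)
memory_threshold_percent (float | None)
strategy (ReplicaCalculationStrategy)
value (int | float | None)
- Validators:
validate_strategy_config » all fields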
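validate_strategy_config is a model-level validator. One plausible job for a validator like this is checking that the parameters a chosen strategy needs are actually present. The sketch below illustrates that idea on a plain dict. The REQUIRED_FIELDS table is a hypothetical assumption and not the library's actual rule set.

```python
from typing import Any, Dict, Tuple

# Hypothetical requirements per strategy. Each inner tuple lists
# alternatives, and at least one of them must be set (not None).
REQUIRED_FIELDS: Dict[str, Tuple[Tuple[str, ...], ...]] = {
    "static": (("value",),),
    "cpu_percentage": (("cpu_percent", "value"),),
    "memory_thresholding": (("memory_per_replica_mb",), ("memory_threshold_percent",)),
    "memory_static_global_percent": (("memory_per_replica_mb",), ("max_memory_budget_mb",)),
}


def validate_strategy_config(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical cross-field check for a ReplicaStrategyConfig-shaped dict."""
    strategy = cfg.get("strategy")
    if strategy not in REQUIRED_FIELDS:
        raise ValueError(f"Unknown or missing strategy: {strategy!r}")
    for alternatives in REQUIRED_FIELDS[strategy]:
        if all(cfg.get(name) is None for name in alternatives):
            raise ValueError(f"Strategy {strategy!r} requires one of {alternatives}")
    return cfg


validate_strategy_config({"strategy": "cpu_percentage", "cpu_percent": 0.25})
```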
- field cpu_percent: float | None = None
CPU percentage (0.0 to 1.0) for the CPU_PERCENTAGE strategy.
- Constraints:
ge = 0.0
le = 1.0
- Validated by:
validate_strategy_config
- field limit: int | None = None
Optional upper limit on the calculated number of replicas.
- Constraints:
ge = 1
- Validated by:
validate_strategy_config
- field max_memory_budget_mb: int | None = None
Maximum memory budget, in MB, for the MEMORY_STATIC_GLOBAL_PERCENT strategy.
- Constraints:
gt = 0
- Validated by:
validate_strategy_config
- field memory_per_replica_mb: int | None = None
Expected memory usage per replica, in MB.
- Constraints:
gt = 0
- Validated by:
validate_strategy_config
- field memory_threshold_percent: float | None = None
Memory threshold (0.0 to 1.0) for the MEMORY_THRESHOLDING strategy.
- Constraints:
ge = 0.0
le = 1.0
- Validated by:
validate_strategy_config
- field strategy: ReplicaCalculationStrategy [Required]
The calculation strategy to use.
- Validated by:
validate_strategy_config
- field value: int | float | None = None
Primary value for the strategy.
- Validated by:
validate_strategy_config
- pydantic model nv_ingest.pipeline.pipeline_schema.StageConfig
Bases: BaseModel
Configuration for a single pipeline stage.
Describes a single component of the ingestion pipeline, including its name, type, implementation (actor or callable), and stage-specific configuration.
- name
A unique name that identifies the stage within the pipeline.
- Type:
str
- type
The type of the stage, which determines how it is added to the RayPipeline.
- Type:
StageType
- phase
The logical phase of the stage in the pipeline.
- Type:
PipelinePhase
- actor
The fully qualified import path to the actor class or function that implements the stage’s logic. Mutually exclusive with ‘callable’.
- Type:
Optional[str]
- callable
The fully qualified import path to a callable function that implements the stage’s logic. Mutually exclusive with ‘actor’.
- Type:
Optional[str]
- task_filters
List of task types this callable stage should filter for. Applies only to callable stages. Supports both simple strings (e.g., "udf") and complex filters (e.g., ["udf", {"phase": 5}]).
- Type:
Optional[List[Any]]
- enabled
Whether the stage is included in the pipeline. If False, the stage and its connected edges are ignored.
- Type:
bool
- config
A dictionary of configuration parameters passed to the stage’s actor.
- Type:
Dict[str, Any]
- replicas
The replica configuration for the stage.
- Type:
ReplicaConfig
- runs_after
Names of the stages that this stage must run after (that is, be downstream of).
- Type:
List[str]
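Two of the attributes above shape the stage graph: enabled removes a stage and its edges, and runs_after declares ordering. Below is a minimal sketch of that logic on plain dicts, using the standard library's graphlib.TopologicalSorter (Python 3.9+). It is not IngestPipelineBuilder's actual algorithm. Ignoring runs_after entries that point at disabled or unknown stages is an assumption, and the stage names are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Any, Dict, List


def order_stages(stages: List[Dict[str, Any]]) -> List[str]:
    """Hypothetical: drop disabled stages, then order the rest by runs_after."""
    # `enabled` defaults to True, matching StageConfig's default.
    active = {s["name"]: s for s in stages if s.get("enabled", True)}
    sorter = TopologicalSorter()
    for name, stage in active.items():
        # Assumption: dependencies on disabled or unknown stages are dropped.
        deps = [d for d in stage.get("runs_after", []) if d in active]
        sorter.add(name, *deps)
    # static_order() raises graphlib.CycleError on circular runs_after chains.
    return list(sorter.static_order())


# Hypothetical stages; only name, enabled and runs_after matter here.
stages = [
    {"name": "sink", "runs_after": ["embed"]},
    {"name": "embed", "runs_after": ["extract"]},
    {"name": "extract", "runs_after": ["source"]},
    {"name": "source"},
    {"name": "debug_dump", "enabled": False, "runs_after": ["extract"]},
]
print(order_stages(stages))
```

graphlib.CycleError is a subclass of ValueError. This matches the ValueError that IngestPipelineBuilder.build() is documented to raise for circular dependencies.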
JSON schema:
{ "title": "StageConfig", "description": "Configuration for a single pipeline stage.\n\nDescribes a single component in the ingestion pipeline, including its name,\ntype, actor implementation, and specific configuration.\n\nAttributes\n----------\nname : str\n A unique name to identify the stage within the pipeline.\ntype : StageType\n The type of the stage, which determines how it's added to the RayPipeline.\nphase: PipelinePhase\n The logical phase of the stage in the pipeline.\nactor : Optional[str]\n The fully qualified import path to the actor class or function that\n implements the stage's logic. Mutually exclusive with 'callable'.\ncallable : Optional[str]\n The fully qualified import path to a callable function that\n implements the stage's logic. Mutually exclusive with 'actor'.\ntask_filters: Optional[List[Any]]\n List of task types this callable stage should filter for. Only applies to callable stages.\n Supports both simple strings (e.g., \"udf\") and complex filters (e.g., [\"udf\", {\"phase\": 5}]).\nenabled : bool\n A flag to indicate whether the stage should be included in the pipeline.\n If False, the stage and its connected edges are ignored.\nconfig : Dict[str, Any]\n A dictionary of configuration parameters passed to the stage's actor.\nreplicas : ReplicaConfig\n The replica configuration for the stage.\nruns_after: List[str]\n A list of stage names that this stage must be downstream of.", "type": "object", "properties": { "name": { "description": "Unique name for the stage.", "title": "Name", "type": "string" }, "type": { "$ref": "#/$defs/StageType", "default": "stage", "description": "Type of the stage." }, "phase": { "$ref": "#/$defs/PipelinePhase", "description": "The logical phase of the stage." 
}, "actor": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Full import path to the stage's actor class or function.", "title": "Actor" }, "callable": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Full import path to a callable function for the stage.", "title": "Callable" }, "task_filters": { "anyOf": [ { "items": {}, "type": "array" }, { "type": "null" } ], "default": null, "description": "List of task types this callable stage should filter for. Only applies to callable stages.", "title": "Task Filters" }, "enabled": { "default": true, "description": "Whether the stage is enabled.", "title": "Enabled", "type": "boolean" }, "config": { "additionalProperties": true, "default": {}, "description": "Configuration dictionary for the stage.", "title": "Config", "type": "object" }, "replicas": { "$ref": "#/$defs/ReplicaConfig", "description": "Replica configuration." }, "runs_after": { "description": "List of stages this stage must run after.", "items": { "type": "string" }, "title": "Runs After", "type": "array" } }, "$defs": { "PipelinePhase": { "description": "The logical phase of a pipeline stage.\n\nAttributes\n----------\nPRE_PROCESSING : int\n Pre-processing phase.\nEXTRACTION : int\n Extraction phase.\nPOST_PROCESSING : int\n Post-processing phase.\nMUTATION : int\n Mutation phase.\nTRANSFORM : int\n Transform phase.\nRESPONSE : int\n Response phase.\nTELEMETRY : int\n Telemetry phase.\nDRAIN : int\n Drain phase.", "enum": [ 0, 1, 2, 3, 4, 5, 6, 7 ], "title": "PipelinePhase", "type": "integer" }, "ReplicaCalculationStrategy": { "description": "Strategy for calculating replica counts at runtime.", "enum": [ "static", "cpu_percentage", "memory_thresholding", "memory_static_global_percent" ], "title": "ReplicaCalculationStrategy", "type": "string" }, "ReplicaConfig": { "additionalProperties": false, "description": "Configuration for stage replicas supporting both dynamic and 
static scaling modes.\n\nDefines the min/max number of replicas for a stage, either as absolute counts,\npercentages of total CPU cores, or resource-based calculations. Supports different\nconfigurations for dynamic vs static scaling modes.\n\nAttributes\n----------\ncpu_count_min : Optional[int]\n Absolute minimum number of replicas. Must be >= 0. (Legacy support)\ncpu_count_max : Optional[int]\n Absolute maximum number of replicas. Must be >= 1. (Legacy support)\ncpu_percent_min : Optional[float]\n Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\ncpu_percent_max : Optional[float]\n Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\nmin_replicas : Optional[int]\n Minimum number of replicas for both scaling modes. Must be >= 0.\nmax_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Maximum replicas for dynamic scaling mode. Can be static int or strategy config.\nstatic_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Replica configuration for static scaling mode. 
Can be static int or strategy config.", "properties": { "cpu_count_min": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute minimum number of replicas.", "title": "Cpu Count Min" }, "cpu_count_max": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute maximum number of replicas.", "title": "Cpu Count Max" }, "cpu_percent_min": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas as a percentage of total cores.", "title": "Cpu Percent Min" }, "cpu_percent_max": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Maximum number of replicas as a percentage of total cores.", "title": "Cpu Percent Max" }, "min_replicas": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas.", "title": "Min Replicas" }, "max_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Maximum replicas for dynamic scaling mode.", "title": "Max Replicas" }, "static_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Replica configuration for static scaling mode.", "title": "Static Replicas" } }, "title": "ReplicaConfig", "type": "object" }, "ReplicaStrategyConfig": { "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n 
CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "properties": { "strategy": { "$ref": "#/$defs/ReplicaCalculationStrategy", "description": "The calculation strategy to use." }, "value": { "anyOf": [ { "type": "integer" }, { "type": "number" }, { "type": "null" } ], "default": null, "description": "Primary value for the strategy.", "title": "Value" }, "limit": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Optional upper limit for calculated replicas.", "title": "Limit" }, "cpu_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "CPU percentage for CPU_PERCENTAGE strategy.", "title": "Cpu Percent" }, "memory_per_replica_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Expected memory usage per replica in MB.", "title": "Memory Per Replica Mb" }, "memory_threshold_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.", "title": "Memory Threshold Percent" }, "max_memory_budget_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "title": "Max Memory Budget Mb" } }, "required": [ "strategy" ], "title": "ReplicaStrategyConfig", "type": "object" }, "StageType": { "description": "The type of a pipeline stage.", "enum": [ "source", "stage", "sink" ], "title": "StageType", 
"type": "string" } }, "additionalProperties": false, "required": [ "name", "phase" ] }
- Config:
extra: str = forbid
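- Fields:
actor (str | None)
callable (str | None)
config (Dict[str, Any])
enabled (bool)
name (str)
phase (PipelinePhase)
replicas (ReplicaConfig)
runs_after (List[str])
task_filters (List[Any] | None)
type (StageType)
- Validators:
check_actor_or_callable » all fields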
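check_actor_or_callable enforces that actor and callable are mutually exclusive. The hypothetical sketch below mirrors that check on a plain dict. Two of its rules are assumptions: it requires exactly one of the two fields, and it rejects task_filters on actor stages because the docs say that field applies only to callable stages. The import paths in the examples are made up.

```python
from typing import Any, Dict


def check_actor_or_callable(stage: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical re-creation of StageConfig's actor/callable check."""
    has_actor = stage.get("actor") is not None
    has_callable = stage.get("callable") is not None
    if has_actor == has_callable:
        # Assumption: exactly one of the two must be provided.
        raise ValueError("Specify exactly one of 'actor' or 'callable'")
    if stage.get("task_filters") is not None and not has_callable:
        # Assumption: task_filters on a non-callable stage is an error.
        raise ValueError("'task_filters' is only valid for callable stages")
    return stage


# Hypothetical import paths, for illustration only.
check_actor_or_callable({"name": "pdf_extractor", "actor": "my_pkg.stages.PdfExtractorStage"})
check_actor_or_callable({"name": "udf", "callable": "my_pkg.udfs.run", "task_filters": ["udf"]})
```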
- field actor: str | None = None
Full import path to the stage’s actor class or function.
- Validated by:
check_actor_or_callable
- field callable: str | None = None
Full import path to a callable function for the stage.
- Validated by:
check_actor_or_callable
- field config: Dict[str, Any] = {}
Configuration dictionary for the stage.
- Validated by:
check_actor_or_callable
- field enabled: bool = True
Whether the stage is enabled.
- Validated by:
check_actor_or_callable
- field name: str [Required]
Unique name for the stage.
- Validated by:
check_actor_or_callable
- field phase: PipelinePhase [Required]
The logical phase of the stage.
- Validated by:
check_actor_or_callable
- field replicas: ReplicaConfig [Optional]
Replica configuration for the stage.
- Validated by:
check_actor_or_callable
- field runs_after: List[str] [Optional]
Names of the stages this stage must run after.
- Validated by:
check_actor_or_callable
- field task_filters: List[Any] | None = None
List of task types this callable stage should filter for. Applies only to callable stages.
- Validated by:
check_actor_or_callable
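The task_filters field accepts plain strings such as "udf" and compound filters such as ["udf", {"phase": 5}]. A hypothetical matcher follows. It assumes a task is a dict with type and properties keys, and that every key in a compound filter must equal the task's property of the same name. The real task representation and matching rules may differ.

```python
from typing import Any, Dict, List


def task_matches(task: Dict[str, Any], filters: List[Any]) -> bool:
    """Hypothetical matcher for StageConfig.task_filters entries.

    Assumed semantics: a plain string matches on task type. A
    [type, {key: value, ...}] pair also requires every listed key to
    equal the task's property of the same name.
    """
    for flt in filters:
        if isinstance(flt, str):
            if task.get("type") == flt:
                return True
        elif isinstance(flt, (list, tuple)) and len(flt) == 2:
            task_type, required = flt
            props = task.get("properties", {})
            if task.get("type") == task_type and all(
                props.get(key) == val for key, val in required.items()
            ):
                return True
    return False


# Hypothetical task shape: {"type": ..., "properties": {...}}.
udf_task = {"type": "udf", "properties": {"phase": 5}}
print(task_matches(udf_task, [["udf", {"phase": 5}]]))
```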