nv_ingest.pipeline package#
Subpackages#
Submodules#
nv_ingest.pipeline.default_libmode_pipeline_impl module#
Default pipeline implementation for libmode.
This module contains the default libmode pipeline configuration as a string, allowing the pipeline to be loaded without requiring external YAML files.
nv_ingest.pipeline.default_pipeline_impl module#
Default pipeline implementation (runtime default).
This module embeds the exact contents of config/default_pipeline.yaml so code can load the default pipeline without reading the YAML file at runtime.
nv_ingest.pipeline.ingest_pipeline module#
nv_ingest.pipeline.pipeline_schema module#
- pydantic model nv_ingest.pipeline.pipeline_schema.EdgeConfig[source]#
Bases:
BaseModelConfiguration for an edge between two stages.
Defines a connection from a source stage to a destination stage, including the size of the intermediate queue.
- from_stage#
The name of the source stage for the edge.
- Type:
str
- to_stage#
The name of the destination stage for the edge.
- Type:
str
- queue_size#
The maximum number of items in the queue between the two stages.
- Type:
int
Show JSON schema
{ "title": "EdgeConfig", "description": "Configuration for an edge between two stages.\n\nDefines a connection from a source stage to a destination stage, including\nthe size of the intermediate queue.\n\nAttributes\n----------\nfrom_stage : str\n The name of the source stage for the edge.\nto_stage : str\n The name of the destination stage for the edge.\nqueue_size : int\n The maximum number of items in the queue between the two stages.", "type": "object", "properties": { "from": { "description": "The name of the source stage.", "title": "From", "type": "string" }, "to": { "description": "The name of the destination stage.", "title": "To", "type": "string" }, "queue_size": { "default": 100, "description": "The size of the queue between stages.", "exclusiveMinimum": 0, "title": "Queue Size", "type": "integer" } }, "additionalProperties": false, "required": [ "from", "to" ] }
- Config:
extra: str = forbid
- Fields:
- field from_stage: str [Required] (alias 'from')#
The name of the source stage.
- field queue_size: int = 100#
The size of the queue between stages.
- Constraints:
gt = 0
- field to_stage: str [Required] (alias 'to')#
The name of the destination stage.
- pydantic model nv_ingest.pipeline.pipeline_schema.PIDControllerConfig[source]#
Bases:
BaseModelConfiguration for the PID controller used in dynamic scaling.
- kp#
Proportional gain for the PID controller.
- Type:
float
- ki#
Integral gain for the PID controller.
- Type:
float
- ema_alpha#
Exponential moving average alpha for the PID controller.
- Type:
float
- target_queue_depth#
Target queue depth for the PID controller.
- Type:
int
- penalty_factor#
Penalty factor for the PID controller.
- Type:
float
- error_boost_factor#
Error boost factor for the PID controller.
- Type:
float
- rcm_memory_safety_buffer_fraction#
Resource constraint manager memory safety buffer fraction.
- Type:
float
Show JSON schema
{ "title": "PIDControllerConfig", "description": "Configuration for the PID controller used in dynamic scaling.\n\nAttributes\n----------\nkp : float\n Proportional gain for the PID controller.\nki : float\n Integral gain for the PID controller.\nema_alpha : float\n Exponential moving average alpha for the PID controller.\ntarget_queue_depth : int\n Target queue depth for the PID controller.\npenalty_factor : float\n Penalty factor for the PID controller.\nerror_boost_factor : float\n Error boost factor for the PID controller.\nrcm_memory_safety_buffer_fraction : float\n Resource constraint manager memory safety buffer fraction.", "type": "object", "properties": { "kp": { "default": 0.2, "description": "Proportional gain for the PID controller.", "exclusiveMinimum": 0.0, "title": "Kp", "type": "number" }, "ki": { "default": 0.01, "description": "Integral gain for the PID controller.", "minimum": 0.0, "title": "Ki", "type": "number" }, "ema_alpha": { "default": 0.1, "description": "Exponential moving average alpha for the PID controller.", "maximum": 1.0, "minimum": 0.0, "title": "Ema Alpha", "type": "number" }, "target_queue_depth": { "default": 0, "description": "Target queue depth for the PID controller.", "minimum": 0, "title": "Target Queue Depth", "type": "integer" }, "penalty_factor": { "default": 0.1, "description": "Penalty factor for the PID controller.", "minimum": 0.0, "title": "Penalty Factor", "type": "number" }, "error_boost_factor": { "default": 1.5, "description": "Error boost factor for the PID controller.", "exclusiveMinimum": 0.0, "title": "Error Boost Factor", "type": "number" }, "rcm_memory_safety_buffer_fraction": { "default": 0.15, "description": "Resource constraint manager memory safety buffer fraction.", "maximum": 1.0, "minimum": 0.0, "title": "Rcm Memory Safety Buffer Fraction", "type": "number" } }, "additionalProperties": false }
- Config:
extra: str = forbid
- Fields:
- field ema_alpha: float = 0.1#
Exponential moving average alpha for the PID controller.
- Constraints:
ge = 0.0
le = 1.0
- field error_boost_factor: float = 1.5#
Error boost factor for the PID controller.
- Constraints:
gt = 0.0
- field ki: float = 0.01#
Integral gain for the PID controller.
- Constraints:
ge = 0.0
- field kp: float = 0.2#
Proportional gain for the PID controller.
- Constraints:
gt = 0.0
- field penalty_factor: float = 0.1#
Penalty factor for the PID controller.
- Constraints:
ge = 0.0
- field rcm_memory_safety_buffer_fraction: float = 0.15#
Resource constraint manager memory safety buffer fraction.
- Constraints:
ge = 0.0
le = 1.0
- field target_queue_depth: int = 0#
Target queue depth for the PID controller.
- Constraints:
ge = 0
- pydantic model nv_ingest.pipeline.pipeline_schema.PipelineConfigSchema[source]#
Bases:
BaseModelRoot configuration model for an ingestion pipeline.
This model represents the entire declarative configuration for an ingestion pipeline, including all stages and the edges that connect them.
- name#
The name of the pipeline.
- Type:
str
- description#
A description of the pipeline.
- Type:
str
- stages#
A list of all stage configurations in the pipeline.
- Type:
List[StageConfig]
- edges#
A list of all edge configurations that define the pipeline’s topology.
- Type:
List[EdgeConfig]
- pipeline#
description=”Runtime configuration for the pipeline.”)
- Type:
Optional[PipelineRuntimeConfig] = Field(default_factory=PipelineRuntimeConfig,
Show JSON schema
{ "title": "PipelineConfigSchema", "description": "Root configuration model for an ingestion pipeline.\n\nThis model represents the entire declarative configuration for an ingestion\npipeline, including all stages and the edges that connect them.\n\nAttributes\n----------\nname : str\n The name of the pipeline.\ndescription : str\n A description of the pipeline.\nstages : List[StageConfig]\n A list of all stage configurations in the pipeline.\nedges : List[EdgeConfig]\n A list of all edge configurations that define the pipeline's topology.\npipeline: Optional[PipelineRuntimeConfig] = Field(default_factory=PipelineRuntimeConfig,\n description=\"Runtime configuration for the pipeline.\")", "type": "object", "properties": { "name": { "description": "The name of the pipeline.", "title": "Name", "type": "string" }, "description": { "description": "A description of the pipeline.", "title": "Description", "type": "string" }, "stages": { "description": "List of all stages in the pipeline.", "items": { "$ref": "#/$defs/StageConfig" }, "title": "Stages", "type": "array" }, "edges": { "description": "List of all edges connecting the stages.", "items": { "$ref": "#/$defs/EdgeConfig" }, "title": "Edges", "type": "array" }, "pipeline": { "anyOf": [ { "$ref": "#/$defs/PipelineRuntimeConfig" }, { "type": "null" } ], "description": "Runtime configuration for the pipeline." } }, "$defs": { "EdgeConfig": { "additionalProperties": false, "description": "Configuration for an edge between two stages.\n\nDefines a connection from a source stage to a destination stage, including\nthe size of the intermediate queue.\n\nAttributes\n----------\nfrom_stage : str\n The name of the source stage for the edge.\nto_stage : str\n The name of the destination stage for the edge.\nqueue_size : int\n The maximum number of items in the queue between the two stages.", "properties": { "from": { "description": "The name of the source stage.", "title": "From", "type": "string" }, "to": { "description": "The name of the destination stage.", "title": "To", "type": "string" }, "queue_size": { "default": 100, "description": "The size of the queue between stages.", "exclusiveMinimum": 0, "title": "Queue Size", "type": "integer" } }, "required": [ "from", "to" ], "title": "EdgeConfig", "type": "object" }, "PIDControllerConfig": { "additionalProperties": false, "description": "Configuration for the PID controller used in dynamic scaling.\n\nAttributes\n----------\nkp : float\n Proportional gain for the PID controller.\nki : float\n Integral gain for the PID controller.\nema_alpha : float\n Exponential moving average alpha for the PID controller.\ntarget_queue_depth : int\n Target queue depth for the PID controller.\npenalty_factor : float\n Penalty factor for the PID controller.\nerror_boost_factor : float\n Error boost factor for the PID controller.\nrcm_memory_safety_buffer_fraction : float\n Resource constraint manager memory safety buffer fraction.", "properties": { "kp": { "default": 0.2, "description": "Proportional gain for the PID controller.", "exclusiveMinimum": 0.0, "title": "Kp", "type": "number" }, "ki": { "default": 0.01, "description": "Integral gain for the PID controller.", "minimum": 0.0, "title": "Ki", "type": "number" }, "ema_alpha": { "default": 0.1, "description": "Exponential moving average alpha for the PID controller.", "maximum": 1.0, "minimum": 0.0, "title": "Ema Alpha", "type": "number" }, "target_queue_depth": { "default": 0, "description": "Target queue depth for the PID controller.", "minimum": 0, "title": "Target Queue Depth", "type": "integer" }, "penalty_factor": { "default": 0.1, "description": "Penalty factor for the PID controller.", "minimum": 0.0, "title": "Penalty Factor", "type": "number" }, "error_boost_factor": { "default": 1.5, "description": "Error boost factor for the PID controller.", "exclusiveMinimum": 0.0, "title": "Error Boost Factor", "type": "number" }, "rcm_memory_safety_buffer_fraction": { "default": 0.15, "description": "Resource constraint manager memory safety buffer fraction.", "maximum": 1.0, "minimum": 0.0, "title": "Rcm Memory Safety Buffer Fraction", "type": "number" } }, "title": "PIDControllerConfig", "type": "object" }, "PipelinePhase": { "description": "The logical phase of a pipeline stage.\n\nAttributes\n----------\nPRE_PROCESSING : int\n Pre-processing phase.\nEXTRACTION : int\n Extraction phase.\nPOST_PROCESSING : int\n Post-processing phase.\nMUTATION : int\n Mutation phase.\nTRANSFORM : int\n Transform phase.\nRESPONSE : int\n Response phase.\nTELEMETRY : int\n Telemetry phase.\nDRAIN : int\n Drain phase.", "enum": [ 0, 1, 2, 3, 4, 5, 6, 7 ], "title": "PipelinePhase", "type": "integer" }, "PipelineRuntimeConfig": { "additionalProperties": false, "description": "Configuration for pipeline runtime behavior.\n\nParameters\n----------\ndisable_dynamic_scaling : bool\n Whether to disable dynamic scaling of replicas (default: False).\ndynamic_memory_threshold : float\n The memory utilization threshold (0.0 to 1.0) for dynamic scaling decisions.\nstatic_memory_threshold : float\n Global memory threshold for static scaling mode (default: 0.75).\npid_controller : PIDControllerConfig\n PID controller configuration for dynamic scaling.\nlaunch_simple_broker : bool\n If True, launches a simple message broker for the pipeline.", "properties": { "disable_dynamic_scaling": { "default": false, "description": "Disable dynamic scaling of stage replicas.", "title": "Disable Dynamic Scaling", "type": "boolean" }, "dynamic_memory_threshold": { "default": 0.75, "description": "Memory utilization threshold for dynamic scaling.", "maximum": 0.95, "minimum": 0.0, "title": "Dynamic Memory Threshold", "type": "number" }, "static_memory_threshold": { "default": 0.75, "description": "Global memory threshold for static scaling mode.", "maximum": 1.0, "minimum": 0.0, "title": "Static Memory Threshold", "type": "number" }, "pid_controller": { "$ref": "#/$defs/PIDControllerConfig", "description": "PID controller configuration for dynamic scaling." }, "launch_simple_broker": { "default": false, "description": "Launch a simple message broker for the pipeline.", "title": "Launch Simple Broker", "type": "boolean" } }, "title": "PipelineRuntimeConfig", "type": "object" }, "ReplicaCalculationStrategy": { "description": "Strategy for calculating replica counts at runtime.", "enum": [ "static", "cpu_percentage", "memory_thresholding", "memory_static_global_percent" ], "title": "ReplicaCalculationStrategy", "type": "string" }, "ReplicaConfig": { "additionalProperties": false, "description": "Configuration for stage replicas supporting both dynamic and static scaling modes.\n\nDefines the min/max number of replicas for a stage, either as absolute counts,\npercentages of total CPU cores, or resource-based calculations. Supports different\nconfigurations for dynamic vs static scaling modes.\n\nAttributes\n----------\ncpu_count_min : Optional[int]\n Absolute minimum number of replicas. Must be >= 0. (Legacy support)\ncpu_count_max : Optional[int]\n Absolute maximum number of replicas. Must be >= 1. (Legacy support)\ncpu_percent_min : Optional[float]\n Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\ncpu_percent_max : Optional[float]\n Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\nmin_replicas : Optional[int]\n Minimum number of replicas for both scaling modes. Must be >= 0.\nmax_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Maximum replicas for dynamic scaling mode. Can be static int or strategy config.\nstatic_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Replica configuration for static scaling mode. Can be static int or strategy config.", "properties": { "cpu_count_min": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute minimum number of replicas.", "title": "Cpu Count Min" }, "cpu_count_max": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute maximum number of replicas.", "title": "Cpu Count Max" }, "cpu_percent_min": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas as a percentage of total cores.", "title": "Cpu Percent Min" }, "cpu_percent_max": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Maximum number of replicas as a percentage of total cores.", "title": "Cpu Percent Max" }, "min_replicas": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas.", "title": "Min Replicas" }, "max_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Maximum replicas for dynamic scaling mode.", "title": "Max Replicas" }, "static_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Replica configuration for static scaling mode.", "title": "Static Replicas" } }, "title": "ReplicaConfig", "type": "object" }, "ReplicaStrategyConfig": { "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "properties": { "strategy": { "$ref": "#/$defs/ReplicaCalculationStrategy", "description": "The calculation strategy to use." }, "value": { "anyOf": [ { "type": "integer" }, { "type": "number" }, { "type": "null" } ], "default": null, "description": "Primary value for the strategy.", "title": "Value" }, "limit": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Optional upper limit for calculated replicas.", "title": "Limit" }, "cpu_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "CPU percentage for CPU_PERCENTAGE strategy.", "title": "Cpu Percent" }, "memory_per_replica_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Expected memory usage per replica in MB.", "title": "Memory Per Replica Mb" }, "memory_threshold_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.", "title": "Memory Threshold Percent" }, "max_memory_budget_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "title": "Max Memory Budget Mb" } }, "required": [ "strategy" ], "title": "ReplicaStrategyConfig", "type": "object" }, "StageConfig": { "additionalProperties": false, "description": "Configuration for a single pipeline stage.\n\nDescribes a single component in the ingestion pipeline, including its name,\ntype, actor implementation, and specific configuration.\n\nAttributes\n----------\nname : str\n A unique name to identify the stage within the pipeline.\ntype : StageType\n The type of the stage, which determines how it's added to the RayPipeline.\nphase: PipelinePhase\n The logical phase of the stage in the pipeline.\nactor : Optional[str]\n The fully qualified import path to the actor class or function that\n implements the stage's logic. Mutually exclusive with 'callable'.\ncallable : Optional[str]\n The fully qualified import path to a callable function that\n implements the stage's logic. Mutually exclusive with 'actor'.\ntask_filters: Optional[List[Any]]\n List of task types this callable stage should filter for. Only applies to callable stages.\n Supports both simple strings (e.g., \"udf\") and complex filters (e.g., [\"udf\", {\"phase\": 5}]).\nenabled : bool\n A flag to indicate whether the stage should be included in the pipeline.\n If False, the stage and its connected edges are ignored.\nconfig : Dict[str, Any]\n A dictionary of configuration parameters passed to the stage's actor.\nreplicas : ReplicaConfig\n The replica configuration for the stage.\nruns_after: List[str]\n A list of stage names that this stage must be downstream of.", "properties": { "name": { "description": "Unique name for the stage.", "title": "Name", "type": "string" }, "type": { "$ref": "#/$defs/StageType", "default": "stage", "description": "Type of the stage." }, "phase": { "$ref": "#/$defs/PipelinePhase", "description": "The logical phase of the stage." }, "actor": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Full import path to the stage's actor class or function.", "title": "Actor" }, "callable": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Full import path to a callable function for the stage.", "title": "Callable" }, "task_filters": { "anyOf": [ { "items": {}, "type": "array" }, { "type": "null" } ], "default": null, "description": "List of task types this callable stage should filter for. Only applies to callable stages.", "title": "Task Filters" }, "enabled": { "default": true, "description": "Whether the stage is enabled.", "title": "Enabled", "type": "boolean" }, "config": { "default": {}, "description": "Configuration dictionary for the stage.", "title": "Config", "type": "object" }, "replicas": { "$ref": "#/$defs/ReplicaConfig", "description": "Replica configuration." }, "runs_after": { "description": "List of stages this stage must run after.", "items": { "type": "string" }, "title": "Runs After", "type": "array" } }, "required": [ "name", "phase" ], "title": "StageConfig", "type": "object" }, "StageType": { "description": "The type of a pipeline stage.", "enum": [ "source", "stage", "sink" ], "title": "StageType", "type": "string" } }, "additionalProperties": false, "required": [ "name", "description", "stages", "edges" ] }
- Config:
extra: str = forbid
- Fields:
- Validators:
- field description: str [Required]#
A description of the pipeline.
- field edges: List[EdgeConfig] [Required]#
List of all edges connecting the stages.
- Validated by:
- field name: str [Required]#
The name of the pipeline.
- field pipeline: PipelineRuntimeConfig | None [Optional]#
Runtime configuration for the pipeline.
- field stages: List[StageConfig] [Required]#
List of all stages in the pipeline.
- Validated by:
- get_phases() Set[PipelinePhase][source]#
Returns a set of all unique phases in the pipeline.
- pydantic model nv_ingest.pipeline.pipeline_schema.PipelineRuntimeConfig[source]#
Bases:
BaseModelConfiguration for pipeline runtime behavior.
- Parameters:
disable_dynamic_scaling (bool) – Whether to disable dynamic scaling of replicas (default: False).
dynamic_memory_threshold (float) – The memory utilization threshold (0.0 to 1.0) for dynamic scaling decisions.
static_memory_threshold (float) – Global memory threshold for static scaling mode (default: 0.75).
pid_controller (PIDControllerConfig) – PID controller configuration for dynamic scaling.
launch_simple_broker (bool) – If True, launches a simple message broker for the pipeline.
Show JSON schema
{ "title": "PipelineRuntimeConfig", "description": "Configuration for pipeline runtime behavior.\n\nParameters\n----------\ndisable_dynamic_scaling : bool\n Whether to disable dynamic scaling of replicas (default: False).\ndynamic_memory_threshold : float\n The memory utilization threshold (0.0 to 1.0) for dynamic scaling decisions.\nstatic_memory_threshold : float\n Global memory threshold for static scaling mode (default: 0.75).\npid_controller : PIDControllerConfig\n PID controller configuration for dynamic scaling.\nlaunch_simple_broker : bool\n If True, launches a simple message broker for the pipeline.", "type": "object", "properties": { "disable_dynamic_scaling": { "default": false, "description": "Disable dynamic scaling of stage replicas.", "title": "Disable Dynamic Scaling", "type": "boolean" }, "dynamic_memory_threshold": { "default": 0.75, "description": "Memory utilization threshold for dynamic scaling.", "maximum": 0.95, "minimum": 0.0, "title": "Dynamic Memory Threshold", "type": "number" }, "static_memory_threshold": { "default": 0.75, "description": "Global memory threshold for static scaling mode.", "maximum": 1.0, "minimum": 0.0, "title": "Static Memory Threshold", "type": "number" }, "pid_controller": { "$ref": "#/$defs/PIDControllerConfig", "description": "PID controller configuration for dynamic scaling." }, "launch_simple_broker": { "default": false, "description": "Launch a simple message broker for the pipeline.", "title": "Launch Simple Broker", "type": "boolean" } }, "$defs": { "PIDControllerConfig": { "additionalProperties": false, "description": "Configuration for the PID controller used in dynamic scaling.\n\nAttributes\n----------\nkp : float\n Proportional gain for the PID controller.\nki : float\n Integral gain for the PID controller.\nema_alpha : float\n Exponential moving average alpha for the PID controller.\ntarget_queue_depth : int\n Target queue depth for the PID controller.\npenalty_factor : float\n Penalty factor for the PID controller.\nerror_boost_factor : float\n Error boost factor for the PID controller.\nrcm_memory_safety_buffer_fraction : float\n Resource constraint manager memory safety buffer fraction.", "properties": { "kp": { "default": 0.2, "description": "Proportional gain for the PID controller.", "exclusiveMinimum": 0.0, "title": "Kp", "type": "number" }, "ki": { "default": 0.01, "description": "Integral gain for the PID controller.", "minimum": 0.0, "title": "Ki", "type": "number" }, "ema_alpha": { "default": 0.1, "description": "Exponential moving average alpha for the PID controller.", "maximum": 1.0, "minimum": 0.0, "title": "Ema Alpha", "type": "number" }, "target_queue_depth": { "default": 0, "description": "Target queue depth for the PID controller.", "minimum": 0, "title": "Target Queue Depth", "type": "integer" }, "penalty_factor": { "default": 0.1, "description": "Penalty factor for the PID controller.", "minimum": 0.0, "title": "Penalty Factor", "type": "number" }, "error_boost_factor": { "default": 1.5, "description": "Error boost factor for the PID controller.", "exclusiveMinimum": 0.0, "title": "Error Boost Factor", "type": "number" }, "rcm_memory_safety_buffer_fraction": { "default": 0.15, "description": "Resource constraint manager memory safety buffer fraction.", "maximum": 1.0, "minimum": 0.0, "title": "Rcm Memory Safety Buffer Fraction", "type": "number" } }, "title": "PIDControllerConfig", "type": "object" } }, "additionalProperties": false }
- Config:
extra: str = forbid
- Fields:
- field disable_dynamic_scaling: bool = False#
Disable dynamic scaling of stage replicas.
- field dynamic_memory_threshold: float = 0.75#
Memory utilization threshold for dynamic scaling.
- Constraints:
ge = 0.0
le = 0.95
- field launch_simple_broker: bool = False#
Launch a simple message broker for the pipeline.
- field pid_controller: PIDControllerConfig [Optional]#
PID controller configuration for dynamic scaling.
- field static_memory_threshold: float = 0.75#
Global memory threshold for static scaling mode.
- Constraints:
ge = 0.0
le = 1.0
- class nv_ingest.pipeline.pipeline_schema.ReplicaCalculationStrategy(value)[source]#
Bases:
str,EnumStrategy for calculating replica counts at runtime.
- CPU_PERCENTAGE = 'cpu_percentage'#
- MEMORY_STATIC_GLOBAL_PERCENT = 'memory_static_global_percent'#
- MEMORY_THRESHOLDING = 'memory_thresholding'#
- STATIC = 'static'#
- pydantic model nv_ingest.pipeline.pipeline_schema.ReplicaConfig[source]#
Bases:
BaseModelConfiguration for stage replicas supporting both dynamic and static scaling modes.
Defines the min/max number of replicas for a stage, either as absolute counts, percentages of total CPU cores, or resource-based calculations. Supports different configurations for dynamic vs static scaling modes.
- cpu_count_min#
Absolute minimum number of replicas. Must be >= 0. (Legacy support)
- Type:
Optional[int]
- cpu_count_max#
Absolute maximum number of replicas. Must be >= 1. (Legacy support)
- Type:
Optional[int]
- cpu_percent_min#
Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)
- Type:
Optional[float]
- cpu_percent_max#
Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)
- Type:
Optional[float]
- min_replicas#
Minimum number of replicas for both scaling modes. Must be >= 0.
- Type:
Optional[int]
- max_replicas#
Maximum replicas for dynamic scaling mode. Can be static int or strategy config.
- Type:
Optional[Union[int, ReplicaStrategyConfig]]
- static_replicas#
Replica configuration for static scaling mode. Can be static int or strategy config.
- Type:
Optional[Union[int, ReplicaStrategyConfig]]
Show JSON schema
{ "title": "ReplicaConfig", "description": "Configuration for stage replicas supporting both dynamic and static scaling modes.\n\nDefines the min/max number of replicas for a stage, either as absolute counts,\npercentages of total CPU cores, or resource-based calculations. Supports different\nconfigurations for dynamic vs static scaling modes.\n\nAttributes\n----------\ncpu_count_min : Optional[int]\n Absolute minimum number of replicas. Must be >= 0. (Legacy support)\ncpu_count_max : Optional[int]\n Absolute maximum number of replicas. Must be >= 1. (Legacy support)\ncpu_percent_min : Optional[float]\n Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\ncpu_percent_max : Optional[float]\n Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\nmin_replicas : Optional[int]\n Minimum number of replicas for both scaling modes. Must be >= 0.\nmax_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Maximum replicas for dynamic scaling mode. Can be static int or strategy config.\nstatic_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Replica configuration for static scaling mode. Can be static int or strategy config.", "type": "object", "properties": { "cpu_count_min": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute minimum number of replicas.", "title": "Cpu Count Min" }, "cpu_count_max": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute maximum number of replicas.", "title": "Cpu Count Max" }, "cpu_percent_min": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas as a percentage of total cores.", "title": "Cpu Percent Min" }, "cpu_percent_max": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Maximum number of replicas as a percentage of total cores.", "title": "Cpu Percent Max" }, "min_replicas": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas.", "title": "Min Replicas" }, "max_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Maximum replicas for dynamic scaling mode.", "title": "Max Replicas" }, "static_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Replica configuration for static scaling mode.", "title": "Static Replicas" } }, "$defs": { "ReplicaCalculationStrategy": { "description": "Strategy for calculating replica counts at runtime.", "enum": [ "static", "cpu_percentage", "memory_thresholding", "memory_static_global_percent" ], "title": "ReplicaCalculationStrategy", "type": "string" }, "ReplicaStrategyConfig": { "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "properties": { "strategy": { "$ref": "#/$defs/ReplicaCalculationStrategy", "description": "The calculation strategy to use." }, "value": { "anyOf": [ { "type": "integer" }, { "type": "number" }, { "type": "null" } ], "default": null, "description": "Primary value for the strategy.", "title": "Value" }, "limit": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Optional upper limit for calculated replicas.", "title": "Limit" }, "cpu_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "CPU percentage for CPU_PERCENTAGE strategy.", "title": "Cpu Percent" }, "memory_per_replica_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Expected memory usage per replica in MB.", "title": "Memory Per Replica Mb" }, "memory_threshold_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.", "title": "Memory Threshold Percent" }, "max_memory_budget_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "title": "Max Memory Budget Mb" } }, "required": [ "strategy" ], "title": "ReplicaStrategyConfig", "type": "object" } }, "additionalProperties": false }
- Config:
extra: str = forbid
- Fields:
- Validators:
check_exclusive_min_max»all fields
- field cpu_count_max: int | None = None#
Absolute maximum number of replicas.
- Constraints:
ge = 1
- Validated by:
- field cpu_count_min: int | None = None#
Absolute minimum number of replicas.
- Constraints:
ge = 0
- Validated by:
- field cpu_percent_max: float | None = None#
Maximum number of replicas as a percentage of total cores.
- Constraints:
ge = 0.0
le = 1.0
- Validated by:
- field cpu_percent_min: float | None = None#
Minimum number of replicas as a percentage of total cores.
- Constraints:
ge = 0.0
le = 1.0
- Validated by:
- field max_replicas: int | ReplicaStrategyConfig | None = None#
Maximum replicas for dynamic scaling mode.
- Validated by:
- field min_replicas: int | None = None#
Minimum number of replicas.
- Constraints:
ge = 0
- Validated by:
- field static_replicas: int | ReplicaStrategyConfig | None = None#
Replica configuration for static scaling mode.
- Validated by:
- pydantic model nv_ingest.pipeline.pipeline_schema.ReplicaStrategyConfig[source]#
Bases:
BaseModelConfiguration for a specific replica calculation strategy.
- strategy#
The calculation strategy to use.
- value#
The primary value for the strategy (e.g., static count, CPU percentage).
- Type:
Optional[Union[int, float]]
- limit#
Optional upper limit for calculated replicas.
- Type:
Optional[int]
- cpu_percent#
CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).
- Type:
Optional[float]
- memory_per_replica_mb#
Expected memory usage per replica in MB.
- Type:
Optional[int]
- memory_threshold_percent#
Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).
- Type:
Optional[float]
- max_memory_budget_mb#
Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.
- Type:
Optional[int]
Show JSON schema
{ "title": "ReplicaStrategyConfig", "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "type": "object", "properties": { "strategy": { "$ref": "#/$defs/ReplicaCalculationStrategy", "description": "The calculation strategy to use." }, "value": { "anyOf": [ { "type": "integer" }, { "type": "number" }, { "type": "null" } ], "default": null, "description": "Primary value for the strategy.", "title": "Value" }, "limit": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Optional upper limit for calculated replicas.", "title": "Limit" }, "cpu_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "CPU percentage for CPU_PERCENTAGE strategy.", "title": "Cpu Percent" }, "memory_per_replica_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Expected memory usage per replica in MB.", "title": "Memory Per Replica Mb" }, "memory_threshold_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.", "title": "Memory Threshold Percent" }, "max_memory_budget_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "title": "Max Memory Budget Mb" } }, "$defs": { "ReplicaCalculationStrategy": { "description": "Strategy for calculating replica counts at runtime.", "enum": [ "static", "cpu_percentage", "memory_thresholding", "memory_static_global_percent" ], "title": "ReplicaCalculationStrategy", "type": "string" } }, "required": [ "strategy" ] }
- Fields:
- Validators:
validate_strategy_config»all fields
- field cpu_percent: float | None = None#
CPU percentage for CPU_PERCENTAGE strategy.
- Constraints:
ge = 0.0
le = 1.0
- Validated by:
- field limit: int | None = None#
Optional upper limit for calculated replicas.
- Constraints:
ge = 1
- Validated by:
- field max_memory_budget_mb: int | None = None#
Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.
- Constraints:
gt = 0
- Validated by:
- field memory_per_replica_mb: int | None = None#
Expected memory usage per replica in MB.
- Constraints:
gt = 0
- Validated by:
- field memory_threshold_percent: float | None = None#
Memory threshold percentage for MEMORY_THRESHOLDING strategy.
- Constraints:
ge = 0.0
le = 1.0
- Validated by:
- field strategy: ReplicaCalculationStrategy [Required]#
The calculation strategy to use.
- Validated by:
- field value: int | float | None = None#
Primary value for the strategy.
- Validated by:
- pydantic model nv_ingest.pipeline.pipeline_schema.StageConfig[source]#
Bases:
BaseModelConfiguration for a single pipeline stage.
Describes a single component in the ingestion pipeline, including its name, type, actor implementation, and specific configuration.
- name#
A unique name to identify the stage within the pipeline.
- Type:
str
- phase#
The logical phase of the stage in the pipeline.
- Type:
- actor#
The fully qualified import path to the actor class or function that implements the stage’s logic. Mutually exclusive with ‘callable’.
- Type:
Optional[str]
- callable#
The fully qualified import path to a callable function that implements the stage’s logic. Mutually exclusive with ‘actor’.
- Type:
Optional[str]
- task_filters#
List of task types this callable stage should filter for. Only applies to callable stages. Supports both simple strings (e.g., “udf”) and complex filters (e.g., [“udf”, {“phase”: 5}]).
- Type:
Optional[List[Any]]
- enabled#
A flag to indicate whether the stage should be included in the pipeline. If False, the stage and its connected edges are ignored.
- Type:
bool
- config#
A dictionary of configuration parameters passed to the stage’s actor.
- Type:
Dict[str, Any]
- replicas#
The replica configuration for the stage.
- Type:
- runs_after#
A list of stage names that this stage must be downstream of.
- Type:
List[str]
Show JSON schema
{ "title": "StageConfig", "description": "Configuration for a single pipeline stage.\n\nDescribes a single component in the ingestion pipeline, including its name,\ntype, actor implementation, and specific configuration.\n\nAttributes\n----------\nname : str\n A unique name to identify the stage within the pipeline.\ntype : StageType\n The type of the stage, which determines how it's added to the RayPipeline.\nphase: PipelinePhase\n The logical phase of the stage in the pipeline.\nactor : Optional[str]\n The fully qualified import path to the actor class or function that\n implements the stage's logic. Mutually exclusive with 'callable'.\ncallable : Optional[str]\n The fully qualified import path to a callable function that\n implements the stage's logic. Mutually exclusive with 'actor'.\ntask_filters: Optional[List[Any]]\n List of task types this callable stage should filter for. Only applies to callable stages.\n Supports both simple strings (e.g., \"udf\") and complex filters (e.g., [\"udf\", {\"phase\": 5}]).\nenabled : bool\n A flag to indicate whether the stage should be included in the pipeline.\n If False, the stage and its connected edges are ignored.\nconfig : Dict[str, Any]\n A dictionary of configuration parameters passed to the stage's actor.\nreplicas : ReplicaConfig\n The replica configuration for the stage.\nruns_after: List[str]\n A list of stage names that this stage must be downstream of.", "type": "object", "properties": { "name": { "description": "Unique name for the stage.", "title": "Name", "type": "string" }, "type": { "$ref": "#/$defs/StageType", "default": "stage", "description": "Type of the stage." }, "phase": { "$ref": "#/$defs/PipelinePhase", "description": "The logical phase of the stage." }, "actor": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Full import path to the stage's actor class or function.", "title": "Actor" }, "callable": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "description": "Full import path to a callable function for the stage.", "title": "Callable" }, "task_filters": { "anyOf": [ { "items": {}, "type": "array" }, { "type": "null" } ], "default": null, "description": "List of task types this callable stage should filter for. Only applies to callable stages.", "title": "Task Filters" }, "enabled": { "default": true, "description": "Whether the stage is enabled.", "title": "Enabled", "type": "boolean" }, "config": { "default": {}, "description": "Configuration dictionary for the stage.", "title": "Config", "type": "object" }, "replicas": { "$ref": "#/$defs/ReplicaConfig", "description": "Replica configuration." }, "runs_after": { "description": "List of stages this stage must run after.", "items": { "type": "string" }, "title": "Runs After", "type": "array" } }, "$defs": { "PipelinePhase": { "description": "The logical phase of a pipeline stage.\n\nAttributes\n----------\nPRE_PROCESSING : int\n Pre-processing phase.\nEXTRACTION : int\n Extraction phase.\nPOST_PROCESSING : int\n Post-processing phase.\nMUTATION : int\n Mutation phase.\nTRANSFORM : int\n Transform phase.\nRESPONSE : int\n Response phase.\nTELEMETRY : int\n Telemetry phase.\nDRAIN : int\n Drain phase.", "enum": [ 0, 1, 2, 3, 4, 5, 6, 7 ], "title": "PipelinePhase", "type": "integer" }, "ReplicaCalculationStrategy": { "description": "Strategy for calculating replica counts at runtime.", "enum": [ "static", "cpu_percentage", "memory_thresholding", "memory_static_global_percent" ], "title": "ReplicaCalculationStrategy", "type": "string" }, "ReplicaConfig": { "additionalProperties": false, "description": "Configuration for stage replicas supporting both dynamic and static scaling modes.\n\nDefines the min/max number of replicas for a stage, either as absolute counts,\npercentages of total CPU cores, or resource-based calculations. Supports different\nconfigurations for dynamic vs static scaling modes.\n\nAttributes\n----------\ncpu_count_min : Optional[int]\n Absolute minimum number of replicas. Must be >= 0. (Legacy support)\ncpu_count_max : Optional[int]\n Absolute maximum number of replicas. Must be >= 1. (Legacy support)\ncpu_percent_min : Optional[float]\n Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\ncpu_percent_max : Optional[float]\n Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\nmin_replicas : Optional[int]\n Minimum number of replicas for both scaling modes. Must be >= 0.\nmax_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Maximum replicas for dynamic scaling mode. Can be static int or strategy config.\nstatic_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n Replica configuration for static scaling mode. Can be static int or strategy config.", "properties": { "cpu_count_min": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute minimum number of replicas.", "title": "Cpu Count Min" }, "cpu_count_max": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Absolute maximum number of replicas.", "title": "Cpu Count Max" }, "cpu_percent_min": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas as a percentage of total cores.", "title": "Cpu Percent Min" }, "cpu_percent_max": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Maximum number of replicas as a percentage of total cores.", "title": "Cpu Percent Max" }, "min_replicas": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Minimum number of replicas.", "title": "Min Replicas" }, "max_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Maximum replicas for dynamic scaling mode.", "title": "Max Replicas" }, "static_replicas": { "anyOf": [ { "type": "integer" }, { "$ref": "#/$defs/ReplicaStrategyConfig" }, { "type": "null" } ], "default": null, "description": "Replica configuration for static scaling mode.", "title": "Static Replicas" } }, "title": "ReplicaConfig", "type": "object" }, "ReplicaStrategyConfig": { "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "properties": { "strategy": { "$ref": "#/$defs/ReplicaCalculationStrategy", "description": "The calculation strategy to use." }, "value": { "anyOf": [ { "type": "integer" }, { "type": "number" }, { "type": "null" } ], "default": null, "description": "Primary value for the strategy.", "title": "Value" }, "limit": { "anyOf": [ { "minimum": 1, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Optional upper limit for calculated replicas.", "title": "Limit" }, "cpu_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "CPU percentage for CPU_PERCENTAGE strategy.", "title": "Cpu Percent" }, "memory_per_replica_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Expected memory usage per replica in MB.", "title": "Memory Per Replica Mb" }, "memory_threshold_percent": { "anyOf": [ { "maximum": 1.0, "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.", "title": "Memory Threshold Percent" }, "max_memory_budget_mb": { "anyOf": [ { "exclusiveMinimum": 0, "type": "integer" }, { "type": "null" } ], "default": null, "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.", "title": "Max Memory Budget Mb" } }, "required": [ "strategy" ], "title": "ReplicaStrategyConfig", "type": "object" }, "StageType": { "description": "The type of a pipeline stage.", "enum": [ "source", "stage", "sink" ], "title": "StageType", "type": "string" } }, "additionalProperties": false, "required": [ "name", "phase" ] }
- Config:
extra: str = forbid
- Fields:
- Validators:
check_actor_or_callable»all fields
- field actor: str | None = None#
Full import path to the stage’s actor class or function.
- Validated by:
- field callable: str | None = None#
Full import path to a callable function for the stage.
- Validated by:
- field config: Dict[str, Any] = {}#
Configuration dictionary for the stage.
- Validated by:
- field enabled: bool = True#
Whether the stage is enabled.
- Validated by:
- field name: str [Required]#
Unique name for the stage.
- Validated by:
- field phase: PipelinePhase [Required]#
The logical phase of the stage.
- Validated by:
- field replicas: ReplicaConfig [Optional]#
Replica configuration.
- Validated by:
- field runs_after: List[str] [Optional]#
List of stages this stage must run after.
- Validated by:
- field task_filters: List[Any] | None = None#
List of task types this callable stage should filter for. Only applies to callable stages.
- Validated by: