nv_ingest.pipeline package#

Submodules#

nv_ingest.pipeline.default_libmode_pipeline_impl module#

Default pipeline implementation for libmode.

This module contains the default libmode pipeline configuration as a string, allowing the pipeline to be loaded without requiring external YAML files.

nv_ingest.pipeline.default_pipeline_impl module#

Default pipeline implementation (runtime default).

This module embeds the exact contents of config/default_pipeline.yaml so code can load the default pipeline without reading the YAML file at runtime.

nv_ingest.pipeline.ingest_pipeline module#

nv_ingest.pipeline.pipeline_schema module#

pydantic model nv_ingest.pipeline.pipeline_schema.EdgeConfig[source]#

Bases: BaseModel

Configuration for an edge between two stages.

Defines a connection from a source stage to a destination stage, including the size of the intermediate queue.

from_stage#

The name of the source stage for the edge.

Type:

str

to_stage#

The name of the destination stage for the edge.

Type:

str

queue_size#

The maximum number of items in the queue between the two stages. Must be greater than 0; defaults to 100.

Type:

int

JSON schema:
{
   "title": "EdgeConfig",
   "description": "Configuration for an edge between two stages.\n\nDefines a connection from a source stage to a destination stage, including\nthe size of the intermediate queue.\n\nAttributes\n----------\nfrom_stage : str\n    The name of the source stage for the edge.\nto_stage : str\n    The name of the destination stage for the edge.\nqueue_size : int\n    The maximum number of items in the queue between the two stages.",
   "type": "object",
   "properties": {
      "from": {
         "description": "The name of the source stage.",
         "title": "From",
         "type": "string"
      },
      "to": {
         "description": "The name of the destination stage.",
         "title": "To",
         "type": "string"
      },
      "queue_size": {
         "default": 100,
         "description": "The size of the queue between stages.",
         "exclusiveMinimum": 0,
         "title": "Queue Size",
         "type": "integer"
      }
   },
   "additionalProperties": false,
   "required": [
      "from",
      "to"
   ]
}

Config:
  • extra: str = forbid

Fields:
field from_stage: str [Required] (alias 'from')#

The name of the source stage.

field queue_size: int = 100#

The size of the queue between stages.

Constraints:
  • gt = 0

field to_stage: str [Required] (alias 'to')#

The name of the destination stage.
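
The constraints listed for EdgeConfig can be illustrated without the library installed. The following is a minimal stdlib sketch, not the model itself: `check_edge` and the stage names are hypothetical, and the function only mirrors the documented rules (the `from`/`to` aliases are required, `queue_size` defaults to 100 and must be greater than 0, unknown keys are rejected).

```python
# Stand-in for the documented EdgeConfig rules; `check_edge` is a
# hypothetical helper and not part of nv_ingest.
ALLOWED_KEYS = {"from", "to", "queue_size"}


def check_edge(edge: dict) -> dict:
    """Return a normalized copy of `edge`, or raise ValueError."""
    # extra = forbid: any key outside the documented fields is an error.
    extra = set(edge) - ALLOWED_KEYS
    if extra:
        raise ValueError(f"unexpected keys: {sorted(extra)}")

    # 'from' and 'to' are the serialized aliases of from_stage / to_stage.
    for key in ("from", "to"):
        if not isinstance(edge.get(key), str):
            raise ValueError(f"'{key}' is required and must be a string")

    # queue_size: integer, gt = 0, default 100.
    queue_size = edge.get("queue_size", 100)
    if isinstance(queue_size, bool) or not isinstance(queue_size, int) or queue_size <= 0:
        raise ValueError("'queue_size' must be an integer greater than 0")

    return {"from": edge["from"], "to": edge["to"], "queue_size": queue_size}


# Hypothetical stage names, used only for illustration.
edge = check_edge({"from": "pdf_extractor", "to": "text_splitter"})
print(edge)
```

Note that configuration data uses the alias keys `from` and `to`, while the Python attributes are `from_stage` and `to_stage`. With nv_ingest installed, the pydantic model is expected to perform this validation itself.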

pydantic model nv_ingest.pipeline.pipeline_schema.PIDControllerConfig[source]#

Bases: BaseModel

Configuration for the PID controller used in dynamic scaling.

kp#

Proportional gain for the PID controller.

Type:

float

ki#

Integral gain for the PID controller.

Type:

float

ema_alpha#

Smoothing factor (alpha) of the exponential moving average used by the PID controller, between 0.0 and 1.0.

Type:

float

target_queue_depth#

Target queue depth for the PID controller.

Type:

int

penalty_factor#

Penalty factor for the PID controller.

Type:

float

error_boost_factor#

Error boost factor for the PID controller.

Type:

float

rcm_memory_safety_buffer_fraction#

Memory safety buffer fraction (0.0 to 1.0) used by the resource constraint manager (RCM).

Type:

float

JSON schema:
{
   "title": "PIDControllerConfig",
   "description": "Configuration for the PID controller used in dynamic scaling.\n\nAttributes\n----------\nkp : float\n    Proportional gain for the PID controller.\nki : float\n    Integral gain for the PID controller.\nema_alpha : float\n    Exponential moving average alpha for the PID controller.\ntarget_queue_depth : int\n    Target queue depth for the PID controller.\npenalty_factor : float\n    Penalty factor for the PID controller.\nerror_boost_factor : float\n    Error boost factor for the PID controller.\nrcm_memory_safety_buffer_fraction : float\n    Resource constraint manager memory safety buffer fraction.",
   "type": "object",
   "properties": {
      "kp": {
         "default": 0.2,
         "description": "Proportional gain for the PID controller.",
         "exclusiveMinimum": 0.0,
         "title": "Kp",
         "type": "number"
      },
      "ki": {
         "default": 0.01,
         "description": "Integral gain for the PID controller.",
         "minimum": 0.0,
         "title": "Ki",
         "type": "number"
      },
      "ema_alpha": {
         "default": 0.1,
         "description": "Exponential moving average alpha for the PID controller.",
         "maximum": 1.0,
         "minimum": 0.0,
         "title": "Ema Alpha",
         "type": "number"
      },
      "target_queue_depth": {
         "default": 0,
         "description": "Target queue depth for the PID controller.",
         "minimum": 0,
         "title": "Target Queue Depth",
         "type": "integer"
      },
      "penalty_factor": {
         "default": 0.1,
         "description": "Penalty factor for the PID controller.",
         "minimum": 0.0,
         "title": "Penalty Factor",
         "type": "number"
      },
      "error_boost_factor": {
         "default": 1.5,
         "description": "Error boost factor for the PID controller.",
         "exclusiveMinimum": 0.0,
         "title": "Error Boost Factor",
         "type": "number"
      },
      "rcm_memory_safety_buffer_fraction": {
         "default": 0.15,
         "description": "Resource constraint manager memory safety buffer fraction.",
         "maximum": 1.0,
         "minimum": 0.0,
         "title": "Rcm Memory Safety Buffer Fraction",
         "type": "number"
      }
   },
   "additionalProperties": false
}

Config:
  • extra: str = forbid

Fields:
field ema_alpha: float = 0.1#

Exponential moving average alpha for the PID controller.

Constraints:
  • ge = 0.0

  • le = 1.0

field error_boost_factor: float = 1.5#

Error boost factor for the PID controller.

Constraints:
  • gt = 0.0

field ki: float = 0.01#

Integral gain for the PID controller.

Constraints:
  • ge = 0.0

field kp: float = 0.2#

Proportional gain for the PID controller.

Constraints:
  • gt = 0.0

field penalty_factor: float = 0.1#

Penalty factor for the PID controller.

Constraints:
  • ge = 0.0

field rcm_memory_safety_buffer_fraction: float = 0.15#

Resource constraint manager memory safety buffer fraction.

Constraints:
  • ge = 0.0

  • le = 1.0

field target_queue_depth: int = 0#

Target queue depth for the PID controller.

Constraints:
  • ge = 0
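
To give a feel for how `kp`, `ki`, `ema_alpha`, and `target_queue_depth` could interact, here is a generic textbook proportional-integral update with an EMA-smoothed input. This is an assumption-laden sketch, not the controller nv_ingest implements: `PIState` and `pi_step` are hypothetical names, and `penalty_factor`, `error_boost_factor`, and `rcm_memory_safety_buffer_fraction` are not modeled because this page does not describe their semantics.

```python
from dataclasses import dataclass


@dataclass
class PIState:
    """Mutable controller state carried between updates (hypothetical)."""
    smoothed_depth: float = 0.0
    integral: float = 0.0


def pi_step(
    state: PIState,
    queue_depth: float,
    kp: float = 0.2,              # documented default
    ki: float = 0.01,             # documented default
    ema_alpha: float = 0.1,       # documented default
    target_queue_depth: int = 0,  # documented default
) -> float:
    """Advance the controller by one observation and return its output."""
    # Exponential moving average: higher alpha reacts faster to new samples.
    state.smoothed_depth = (
        ema_alpha * queue_depth + (1.0 - ema_alpha) * state.smoothed_depth
    )
    # Positive error means the queue is deeper than the target.
    error = state.smoothed_depth - target_queue_depth
    # Accumulate the error for the integral term.
    state.integral += error
    return kp * error + ki * state.integral


state = PIState()
for depth in (10, 10, 10):
    signal = pi_step(state, depth)
    print(round(signal, 4))
```

In this sketch a positive output would suggest adding replicas and a negative one removing them; how the real controller maps its output to replica counts is not specified here.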

pydantic model nv_ingest.pipeline.pipeline_schema.PipelineConfigSchema[source]#

Bases: BaseModel

Root configuration model for an ingestion pipeline.

This model represents the entire declarative configuration for an ingestion pipeline, including all stages and the edges that connect them.

name#

The name of the pipeline.

Type:

str

description#

A description of the pipeline.

Type:

str

stages#

A list of all stage configurations in the pipeline.

Type:

List[StageConfig]

edges#

A list of all edge configurations that define the pipeline’s topology.

Type:

List[EdgeConfig]

pipeline#

Runtime configuration for the pipeline. Defaults to a new PipelineRuntimeConfig instance.

Type:

Optional[PipelineRuntimeConfig]

JSON schema:
{
   "title": "PipelineConfigSchema",
   "description": "Root configuration model for an ingestion pipeline.\n\nThis model represents the entire declarative configuration for an ingestion\npipeline, including all stages and the edges that connect them.\n\nAttributes\n----------\nname : str\n    The name of the pipeline.\ndescription : str\n    A description of the pipeline.\nstages : List[StageConfig]\n    A list of all stage configurations in the pipeline.\nedges : List[EdgeConfig]\n    A list of all edge configurations that define the pipeline's topology.\npipeline: Optional[PipelineRuntimeConfig] = Field(default_factory=PipelineRuntimeConfig,\n    description=\"Runtime configuration for the pipeline.\")",
   "type": "object",
   "properties": {
      "name": {
         "description": "The name of the pipeline.",
         "title": "Name",
         "type": "string"
      },
      "description": {
         "description": "A description of the pipeline.",
         "title": "Description",
         "type": "string"
      },
      "stages": {
         "description": "List of all stages in the pipeline.",
         "items": {
            "$ref": "#/$defs/StageConfig"
         },
         "title": "Stages",
         "type": "array"
      },
      "edges": {
         "description": "List of all edges connecting the stages.",
         "items": {
            "$ref": "#/$defs/EdgeConfig"
         },
         "title": "Edges",
         "type": "array"
      },
      "pipeline": {
         "anyOf": [
            {
               "$ref": "#/$defs/PipelineRuntimeConfig"
            },
            {
               "type": "null"
            }
         ],
         "description": "Runtime configuration for the pipeline."
      }
   },
   "$defs": {
      "EdgeConfig": {
         "additionalProperties": false,
         "description": "Configuration for an edge between two stages.\n\nDefines a connection from a source stage to a destination stage, including\nthe size of the intermediate queue.\n\nAttributes\n----------\nfrom_stage : str\n    The name of the source stage for the edge.\nto_stage : str\n    The name of the destination stage for the edge.\nqueue_size : int\n    The maximum number of items in the queue between the two stages.",
         "properties": {
            "from": {
               "description": "The name of the source stage.",
               "title": "From",
               "type": "string"
            },
            "to": {
               "description": "The name of the destination stage.",
               "title": "To",
               "type": "string"
            },
            "queue_size": {
               "default": 100,
               "description": "The size of the queue between stages.",
               "exclusiveMinimum": 0,
               "title": "Queue Size",
               "type": "integer"
            }
         },
         "required": [
            "from",
            "to"
         ],
         "title": "EdgeConfig",
         "type": "object"
      },
      "PIDControllerConfig": {
         "additionalProperties": false,
         "description": "Configuration for the PID controller used in dynamic scaling.\n\nAttributes\n----------\nkp : float\n    Proportional gain for the PID controller.\nki : float\n    Integral gain for the PID controller.\nema_alpha : float\n    Exponential moving average alpha for the PID controller.\ntarget_queue_depth : int\n    Target queue depth for the PID controller.\npenalty_factor : float\n    Penalty factor for the PID controller.\nerror_boost_factor : float\n    Error boost factor for the PID controller.\nrcm_memory_safety_buffer_fraction : float\n    Resource constraint manager memory safety buffer fraction.",
         "properties": {
            "kp": {
               "default": 0.2,
               "description": "Proportional gain for the PID controller.",
               "exclusiveMinimum": 0.0,
               "title": "Kp",
               "type": "number"
            },
            "ki": {
               "default": 0.01,
               "description": "Integral gain for the PID controller.",
               "minimum": 0.0,
               "title": "Ki",
               "type": "number"
            },
            "ema_alpha": {
               "default": 0.1,
               "description": "Exponential moving average alpha for the PID controller.",
               "maximum": 1.0,
               "minimum": 0.0,
               "title": "Ema Alpha",
               "type": "number"
            },
            "target_queue_depth": {
               "default": 0,
               "description": "Target queue depth for the PID controller.",
               "minimum": 0,
               "title": "Target Queue Depth",
               "type": "integer"
            },
            "penalty_factor": {
               "default": 0.1,
               "description": "Penalty factor for the PID controller.",
               "minimum": 0.0,
               "title": "Penalty Factor",
               "type": "number"
            },
            "error_boost_factor": {
               "default": 1.5,
               "description": "Error boost factor for the PID controller.",
               "exclusiveMinimum": 0.0,
               "title": "Error Boost Factor",
               "type": "number"
            },
            "rcm_memory_safety_buffer_fraction": {
               "default": 0.15,
               "description": "Resource constraint manager memory safety buffer fraction.",
               "maximum": 1.0,
               "minimum": 0.0,
               "title": "Rcm Memory Safety Buffer Fraction",
               "type": "number"
            }
         },
         "title": "PIDControllerConfig",
         "type": "object"
      },
      "PipelinePhase": {
         "description": "The logical phase of a pipeline stage.\n\nAttributes\n----------\nPRE_PROCESSING : int\n    Pre-processing phase.\nEXTRACTION : int\n    Extraction phase.\nPOST_PROCESSING : int\n    Post-processing phase.\nMUTATION : int\n    Mutation phase.\nTRANSFORM : int\n    Transform phase.\nRESPONSE : int\n    Response phase.\nTELEMETRY : int\n    Telemetry phase.\nDRAIN : int\n    Drain phase.",
         "enum": [
            0,
            1,
            2,
            3,
            4,
            5,
            6,
            7
         ],
         "title": "PipelinePhase",
         "type": "integer"
      },
      "PipelineRuntimeConfig": {
         "additionalProperties": false,
         "description": "Configuration for pipeline runtime behavior.\n\nParameters\n----------\ndisable_dynamic_scaling : bool\n    Whether to disable dynamic scaling of replicas (default: False).\ndynamic_memory_threshold : float\n    The memory utilization threshold (0.0 to 1.0) for dynamic scaling decisions.\nstatic_memory_threshold : float\n    Global memory threshold for static scaling mode (default: 0.75).\npid_controller : PIDControllerConfig\n    PID controller configuration for dynamic scaling.\nlaunch_simple_broker : bool\n    If True, launches a simple message broker for the pipeline.",
         "properties": {
            "disable_dynamic_scaling": {
               "default": false,
               "description": "Disable dynamic scaling of stage replicas.",
               "title": "Disable Dynamic Scaling",
               "type": "boolean"
            },
            "dynamic_memory_threshold": {
               "default": 0.75,
               "description": "Memory utilization threshold for dynamic scaling.",
               "maximum": 0.95,
               "minimum": 0.0,
               "title": "Dynamic Memory Threshold",
               "type": "number"
            },
            "static_memory_threshold": {
               "default": 0.75,
               "description": "Global memory threshold for static scaling mode.",
               "maximum": 1.0,
               "minimum": 0.0,
               "title": "Static Memory Threshold",
               "type": "number"
            },
            "pid_controller": {
               "$ref": "#/$defs/PIDControllerConfig",
               "description": "PID controller configuration for dynamic scaling."
            },
            "launch_simple_broker": {
               "default": false,
               "description": "Launch a simple message broker for the pipeline.",
               "title": "Launch Simple Broker",
               "type": "boolean"
            }
         },
         "title": "PipelineRuntimeConfig",
         "type": "object"
      },
      "ReplicaCalculationStrategy": {
         "description": "Strategy for calculating replica counts at runtime.",
         "enum": [
            "static",
            "cpu_percentage",
            "memory_thresholding",
            "memory_static_global_percent"
         ],
         "title": "ReplicaCalculationStrategy",
         "type": "string"
      },
      "ReplicaConfig": {
         "additionalProperties": false,
         "description": "Configuration for stage replicas supporting both dynamic and static scaling modes.\n\nDefines the min/max number of replicas for a stage, either as absolute counts,\npercentages of total CPU cores, or resource-based calculations. Supports different\nconfigurations for dynamic vs static scaling modes.\n\nAttributes\n----------\ncpu_count_min : Optional[int]\n    Absolute minimum number of replicas. Must be >= 0. (Legacy support)\ncpu_count_max : Optional[int]\n    Absolute maximum number of replicas. Must be >= 1. (Legacy support)\ncpu_percent_min : Optional[float]\n    Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\ncpu_percent_max : Optional[float]\n    Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\nmin_replicas : Optional[int]\n    Minimum number of replicas for both scaling modes. Must be >= 0.\nmax_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n    Maximum replicas for dynamic scaling mode. Can be static int or strategy config.\nstatic_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n    Replica configuration for static scaling mode. Can be static int or strategy config.",
         "properties": {
            "cpu_count_min": {
               "anyOf": [
                  {
                     "minimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Absolute minimum number of replicas.",
               "title": "Cpu Count Min"
            },
            "cpu_count_max": {
               "anyOf": [
                  {
                     "minimum": 1,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Absolute maximum number of replicas.",
               "title": "Cpu Count Max"
            },
            "cpu_percent_min": {
               "anyOf": [
                  {
                     "maximum": 1.0,
                     "minimum": 0.0,
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Minimum number of replicas as a percentage of total cores.",
               "title": "Cpu Percent Min"
            },
            "cpu_percent_max": {
               "anyOf": [
                  {
                     "maximum": 1.0,
                     "minimum": 0.0,
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum number of replicas as a percentage of total cores.",
               "title": "Cpu Percent Max"
            },
            "min_replicas": {
               "anyOf": [
                  {
                     "minimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Minimum number of replicas.",
               "title": "Min Replicas"
            },
            "max_replicas": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "$ref": "#/$defs/ReplicaStrategyConfig"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum replicas for dynamic scaling mode.",
               "title": "Max Replicas"
            },
            "static_replicas": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "$ref": "#/$defs/ReplicaStrategyConfig"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Replica configuration for static scaling mode.",
               "title": "Static Replicas"
            }
         },
         "title": "ReplicaConfig",
         "type": "object"
      },
      "ReplicaStrategyConfig": {
         "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n    The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n    The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n    Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n    CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n    Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n    Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n    Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.",
         "properties": {
            "strategy": {
               "$ref": "#/$defs/ReplicaCalculationStrategy",
               "description": "The calculation strategy to use."
            },
            "value": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Primary value for the strategy.",
               "title": "Value"
            },
            "limit": {
               "anyOf": [
                  {
                     "minimum": 1,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Optional upper limit for calculated replicas.",
               "title": "Limit"
            },
            "cpu_percent": {
               "anyOf": [
                  {
                     "maximum": 1.0,
                     "minimum": 0.0,
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "CPU percentage for CPU_PERCENTAGE strategy.",
               "title": "Cpu Percent"
            },
            "memory_per_replica_mb": {
               "anyOf": [
                  {
                     "exclusiveMinimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Expected memory usage per replica in MB.",
               "title": "Memory Per Replica Mb"
            },
            "memory_threshold_percent": {
               "anyOf": [
                  {
                     "maximum": 1.0,
                     "minimum": 0.0,
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.",
               "title": "Memory Threshold Percent"
            },
            "max_memory_budget_mb": {
               "anyOf": [
                  {
                     "exclusiveMinimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.",
               "title": "Max Memory Budget Mb"
            }
         },
         "required": [
            "strategy"
         ],
         "title": "ReplicaStrategyConfig",
         "type": "object"
      },
      "StageConfig": {
         "additionalProperties": false,
         "description": "Configuration for a single pipeline stage.\n\nDescribes a single component in the ingestion pipeline, including its name,\ntype, actor implementation, and specific configuration.\n\nAttributes\n----------\nname : str\n    A unique name to identify the stage within the pipeline.\ntype : StageType\n    The type of the stage, which determines how it's added to the RayPipeline.\nphase: PipelinePhase\n    The logical phase of the stage in the pipeline.\nactor : Optional[str]\n    The fully qualified import path to the actor class or function that\n    implements the stage's logic. Mutually exclusive with 'callable'.\ncallable : Optional[str]\n    The fully qualified import path to a callable function that\n    implements the stage's logic. Mutually exclusive with 'actor'.\ntask_filters: Optional[List[Any]]\n    List of task types this callable stage should filter for. Only applies to callable stages.\n    Supports both simple strings (e.g., \"udf\") and complex filters (e.g., [\"udf\", {\"phase\": 5}]).\nenabled : bool\n    A flag to indicate whether the stage should be included in the pipeline.\n    If False, the stage and its connected edges are ignored.\nconfig : Dict[str, Any]\n    A dictionary of configuration parameters passed to the stage's actor.\nreplicas : ReplicaConfig\n    The replica configuration for the stage.\nruns_after: List[str]\n    A list of stage names that this stage must be downstream of.",
         "properties": {
            "name": {
               "description": "Unique name for the stage.",
               "title": "Name",
               "type": "string"
            },
            "type": {
               "$ref": "#/$defs/StageType",
               "default": "stage",
               "description": "Type of the stage."
            },
            "phase": {
               "$ref": "#/$defs/PipelinePhase",
               "description": "The logical phase of the stage."
            },
            "actor": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Full import path to the stage's actor class or function.",
               "title": "Actor"
            },
            "callable": {
               "anyOf": [
                  {
                     "type": "string"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Full import path to a callable function for the stage.",
               "title": "Callable"
            },
            "task_filters": {
               "anyOf": [
                  {
                     "items": {},
                     "type": "array"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "List of task types this callable stage should filter for. Only applies to callable stages.",
               "title": "Task Filters"
            },
            "enabled": {
               "default": true,
               "description": "Whether the stage is enabled.",
               "title": "Enabled",
               "type": "boolean"
            },
            "config": {
               "default": {},
               "description": "Configuration dictionary for the stage.",
               "title": "Config",
               "type": "object"
            },
            "replicas": {
               "$ref": "#/$defs/ReplicaConfig",
               "description": "Replica configuration."
            },
            "runs_after": {
               "description": "List of stages this stage must run after.",
               "items": {
                  "type": "string"
               },
               "title": "Runs After",
               "type": "array"
            }
         },
         "required": [
            "name",
            "phase"
         ],
         "title": "StageConfig",
         "type": "object"
      },
      "StageType": {
         "description": "The type of a pipeline stage.",
         "enum": [
            "source",
            "stage",
            "sink"
         ],
         "title": "StageType",
         "type": "string"
      }
   },
   "additionalProperties": false,
   "required": [
      "name",
      "description",
      "stages",
      "edges"
   ]
}

Config:
  • extra: str = forbid

Fields:
field description: str [Required]#

A description of the pipeline.

field edges: List[EdgeConfig] [Required]#

List of all edges connecting the stages.

Validated by:
  • check_not_empty

field name: str [Required]#

The name of the pipeline.

field pipeline: PipelineRuntimeConfig | None [Optional]#

Runtime configuration for the pipeline.

field stages: List[StageConfig] [Required]#

List of all stages in the pipeline.

Validated by:
  • check_not_empty

validator check_not_empty  »  stages, edges[source]#

Validates that the list is not empty.

get_phases() → Set[PipelinePhase][source]#

Returns a set of all unique phases in the pipeline.
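
The shape of a document that satisfies PipelineConfigSchema can be sketched as a plain mapping. The example below uses only the standard library and makes several assumptions: the pipeline name, stage names, and actor import paths are hypothetical, the phase values are raw integers from the documented enum range 0 to 7, and the two helper functions merely imitate what `check_not_empty` and `get_phases()` are described as doing.

```python
from typing import Any, Dict, List, Set

# Hypothetical two-stage pipeline; the four top-level keys are the
# documented required fields.
pipeline_config: Dict[str, Any] = {
    "name": "example_pipeline",
    "description": "Two-stage illustration",
    "stages": [
        # Each stage requires 'name' and 'phase'; 'type' defaults to "stage".
        {"name": "source_stage", "type": "source", "phase": 0,
         "actor": "my_pkg.stages.SourceActor"},   # hypothetical import path
        {"name": "sink_stage", "type": "sink", "phase": 5,
         "actor": "my_pkg.stages.SinkActor"},     # hypothetical import path
    ],
    "edges": [
        # Edges use the 'from' / 'to' aliases rather than from_stage / to_stage.
        {"from": "source_stage", "to": "sink_stage", "queue_size": 32},
    ],
}


def check_not_empty(items: List[Any], field_name: str) -> List[Any]:
    """Imitates the documented validator: reject empty lists."""
    if not items:
        raise ValueError(f"'{field_name}' must not be empty")
    return items


def get_phases(config: Dict[str, Any]) -> Set[int]:
    """Imitates get_phases(): the unique phases across all stages."""
    return {stage["phase"] for stage in config["stages"]}


check_not_empty(pipeline_config["stages"], "stages")
check_not_empty(pipeline_config["edges"], "edges")
print(sorted(get_phases(pipeline_config)))
```

The optional `pipeline` key is omitted here, in which case the schema falls back to a default PipelineRuntimeConfig.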

pydantic model nv_ingest.pipeline.pipeline_schema.PipelineRuntimeConfig[source]#

Bases: BaseModel

Configuration for pipeline runtime behavior.

Parameters:
  • disable_dynamic_scaling (bool) – Whether to disable dynamic scaling of replicas (default: False).

  • dynamic_memory_threshold (float) – The memory utilization threshold (0.0 to 0.95) for dynamic scaling decisions (default: 0.75).

  • static_memory_threshold (float) – Global memory threshold for static scaling mode (default: 0.75).

  • pid_controller (PIDControllerConfig) – PID controller configuration for dynamic scaling.

  • launch_simple_broker (bool) – If True, launches a simple message broker for the pipeline.
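
The defaults and bounds of these parameters can be summarized in a small stand-in class. This is a stdlib sketch under stated assumptions rather than the pydantic model: `RuntimeSettings` is a hypothetical name, the nested `pid_controller` field is left out for brevity, and the bounds are taken from the JSON schema of this model (note that `dynamic_memory_threshold` is capped at 0.95, while `static_memory_threshold` may reach 1.0).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RuntimeSettings:
    """Hypothetical mirror of the documented runtime defaults and bounds."""
    disable_dynamic_scaling: bool = False
    dynamic_memory_threshold: float = 0.75
    static_memory_threshold: float = 0.75
    launch_simple_broker: bool = False

    def __post_init__(self) -> None:
        # Schema bounds: minimum 0.0, maximum 0.95.
        if not 0.0 <= self.dynamic_memory_threshold <= 0.95:
            raise ValueError("dynamic_memory_threshold must be in [0.0, 0.95]")
        # Schema bounds: minimum 0.0, maximum 1.0.
        if not 0.0 <= self.static_memory_threshold <= 1.0:
            raise ValueError("static_memory_threshold must be in [0.0, 1.0]")


defaults = RuntimeSettings()
print(defaults)

# Static scaling with a tighter global memory threshold.
static_mode = RuntimeSettings(disable_dynamic_scaling=True,
                              static_memory_threshold=0.6)
print(static_mode.static_memory_threshold)
```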

JSON schema:
{
   "title": "PipelineRuntimeConfig",
   "description": "Configuration for pipeline runtime behavior.\n\nParameters\n----------\ndisable_dynamic_scaling : bool\n    Whether to disable dynamic scaling of replicas (default: False).\ndynamic_memory_threshold : float\n    The memory utilization threshold (0.0 to 1.0) for dynamic scaling decisions.\nstatic_memory_threshold : float\n    Global memory threshold for static scaling mode (default: 0.75).\npid_controller : PIDControllerConfig\n    PID controller configuration for dynamic scaling.\nlaunch_simple_broker : bool\n    If True, launches a simple message broker for the pipeline.",
   "type": "object",
   "properties": {
      "disable_dynamic_scaling": {
         "default": false,
         "description": "Disable dynamic scaling of stage replicas.",
         "title": "Disable Dynamic Scaling",
         "type": "boolean"
      },
      "dynamic_memory_threshold": {
         "default": 0.75,
         "description": "Memory utilization threshold for dynamic scaling.",
         "maximum": 0.95,
         "minimum": 0.0,
         "title": "Dynamic Memory Threshold",
         "type": "number"
      },
      "static_memory_threshold": {
         "default": 0.75,
         "description": "Global memory threshold for static scaling mode.",
         "maximum": 1.0,
         "minimum": 0.0,
         "title": "Static Memory Threshold",
         "type": "number"
      },
      "pid_controller": {
         "$ref": "#/$defs/PIDControllerConfig",
         "description": "PID controller configuration for dynamic scaling."
      },
      "launch_simple_broker": {
         "default": false,
         "description": "Launch a simple message broker for the pipeline.",
         "title": "Launch Simple Broker",
         "type": "boolean"
      }
   },
   "$defs": {
      "PIDControllerConfig": {
         "additionalProperties": false,
         "description": "Configuration for the PID controller used in dynamic scaling.\n\nAttributes\n----------\nkp : float\n    Proportional gain for the PID controller.\nki : float\n    Integral gain for the PID controller.\nema_alpha : float\n    Exponential moving average alpha for the PID controller.\ntarget_queue_depth : int\n    Target queue depth for the PID controller.\npenalty_factor : float\n    Penalty factor for the PID controller.\nerror_boost_factor : float\n    Error boost factor for the PID controller.\nrcm_memory_safety_buffer_fraction : float\n    Resource constraint manager memory safety buffer fraction.",
         "properties": {
            "kp": {
               "default": 0.2,
               "description": "Proportional gain for the PID controller.",
               "exclusiveMinimum": 0.0,
               "title": "Kp",
               "type": "number"
            },
            "ki": {
               "default": 0.01,
               "description": "Integral gain for the PID controller.",
               "minimum": 0.0,
               "title": "Ki",
               "type": "number"
            },
            "ema_alpha": {
               "default": 0.1,
               "description": "Exponential moving average alpha for the PID controller.",
               "maximum": 1.0,
               "minimum": 0.0,
               "title": "Ema Alpha",
               "type": "number"
            },
            "target_queue_depth": {
               "default": 0,
               "description": "Target queue depth for the PID controller.",
               "minimum": 0,
               "title": "Target Queue Depth",
               "type": "integer"
            },
            "penalty_factor": {
               "default": 0.1,
               "description": "Penalty factor for the PID controller.",
               "minimum": 0.0,
               "title": "Penalty Factor",
               "type": "number"
            },
            "error_boost_factor": {
               "default": 1.5,
               "description": "Error boost factor for the PID controller.",
               "exclusiveMinimum": 0.0,
               "title": "Error Boost Factor",
               "type": "number"
            },
            "rcm_memory_safety_buffer_fraction": {
               "default": 0.15,
               "description": "Resource constraint manager memory safety buffer fraction.",
               "maximum": 1.0,
               "minimum": 0.0,
               "title": "Rcm Memory Safety Buffer Fraction",
               "type": "number"
            }
         },
         "title": "PIDControllerConfig",
         "type": "object"
      }
   },
   "additionalProperties": false
}

Config:
  • extra: str = forbid

Fields:
field disable_dynamic_scaling: bool = False#

Disable dynamic scaling of stage replicas.

field dynamic_memory_threshold: float = 0.75#

Memory utilization threshold for dynamic scaling.

Constraints:
  • ge = 0.0

  • le = 0.95

field launch_simple_broker: bool = False#

Launch a simple message broker for the pipeline.

field pid_controller: PIDControllerConfig [Optional]#

PID controller configuration for dynamic scaling.

field static_memory_threshold: float = 0.75#

Global memory threshold for static scaling mode.

Constraints:
  • ge = 0.0

  • le = 1.0
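
As a rough illustration of the defaults and bounds listed above, the following sketch checks a plain dictionary using only the standard library. The helper `check_runtime_config` and the `RUNTIME_DEFAULTS` and `RUNTIME_BOUNDS` tables are hypothetical names introduced here; they are not part of nv_ingest, which enforces these rules through the pydantic model itself.

```python
# Hypothetical stand-in for the documented PipelineRuntimeConfig rules.
RUNTIME_DEFAULTS = {
    "disable_dynamic_scaling": False,
    "dynamic_memory_threshold": 0.75,
    "static_memory_threshold": 0.75,
    "launch_simple_broker": False,
}

# Inclusive (ge, le) bounds taken from the field constraints above.
RUNTIME_BOUNDS = {
    "dynamic_memory_threshold": (0.0, 0.95),
    "static_memory_threshold": (0.0, 1.0),
}


def check_runtime_config(overrides):
    """Merge overrides into the defaults and enforce the documented bounds."""
    allowed = set(RUNTIME_DEFAULTS) | {"pid_controller"}
    unknown = set(overrides) - allowed
    if unknown:
        # Mirrors `extra = forbid`: unknown keys are rejected.
        raise ValueError(f"unexpected keys: {sorted(unknown)}")
    merged = {**RUNTIME_DEFAULTS, **overrides}
    for key, (low, high) in RUNTIME_BOUNDS.items():
        if not low <= merged[key] <= high:
            raise ValueError(f"{key}={merged[key]} is outside [{low}, {high}]")
    return merged


config = check_runtime_config({"dynamic_memory_threshold": 0.9})
```

Note that the two thresholds have different upper bounds: 0.99 would be accepted for static_memory_threshold but is rejected for dynamic_memory_threshold, whose maximum is 0.95.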

class nv_ingest.pipeline.pipeline_schema.ReplicaCalculationStrategy(value)[source]#

Bases: str, Enum

Strategy for calculating replica counts at runtime.

CPU_PERCENTAGE = 'cpu_percentage'#
MEMORY_STATIC_GLOBAL_PERCENT = 'memory_static_global_percent'#
MEMORY_THRESHOLDING = 'memory_thresholding'#
STATIC = 'static'#

pydantic model nv_ingest.pipeline.pipeline_schema.ReplicaConfig[source]#

Bases: BaseModel

Configuration for stage replicas supporting both dynamic and static scaling modes.

Defines the min/max number of replicas for a stage, either as absolute counts, percentages of total CPU cores, or resource-based calculations. Supports different configurations for dynamic vs static scaling modes.

cpu_count_min#

Absolute minimum number of replicas. Must be >= 0. (Legacy support)

Type:

Optional[int]

cpu_count_max#

Absolute maximum number of replicas. Must be >= 1. (Legacy support)

Type:

Optional[int]

cpu_percent_min#

Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)

Type:

Optional[float]

cpu_percent_max#

Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)

Type:

Optional[float]

min_replicas#

Minimum number of replicas for both scaling modes. Must be >= 0.

Type:

Optional[int]

max_replicas#

Maximum replicas for dynamic scaling mode. Can be a static int or a strategy config.

Type:

Optional[Union[int, ReplicaStrategyConfig]]

static_replicas#

Replica configuration for static scaling mode. Can be a static int or a strategy config.

Type:

Optional[Union[int, ReplicaStrategyConfig]]
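
To make the percentage-based legacy fields concrete, the sketch below converts a fraction of total cores into a replica count. The function name `replicas_from_cpu_percent`, the floor rounding, and the lower bound of one replica are all assumptions made for illustration; this page does not state how nv_ingest rounds or clamps the result.

```python
import math


def replicas_from_cpu_percent(cpu_percent, total_cores, minimum=1):
    """Convert a 0.0-1.0 fraction of cores into a replica count (assumed rounding)."""
    if not 0.0 <= cpu_percent <= 1.0:
        raise ValueError("cpu_percent must be between 0.0 and 1.0")
    # Assumption: round down, but never go below `minimum`.
    return max(minimum, math.floor(cpu_percent * total_cores))


# Hypothetical 16-core host with cpu_percent_min=0.125 and cpu_percent_max=0.5.
low = replicas_from_cpu_percent(0.125, 16)
high = replicas_from_cpu_percent(0.5, 16)
```

Under these assumptions the stage would scale between 2 and 8 replicas on that host.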

Show JSON schema
{
   "title": "ReplicaConfig",
   "description": "Configuration for stage replicas supporting both dynamic and static scaling modes.\n\nDefines the min/max number of replicas for a stage, either as absolute counts,\npercentages of total CPU cores, or resource-based calculations. Supports different\nconfigurations for dynamic vs static scaling modes.\n\nAttributes\n----------\ncpu_count_min : Optional[int]\n    Absolute minimum number of replicas. Must be >= 0. (Legacy support)\ncpu_count_max : Optional[int]\n    Absolute maximum number of replicas. Must be >= 1. (Legacy support)\ncpu_percent_min : Optional[float]\n    Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\ncpu_percent_max : Optional[float]\n    Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\nmin_replicas : Optional[int]\n    Minimum number of replicas for both scaling modes. Must be >= 0.\nmax_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n    Maximum replicas for dynamic scaling mode. Can be static int or strategy config.\nstatic_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n    Replica configuration for static scaling mode. Can be static int or strategy config.",
   "type": "object",
   "properties": {
      "cpu_count_min": {
         "anyOf": [
            {
               "minimum": 0,
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Absolute minimum number of replicas.",
         "title": "Cpu Count Min"
      },
      "cpu_count_max": {
         "anyOf": [
            {
               "minimum": 1,
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Absolute maximum number of replicas.",
         "title": "Cpu Count Max"
      },
      "cpu_percent_min": {
         "anyOf": [
            {
               "maximum": 1.0,
               "minimum": 0.0,
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Minimum number of replicas as a percentage of total cores.",
         "title": "Cpu Percent Min"
      },
      "cpu_percent_max": {
         "anyOf": [
            {
               "maximum": 1.0,
               "minimum": 0.0,
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Maximum number of replicas as a percentage of total cores.",
         "title": "Cpu Percent Max"
      },
      "min_replicas": {
         "anyOf": [
            {
               "minimum": 0,
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Minimum number of replicas.",
         "title": "Min Replicas"
      },
      "max_replicas": {
         "anyOf": [
            {
               "type": "integer"
            },
            {
               "$ref": "#/$defs/ReplicaStrategyConfig"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Maximum replicas for dynamic scaling mode.",
         "title": "Max Replicas"
      },
      "static_replicas": {
         "anyOf": [
            {
               "type": "integer"
            },
            {
               "$ref": "#/$defs/ReplicaStrategyConfig"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Replica configuration for static scaling mode.",
         "title": "Static Replicas"
      }
   },
   "$defs": {
      "ReplicaCalculationStrategy": {
         "description": "Strategy for calculating replica counts at runtime.",
         "enum": [
            "static",
            "cpu_percentage",
            "memory_thresholding",
            "memory_static_global_percent"
         ],
         "title": "ReplicaCalculationStrategy",
         "type": "string"
      },
      "ReplicaStrategyConfig": {
         "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n    The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n    The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n    Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n    CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n    Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n    Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n    Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.",
         "properties": {
            "strategy": {
               "$ref": "#/$defs/ReplicaCalculationStrategy",
               "description": "The calculation strategy to use."
            },
            "value": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Primary value for the strategy.",
               "title": "Value"
            },
            "limit": {
               "anyOf": [
                  {
                     "minimum": 1,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Optional upper limit for calculated replicas.",
               "title": "Limit"
            },
            "cpu_percent": {
               "anyOf": [
                  {
                     "maximum": 1.0,
                     "minimum": 0.0,
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "CPU percentage for CPU_PERCENTAGE strategy.",
               "title": "Cpu Percent"
            },
            "memory_per_replica_mb": {
               "anyOf": [
                  {
                     "exclusiveMinimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Expected memory usage per replica in MB.",
               "title": "Memory Per Replica Mb"
            },
            "memory_threshold_percent": {
               "anyOf": [
                  {
                     "maximum": 1.0,
                     "minimum": 0.0,
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.",
               "title": "Memory Threshold Percent"
            },
            "max_memory_budget_mb": {
               "anyOf": [
                  {
                     "exclusiveMinimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.",
               "title": "Max Memory Budget Mb"
            }
         },
         "required": [
            "strategy"
         ],
         "title": "ReplicaStrategyConfig",
         "type": "object"
      }
   },
   "additionalProperties": false
}

Config:
  • extra: str = forbid

Validators:
  • check_exclusive_min_max » all fields

Fields:
field cpu_count_max: int | None = None#

Absolute maximum number of replicas.

Constraints:
  • ge = 1

Validated by:
  • check_exclusive_min_max

field cpu_count_min: int | None = None#

Absolute minimum number of replicas.

Constraints:
  • ge = 0

Validated by:
  • check_exclusive_min_max

field cpu_percent_max: float | None = None#

Maximum number of replicas as a percentage of total cores.

Constraints:
  • ge = 0.0

  • le = 1.0

Validated by:
  • check_exclusive_min_max

field cpu_percent_min: float | None = None#

Minimum number of replicas as a percentage of total cores.

Constraints:
  • ge = 0.0

  • le = 1.0

Validated by:
  • check_exclusive_min_max

field max_replicas: int | ReplicaStrategyConfig | None = None#

Maximum replicas for dynamic scaling mode.

Validated by:
  • check_exclusive_min_max

field min_replicas: int | None = None#

Minimum number of replicas.

Constraints:
  • ge = 0

Validated by:
  • check_exclusive_min_max

field static_replicas: int | ReplicaStrategyConfig | None = None#

Replica configuration for static scaling mode.

Validated by:
  • check_exclusive_min_max

validator check_exclusive_min_max  »  all fields[source]#

Validates that replica configuration is consistent and complete.

Ensures that:
  1. Legacy fields (cpu_count_*, cpu_percent_*) are not mixed with new fields.
  2. At least one configuration method is specified.
  3. Min/max relationships are valid.
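
The three rules can be sketched against a plain dictionary as follows. This is a minimal illustration, not the library's validator: `check_replica_config` and the two field tuples are hypothetical names, and the real check may apply additional or stricter conditions (for example, when max_replicas is a strategy config rather than an integer).

```python
LEGACY_FIELDS = ("cpu_count_min", "cpu_count_max", "cpu_percent_min", "cpu_percent_max")
NEW_FIELDS = ("min_replicas", "max_replicas", "static_replicas")


def check_replica_config(cfg):
    """Apply the three documented consistency rules to a plain dict (sketch)."""
    legacy = {k: cfg[k] for k in LEGACY_FIELDS if cfg.get(k) is not None}
    new = {k: cfg[k] for k in NEW_FIELDS if cfg.get(k) is not None}
    # Rule 1: legacy and new fields must not be mixed.
    if legacy and new:
        raise ValueError("legacy and new replica fields cannot be combined")
    # Rule 2: at least one configuration method must be given.
    if not legacy and not new:
        raise ValueError("no replica configuration specified")
    # Rule 3: a minimum must not exceed its matching maximum
    # (only compared here when both sides are plain numbers).
    pairs = (
        ("cpu_count_min", "cpu_count_max"),
        ("cpu_percent_min", "cpu_percent_max"),
        ("min_replicas", "max_replicas"),
    )
    for low_key, high_key in pairs:
        low, high = cfg.get(low_key), cfg.get(high_key)
        if isinstance(low, (int, float)) and isinstance(high, (int, float)) and low > high:
            raise ValueError(f"{low_key} must not exceed {high_key}")
    return cfg


valid = check_replica_config({"min_replicas": 1, "max_replicas": 4})
```

A dictionary such as `{"cpu_count_min": 1, "max_replicas": 4}` fails the first rule in this sketch because it combines a legacy field with a new one.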

pydantic model nv_ingest.pipeline.pipeline_schema.ReplicaStrategyConfig[source]#

Bases: BaseModel

Configuration for a specific replica calculation strategy.

strategy#

The calculation strategy to use.

Type:

ReplicaCalculationStrategy

value#

The primary value for the strategy (e.g., static count, CPU percentage).

Type:

Optional[Union[int, float]]

limit#

Optional upper limit for calculated replicas.

Type:

Optional[int]

cpu_percent#

CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).

Type:

Optional[float]

memory_per_replica_mb#

Expected memory usage per replica in MB.

Type:

Optional[int]

memory_threshold_percent#

Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).

Type:

Optional[float]

max_memory_budget_mb#

Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.

Type:

Optional[int]

Show JSON schema
{
   "title": "ReplicaStrategyConfig",
   "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n    The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n    The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n    Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n    CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n    Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n    Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n    Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.",
   "type": "object",
   "properties": {
      "strategy": {
         "$ref": "#/$defs/ReplicaCalculationStrategy",
         "description": "The calculation strategy to use."
      },
      "value": {
         "anyOf": [
            {
               "type": "integer"
            },
            {
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Primary value for the strategy.",
         "title": "Value"
      },
      "limit": {
         "anyOf": [
            {
               "minimum": 1,
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Optional upper limit for calculated replicas.",
         "title": "Limit"
      },
      "cpu_percent": {
         "anyOf": [
            {
               "maximum": 1.0,
               "minimum": 0.0,
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "CPU percentage for CPU_PERCENTAGE strategy.",
         "title": "Cpu Percent"
      },
      "memory_per_replica_mb": {
         "anyOf": [
            {
               "exclusiveMinimum": 0,
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Expected memory usage per replica in MB.",
         "title": "Memory Per Replica Mb"
      },
      "memory_threshold_percent": {
         "anyOf": [
            {
               "maximum": 1.0,
               "minimum": 0.0,
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.",
         "title": "Memory Threshold Percent"
      },
      "max_memory_budget_mb": {
         "anyOf": [
            {
               "exclusiveMinimum": 0,
               "type": "integer"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.",
         "title": "Max Memory Budget Mb"
      }
   },
   "$defs": {
      "ReplicaCalculationStrategy": {
         "description": "Strategy for calculating replica counts at runtime.",
         "enum": [
            "static",
            "cpu_percentage",
            "memory_thresholding",
            "memory_static_global_percent"
         ],
         "title": "ReplicaCalculationStrategy",
         "type": "string"
      }
   },
   "required": [
      "strategy"
   ]
}

Validators:
  • validate_strategy_config » all fields

Fields:
field cpu_percent: float | None = None#

CPU percentage for CPU_PERCENTAGE strategy.

Constraints:
  • ge = 0.0

  • le = 1.0

Validated by:
  • validate_strategy_config

field limit: int | None = None#

Optional upper limit for calculated replicas.

Constraints:
  • ge = 1

Validated by:
  • validate_strategy_config

field max_memory_budget_mb: int | None = None#

Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.

Constraints:
  • gt = 0

Validated by:
  • validate_strategy_config

field memory_per_replica_mb: int | None = None#

Expected memory usage per replica in MB.

Constraints:
  • gt = 0

Validated by:
  • validate_strategy_config

field memory_threshold_percent: float | None = None#

Memory threshold percentage for MEMORY_THRESHOLDING strategy.

Constraints:
  • ge = 0.0

  • le = 1.0

Validated by:
  • validate_strategy_config

field strategy: ReplicaCalculationStrategy [Required]#

The calculation strategy to use.

Validated by:
  • validate_strategy_config

field value: int | float | None = None#

Primary value for the strategy.

Validated by:
  • validate_strategy_config

validator validate_strategy_config  »  all fields[source]#

Validate that required fields are present for each strategy.
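
This page does not list which fields each strategy requires. The sketch below therefore uses an assumed mapping inferred from the field descriptions (for example, cpu_percent is described as belonging to the CPU_PERCENTAGE strategy). `Strategy` is a local mirror of ReplicaCalculationStrategy, and `ASSUMED_REQUIRED` and `missing_strategy_field` are hypothetical names; the real validator may require different fields or accept `value` as a fallback.

```python
from enum import Enum


class Strategy(str, Enum):
    """Local mirror of ReplicaCalculationStrategy, for illustration only."""

    STATIC = "static"
    CPU_PERCENTAGE = "cpu_percentage"
    MEMORY_THRESHOLDING = "memory_thresholding"
    MEMORY_STATIC_GLOBAL_PERCENT = "memory_static_global_percent"


# Assumed mapping, inferred from the field descriptions; not confirmed by the source.
ASSUMED_REQUIRED = {
    Strategy.STATIC: "value",
    Strategy.CPU_PERCENTAGE: "cpu_percent",
    Strategy.MEMORY_THRESHOLDING: "memory_threshold_percent",
    Strategy.MEMORY_STATIC_GLOBAL_PERCENT: "max_memory_budget_mb",
}


def missing_strategy_field(cfg):
    """Return the name of the missing field for cfg's strategy, or None."""
    strategy = Strategy(cfg["strategy"])  # raises ValueError for unknown names
    needed = ASSUMED_REQUIRED[strategy]
    return None if cfg.get(needed) is not None else needed


ok = missing_strategy_field({"strategy": "cpu_percentage", "cpu_percent": 0.25, "limit": 8})
bad = missing_strategy_field({"strategy": "memory_thresholding"})
```

Because the enum derives from both `str` and `Enum`, the plain strings used in configuration files (such as `"cpu_percentage"`) convert directly to members and compare equal to them.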

pydantic model nv_ingest.pipeline.pipeline_schema.StageConfig[source]#

Bases: BaseModel

Configuration for a single pipeline stage.

Describes a single component in the ingestion pipeline, including its name, type, actor implementation, and specific configuration.

name#

A unique name to identify the stage within the pipeline.

Type:

str

type#

The type of the stage, which determines how it’s added to the RayPipeline.

Type:

StageType

phase#

The logical phase of the stage in the pipeline.

Type:

PipelinePhase

actor#

The fully qualified import path to the actor class or function that implements the stage’s logic. Mutually exclusive with ‘callable’.

Type:

Optional[str]

callable#

The fully qualified import path to a callable function that implements the stage’s logic. Mutually exclusive with ‘actor’.

Type:

Optional[str]

task_filters#

List of task types this callable stage should filter for. Only applies to callable stages. Supports both simple strings (e.g., “udf”) and complex filters (e.g., [“udf”, {“phase”: 5}]).

Type:

Optional[List[Any]]

enabled#

A flag to indicate whether the stage should be included in the pipeline. If False, the stage and its connected edges are ignored.

Type:

bool

config#

A dictionary of configuration parameters passed to the stage’s actor.

Type:

Dict[str, Any]

replicas#

The replica configuration for the stage.

Type:

ReplicaConfig

runs_after#

A list of stage names that this stage must be downstream of.

Type:

List[str]
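
As an illustration of the attributes above, the following sketch builds a stage definition as a plain dictionary and checks the documented rule that `actor` and `callable` are mutually exclusive. The stage name, import path, and upstream stage name are made up, and `check_stage_implementation` is a hypothetical helper. Treating "neither set" as an error is an assumption; the source only states that the two are mutually exclusive.

```python
def check_stage_implementation(stage):
    """Return which implementation key is set; reject both or (assumed) neither."""
    has_actor = stage.get("actor") is not None
    has_callable = stage.get("callable") is not None
    if has_actor and has_callable:
        raise ValueError("'actor' and 'callable' are mutually exclusive")
    if not has_actor and not has_callable:
        # Assumption: a stage needs some implementation to be useful.
        raise ValueError("set one of 'actor' or 'callable'")
    return "callable" if has_callable else "actor"


# Hypothetical stage definition; names and the import path are placeholders.
stage = {
    "name": "my_udf_stage",
    "type": "stage",        # schema default
    "phase": 1,             # an integer from 0 to 7 per the PipelinePhase schema
    "callable": "my_package.my_module.my_function",
    "task_filters": ["udf"],
    "enabled": True,
    "config": {},
    "replicas": {"min_replicas": 0, "max_replicas": 2},
    "runs_after": ["some_upstream_stage"],
}

kind = check_stage_implementation(stage)
```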

Show JSON schema
{
   "title": "StageConfig",
   "description": "Configuration for a single pipeline stage.\n\nDescribes a single component in the ingestion pipeline, including its name,\ntype, actor implementation, and specific configuration.\n\nAttributes\n----------\nname : str\n    A unique name to identify the stage within the pipeline.\ntype : StageType\n    The type of the stage, which determines how it's added to the RayPipeline.\nphase: PipelinePhase\n    The logical phase of the stage in the pipeline.\nactor : Optional[str]\n    The fully qualified import path to the actor class or function that\n    implements the stage's logic. Mutually exclusive with 'callable'.\ncallable : Optional[str]\n    The fully qualified import path to a callable function that\n    implements the stage's logic. Mutually exclusive with 'actor'.\ntask_filters: Optional[List[Any]]\n    List of task types this callable stage should filter for. Only applies to callable stages.\n    Supports both simple strings (e.g., \"udf\") and complex filters (e.g., [\"udf\", {\"phase\": 5}]).\nenabled : bool\n    A flag to indicate whether the stage should be included in the pipeline.\n    If False, the stage and its connected edges are ignored.\nconfig : Dict[str, Any]\n    A dictionary of configuration parameters passed to the stage's actor.\nreplicas : ReplicaConfig\n    The replica configuration for the stage.\nruns_after: List[str]\n    A list of stage names that this stage must be downstream of.",
   "type": "object",
   "properties": {
      "name": {
         "description": "Unique name for the stage.",
         "title": "Name",
         "type": "string"
      },
      "type": {
         "$ref": "#/$defs/StageType",
         "default": "stage",
         "description": "Type of the stage."
      },
      "phase": {
         "$ref": "#/$defs/PipelinePhase",
         "description": "The logical phase of the stage."
      },
      "actor": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Full import path to the stage's actor class or function.",
         "title": "Actor"
      },
      "callable": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "Full import path to a callable function for the stage.",
         "title": "Callable"
      },
      "task_filters": {
         "anyOf": [
            {
               "items": {},
               "type": "array"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "description": "List of task types this callable stage should filter for. Only applies to callable stages.",
         "title": "Task Filters"
      },
      "enabled": {
         "default": true,
         "description": "Whether the stage is enabled.",
         "title": "Enabled",
         "type": "boolean"
      },
      "config": {
         "default": {},
         "description": "Configuration dictionary for the stage.",
         "title": "Config",
         "type": "object"
      },
      "replicas": {
         "$ref": "#/$defs/ReplicaConfig",
         "description": "Replica configuration."
      },
      "runs_after": {
         "description": "List of stages this stage must run after.",
         "items": {
            "type": "string"
         },
         "title": "Runs After",
         "type": "array"
      }
   },
   "$defs": {
      "PipelinePhase": {
         "description": "The logical phase of a pipeline stage.\n\nAttributes\n----------\nPRE_PROCESSING : int\n    Pre-processing phase.\nEXTRACTION : int\n    Extraction phase.\nPOST_PROCESSING : int\n    Post-processing phase.\nMUTATION : int\n    Mutation phase.\nTRANSFORM : int\n    Transform phase.\nRESPONSE : int\n    Response phase.\nTELEMETRY : int\n    Telemetry phase.\nDRAIN : int\n    Drain phase.",
         "enum": [
            0,
            1,
            2,
            3,
            4,
            5,
            6,
            7
         ],
         "title": "PipelinePhase",
         "type": "integer"
      },
      "ReplicaCalculationStrategy": {
         "description": "Strategy for calculating replica counts at runtime.",
         "enum": [
            "static",
            "cpu_percentage",
            "memory_thresholding",
            "memory_static_global_percent"
         ],
         "title": "ReplicaCalculationStrategy",
         "type": "string"
      },
      "ReplicaConfig": {
         "additionalProperties": false,
         "description": "Configuration for stage replicas supporting both dynamic and static scaling modes.\n\nDefines the min/max number of replicas for a stage, either as absolute counts,\npercentages of total CPU cores, or resource-based calculations. Supports different\nconfigurations for dynamic vs static scaling modes.\n\nAttributes\n----------\ncpu_count_min : Optional[int]\n    Absolute minimum number of replicas. Must be >= 0. (Legacy support)\ncpu_count_max : Optional[int]\n    Absolute maximum number of replicas. Must be >= 1. (Legacy support)\ncpu_percent_min : Optional[float]\n    Minimum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\ncpu_percent_max : Optional[float]\n    Maximum number of replicas as a percentage (0.0 to 1.0) of total cores. (Legacy support)\nmin_replicas : Optional[int]\n    Minimum number of replicas for both scaling modes. Must be >= 0.\nmax_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n    Maximum replicas for dynamic scaling mode. Can be static int or strategy config.\nstatic_replicas : Optional[Union[int, ReplicaStrategyConfig]]\n    Replica configuration for static scaling mode. Can be static int or strategy config.",
         "properties": {
            "cpu_count_min": {
               "anyOf": [
                  {
                     "minimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Absolute minimum number of replicas.",
               "title": "Cpu Count Min"
            },
            "cpu_count_max": {
               "anyOf": [
                  {
                     "minimum": 1,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Absolute maximum number of replicas.",
               "title": "Cpu Count Max"
            },
            "cpu_percent_min": {
               "anyOf": [
                  {
                     "maximum": 1.0,
                     "minimum": 0.0,
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Minimum number of replicas as a percentage of total cores.",
               "title": "Cpu Percent Min"
            },
            "cpu_percent_max": {
               "anyOf": [
                  {
                     "maximum": 1.0,
                     "minimum": 0.0,
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum number of replicas as a percentage of total cores.",
               "title": "Cpu Percent Max"
            },
            "min_replicas": {
               "anyOf": [
                  {
                     "minimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Minimum number of replicas.",
               "title": "Min Replicas"
            },
            "max_replicas": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "$ref": "#/$defs/ReplicaStrategyConfig"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum replicas for dynamic scaling mode.",
               "title": "Max Replicas"
            },
            "static_replicas": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "$ref": "#/$defs/ReplicaStrategyConfig"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Replica configuration for static scaling mode.",
               "title": "Static Replicas"
            }
         },
         "title": "ReplicaConfig",
         "type": "object"
      },
      "ReplicaStrategyConfig": {
         "description": "Configuration for a specific replica calculation strategy.\n\nAttributes\n----------\nstrategy : ReplicaCalculationStrategy\n    The calculation strategy to use.\nvalue : Optional[Union[int, float]]\n    The primary value for the strategy (e.g., static count, CPU percentage).\nlimit : Optional[int]\n    Optional upper limit for calculated replicas.\ncpu_percent : Optional[float]\n    CPU percentage for CPU_PERCENTAGE strategy (0.0 to 1.0).\nmemory_per_replica_mb : Optional[int]\n    Expected memory usage per replica in MB.\nmemory_threshold_percent : Optional[float]\n    Memory threshold percentage for MEMORY_THRESHOLDING strategy (0.0 to 1.0).\nmax_memory_budget_mb : Optional[int]\n    Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.",
         "properties": {
            "strategy": {
               "$ref": "#/$defs/ReplicaCalculationStrategy",
               "description": "The calculation strategy to use."
            },
            "value": {
               "anyOf": [
                  {
                     "type": "integer"
                  },
                  {
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Primary value for the strategy.",
               "title": "Value"
            },
            "limit": {
               "anyOf": [
                  {
                     "minimum": 1,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Optional upper limit for calculated replicas.",
               "title": "Limit"
            },
            "cpu_percent": {
               "anyOf": [
                  {
                     "maximum": 1.0,
                     "minimum": 0.0,
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "CPU percentage for CPU_PERCENTAGE strategy.",
               "title": "Cpu Percent"
            },
            "memory_per_replica_mb": {
               "anyOf": [
                  {
                     "exclusiveMinimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Expected memory usage per replica in MB.",
               "title": "Memory Per Replica Mb"
            },
            "memory_threshold_percent": {
               "anyOf": [
                  {
                     "maximum": 1.0,
                     "minimum": 0.0,
                     "type": "number"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Memory threshold percentage for MEMORY_THRESHOLDING strategy.",
               "title": "Memory Threshold Percent"
            },
            "max_memory_budget_mb": {
               "anyOf": [
                  {
                     "exclusiveMinimum": 0,
                     "type": "integer"
                  },
                  {
                     "type": "null"
                  }
               ],
               "default": null,
               "description": "Maximum memory budget for MEMORY_STATIC_GLOBAL_PERCENT strategy in MB.",
               "title": "Max Memory Budget Mb"
            }
         },
         "required": [
            "strategy"
         ],
         "title": "ReplicaStrategyConfig",
         "type": "object"
      },
      "StageType": {
         "description": "The type of a pipeline stage.",
         "enum": [
            "source",
            "stage",
            "sink"
         ],
         "title": "StageType",
         "type": "string"
      }
   },
   "additionalProperties": false,
   "required": [
      "name",
      "phase"
   ]
}
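
The numeric bounds that the schema above places on ReplicaStrategyConfig can be sketched as a plain-Python check. This is only an illustration using the standard library, not the pydantic validation the package performs. The helper name check_replica_strategy and its error messages are hypothetical, and the strategy value is not checked against the ReplicaCalculationStrategy enum.

```python
from typing import Any, Dict, List

# Bounds copied from the ReplicaStrategyConfig schema:
# (inclusive minimum, exclusive minimum, inclusive maximum); None = no bound.
BOUNDS = {
    "limit": (1, None, None),
    "cpu_percent": (0.0, None, 1.0),
    "memory_per_replica_mb": (None, 0, None),
    "memory_threshold_percent": (0.0, None, 1.0),
    "max_memory_budget_mb": (None, 0, None),
}


def check_replica_strategy(cfg: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: list schema violations (empty list if none)."""
    errors = []
    if "strategy" not in cfg:  # the only required property
        errors.append("strategy is required")
    for name, (lo, xlo, hi) in BOUNDS.items():
        value = cfg.get(name)
        if value is None:  # every bounded field is nullable, default null
            continue
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            errors.append(f"{name} must be a number")
            continue
        if lo is not None and value < lo:
            errors.append(f"{name} must be >= {lo}")
        if xlo is not None and value <= xlo:
            errors.append(f"{name} must be > {xlo}")
        if hi is not None and value > hi:
            errors.append(f"{name} must be <= {hi}")
    return errors
```

An empty result means the dictionary respects the bounds shown in the schema. Integer-versus-float typing and the strategy enum are deliberately left out of this sketch.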

Config:
  • extra: str = forbid

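Fields:
  • actor (str | None)
  • callable (str | None)
  • config (Dict[str, Any])
  • enabled (bool)
  • name (str)
  • phase (PipelinePhase)
  • replicas (ReplicaConfig)
  • runs_after (List[str])
  • task_filters (List[Any] | None)
  • type (StageType)

Validators:
  • check_actor_or_callable » all fields
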
field actor: str | None = None#

Full import path to the stage’s actor class or function.

Validated by:
  • check_actor_or_callable

field callable: str | None = None#

Full import path to a callable function for the stage.

Validated by:
  • check_actor_or_callable

field config: Dict[str, Any] = {}#

Configuration dictionary for the stage.

Validated by:
  • check_actor_or_callable

field enabled: bool = True#

Whether the stage is enabled.

Validated by:
  • check_actor_or_callable

field name: str [Required]#

Unique name for the stage.

Validated by:
  • check_actor_or_callable

field phase: PipelinePhase [Required]#

The logical phase of the stage.

Validated by:
  • check_actor_or_callable

field replicas: ReplicaConfig [Optional]#

Replica configuration.

Validated by:
  • check_actor_or_callable

field runs_after: List[str] [Optional]#

List of stages this stage must run after.

Validated by:
  • check_actor_or_callable

field task_filters: List[Any] | None = None#

List of task types this callable stage should filter for. Only applies to callable stages.

Validated by:
  • check_actor_or_callable

field type: StageType = StageType.STAGE#

Type of the stage.

Validated by:
  • check_actor_or_callable

validator check_actor_or_callable  »  all fields[source]#

Validates that exactly one of ‘actor’ or ‘callable’ is specified.
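
The "exactly one of" rule can be sketched in plain Python as follows. This is not the package's validator: the function name require_exactly_one is hypothetical, and the sketch assumes that "specified" means "not None", which the documentation above does not state.

```python
from typing import Optional


def require_exactly_one(actor: Optional[str], callable_path: Optional[str]) -> None:
    """Hypothetical check: raise unless exactly one import path is given."""
    # Both None or both set means the two "is None" tests agree.
    if (actor is None) == (callable_path is None):
        raise ValueError("exactly one of 'actor' or 'callable' must be specified")
```

Under that assumption, a stage that sets both fields is rejected just like a stage that sets neither.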

class nv_ingest.pipeline.pipeline_schema.StageType(value)[source]#

Bases: str, Enum

The type of a pipeline stage.

SINK = 'sink'#
SOURCE = 'source'#
STAGE = 'stage'#
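
Because StageType derives from both str and Enum, its members behave like ordinary strings. The sketch below redefines the enum locally from the values listed above, so that it runs without nv_ingest installed; in real code the class would be imported from nv_ingest.pipeline.pipeline_schema instead.

```python
import json
from enum import Enum


class StageType(str, Enum):
    """Local copy of the documented enum, for illustration only."""

    SINK = "sink"
    SOURCE = "source"
    STAGE = "stage"


# Lookup by value returns the singleton member.
assert StageType("sink") is StageType.SINK
# The str mixin makes members compare equal to their raw string values.
assert StageType.SOURCE == "source"
# str-based members serialize to plain JSON strings.
assert json.dumps({"type": StageType.STAGE}) == '{"type": "stage"}'
```

Looking up a value that is not one of the three listed members, such as StageType("filter"), raises ValueError.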

Module contents#