Source code for nemo_retriever.utils.ray_resource_hueristics

# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import logging
import os
from typing import Optional

from pydantic import BaseModel, ConfigDict

logger = logging.getLogger(__name__)

# Global constants for controlling Ray resource hueristic calculations when
# the user does not specify a requested override.

# EMBEDDING Actor constants (PER-GPU)
EMBED_INITIAL_ACTORS = 2  # Hueristic initial num actors per GPU (initial_size of ActorPoolStrategy). Ray starts up this many actors on start-up.  # noqa: E501
EMBED_MIN_ACTORS = 1  # Hueristic minimum num actors per GPU (min_size of ActorPoolStrategy). Ray tries to never let running actors fall below this number.  # noqa: E501
EMBED_MAX_ACTORS = 4  # Hueristic baseline num actors per GPU (max_size of ActorPoolStrategy). Ray will grow to this size when resources are available.  # noqa: E501
EMBED_GPUS_PER_ACTOR = (
    0.5  # Hueristic baseline num GPUs per actor. Used to determine which GPU to schedule the actor on.
)
EMBED_BATCH_SIZE = 256  # Ray batch size AND EMBEDDING inference batch size

# Nemotron Parse Actor constants (PER-GPU)
NEMOTRON_PARSE_INITIAL_ACTORS = 1  # vLLM manages batching internally; one actor is sufficient.  # noqa: E501
NEMOTRON_PARSE_MIN_ACTORS = 1  # vLLM manages batching internally; one actor is sufficient.  # noqa: E501
NEMOTRON_PARSE_MAX_ACTORS = 1  # vLLM manages batching internally; one actor is sufficient.  # noqa: E501
VLLM_GPUS_PER_ACTOR = 1.0  # vLLM owns the full GPU for KV-cache management and continuous batching.  # noqa: E501
NEMOTRON_PARSE_BATCH_SIZE = 64  # Ray batch size AND Nemotron Parse inference batch size

# OCR Actor constants (PER-GPU)
OCR_INITIAL_ACTORS = 3  # Hueristic initial num actors per GPU (initial_size of ActorPoolStrategy). Ray starts up this many actors on start-up.  # noqa: E501
OCR_MIN_ACTORS = 1  # Hueristic minimum num actors per GPU (min_size of ActorPoolStrategy). Ray tries to never let running actors fall below this number.  # noqa: E501
OCR_MAX_ACTORS = 10  # Hueristic baseline num actors per GPU (max_size of ActorPoolStrategy). Ray will grow to this size when resources are available.  # noqa: E501
OCR_GPUS_PER_ACTOR = 0.1  # Hueristic baseline num GPUs per actor. Used to determine which GPU to schedule the actor on.
OCR_BATCH_SIZE = 32  # Ray batch size AND OCR inference batch size

# PAGE-ELEMENTS Actor constants (PER-GPU)
PAGE_ELEMENTS_INITIAL_ACTORS = 3  # Hueristic initial num actors per GPU (initial_size of ActorPoolStrategy). Ray starts up this many actors on start-up.  # noqa: E501
PAGE_ELEMENTS_MIN_ACTORS = 1  # Hueristic minimum num actors per GPU (min_size of ActorPoolStrategy). Ray tries to never let running actors fall below this number.  # noqa: E501
PAGE_ELEMENTS_MAX_ACTORS = 10  # Hueristic baseline num actors per GPU (max_size of ActorPoolStrategy). Ray will grow to this size when resources are available.  # noqa: E501
PAGE_ELEMENTS_GPUS_PER_ACTOR = (
    0.1  # Hueristic baseline num GPUs per actor. Used to determine which GPU to schedule the actor on. # noqa: E501
)
PAGE_ELEMENTS_BATCH_SIZE = 32  # Ray batch size AND PAGE-ELEMENTS inference batch size

# TABLE-STRUCTURE Actor constants – mirrors page-elements since NIMs are similar weight
TABLE_STRUCTURE_INITIAL_ACTORS = 2
TABLE_STRUCTURE_MIN_ACTORS = 1
TABLE_STRUCTURE_MAX_ACTORS = 6
TABLE_STRUCTURE_GPUS_PER_ACTOR = 0.1
TABLE_STRUCTURE_BATCH_SIZE = 16

# GRAPHIC-ELEMENTS Actor constants – mirrors page-elements since NIMs are similar weight
GRAPHIC_ELEMENTS_INITIAL_ACTORS = 2
GRAPHIC_ELEMENTS_MIN_ACTORS = 1
GRAPHIC_ELEMENTS_MAX_ACTORS = 6
GRAPHIC_ELEMENTS_GPUS_PER_ACTOR = 0.1
GRAPHIC_ELEMENTS_BATCH_SIZE = 16

# PDF EXTRACTOR constants (PER-GPU) - reason being more GPU means more CPU needed to feed the models and keep up
PDF_EXTRACT_BATCH_SIZE = 8  # Ray batch size AND PDF extraction batch size
PDF_EXTRACT_CPUS_PER_TASK = (
    2.0  # Hueristic baseline num CPUs per task. Used to determine which CPU to schedule the task on.
)
PDF_EXTRACT_TASKS = 16  # Hueristic baseline num tasks. Used to determine how many CPU tasks to run in parallel.


[docs] class GpuInfo(BaseModel): driver_version: str gpu_name: str gpu_uuid: str gpu_brand: str total_mib: int used_mib: int free_mib: int
[docs] class NodeGpuInfo(BaseModel): gpus: dict[int, GpuInfo]
def _get_gpu_memory_info() -> NodeGpuInfo: """Get the memory information for each GPU.""" from pynvml import ( nvmlInit, nvmlSystemGetDriverVersion, nvmlDeviceGetCount, nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo, nvmlShutdown, nvmlDeviceGetName, nvmlDeviceGetUUID, nvmlDeviceGetBrand, ) nvmlInit() driver_version = nvmlSystemGetDriverVersion() device_count = nvmlDeviceGetCount() gpu_info: dict[int, GpuInfo] = {} for i in range(device_count): handle = nvmlDeviceGetHandleByIndex(i) info = nvmlDeviceGetMemoryInfo(handle) gpu_name = nvmlDeviceGetName(handle) gpu_uuid = nvmlDeviceGetUUID(handle) gpu_info[i] = GpuInfo( driver_version=( driver_version.decode("utf-8", errors="replace") if isinstance(driver_version, (bytes, bytearray)) else str(driver_version) ), gpu_name=( gpu_name.decode("utf-8", errors="replace") if isinstance(gpu_name, (bytes, bytearray)) else str(gpu_name) ), gpu_uuid=( gpu_uuid.decode("utf-8", errors="replace") if isinstance(gpu_uuid, (bytes, bytearray)) else str(gpu_uuid) ), gpu_brand=str(nvmlDeviceGetBrand(handle)), total_mib=int(info.total // (1024**2)), used_mib=int(info.used // (1024**2)), free_mib=int(info.free // (1024**2)), ) nvmlShutdown() return NodeGpuInfo(gpus=gpu_info)
[docs] def get_gpu_memory_info_remote() -> object: """Return a Ray ObjectRef for ``_get_gpu_memory_info`` executed remotely.""" import ray return ray.remote(_get_gpu_memory_info).remote()
[docs] class Resources(BaseModel): """Resources and where they came from.""" model_config = ConfigDict(frozen=True) cpu_count: int gpu_count: int def __str__(self) -> str: return f"Resources(cpu_count={self.cpu_count}, gpu_count={self.gpu_count})" def __repr__(self) -> str: return self.__str__() def __hash__(self) -> int: return hash((self.cpu_count, self.gpu_count)) def __eq__(self, other: object) -> bool: if not isinstance(other, Resources): return False return self.cpu_count == other.cpu_count and self.gpu_count == other.gpu_count def __ne__(self, other: object) -> bool: return not self.__eq__(other)
[docs] class ClusterResources(BaseModel): """Detected compute resources and where they came from.""" model_config = ConfigDict(frozen=True) total_resources: Resources # Total resources available to the cluster available_resources: Resources # Available resources to the cluster (not in use currently)
[docs] def total_cpu_count(self) -> int: return self.total_resources.cpu_count
[docs] def total_gpu_count(self) -> int: return self.total_resources.gpu_count
[docs] def available_cpu_count(self) -> int: return self.available_resources.cpu_count
[docs] def available_gpu_count(self) -> int: return self.available_resources.gpu_count
def __str__(self) -> str: return ( f"ClusterResources(total_resources={self.total_resources}, available_resources={self.available_resources})" ) def __repr__(self) -> str: return self.__str__() def __hash__(self) -> int: return hash((self.total_resources, self.available_resources)) def __eq__(self, other: object) -> bool: if not isinstance(other, ClusterResources): return False return self.total_resources == other.total_resources and self.available_resources == other.available_resources def __ne__(self, other: object) -> bool: return not self.__eq__(other)
[docs] def gather_cluster_resources(ray: object) -> ClusterResources: """Gather total and available CPU/GPU resources from a Ray cluster.""" if not ray.is_initialized(): raise ValueError("Ray is not initialized") def _coerce_count(value: object) -> int: try: parsed = float(value) # Ray may report fractional available resources. except (TypeError, ValueError): return 0 if parsed <= 0: return 0 return int(parsed) total_resources: dict[str, object] = ray.cluster_resources() available_resources: dict[str, object] = ray.available_resources() return ClusterResources( total_resources=Resources( cpu_count=_coerce_count(total_resources.get("CPU", 0)), gpu_count=_coerce_count(total_resources.get("GPU", 0)), ), available_resources=Resources( cpu_count=_coerce_count(available_resources.get("CPU", 0)), gpu_count=_coerce_count(available_resources.get("GPU", 0)), ), )
def _detect_local_gpu_count() -> int: """Best-effort local GPU count without a hard pynvml dep. Tried in order: pynvml (rich detail when available), torch.cuda (already in the core deps), then ``CUDA_VISIBLE_DEVICES`` (purely env-based, used by the skill-eval harness to pin runs to a single GPU). Returns 0 only when every probe fails or reports no GPUs — never raises. Historical bug this guards against: pynvml is an optional dep; when missing, GPU count came back 0, which silently flipped the rerank ArchetypeOperator to its CPU variant. That variant auto-fills the build.nvidia.com endpoint, so a caller asking for a local GPU reranker silently turned into a remote HTTP request — and a 401 / cost spike when the call eventually failed. """ try: return int(len(_get_gpu_memory_info().gpus)) except Exception as exc: # pynvml missing, NVML runtime unavailable, etc. logger.debug("pynvml GPU detection failed (%s); trying torch.cuda", exc) try: import torch if torch.cuda.is_available(): return int(torch.cuda.device_count()) except Exception as exc: logger.debug("torch.cuda GPU detection failed (%s); trying CUDA_VISIBLE_DEVICES", exc) visible = os.environ.get("CUDA_VISIBLE_DEVICES") if visible is not None: # ``""`` is a valid "no GPUs visible" signal; ``"0,1"`` etc means N visible. ids = [tok for tok in visible.split(",") if tok.strip()] return len(ids) return 0
[docs] def gather_local_resources() -> Resources: """Gather local CPU/GPU resources without requiring Ray.""" cpu_count = int(os.cpu_count() or 0) return Resources(cpu_count=cpu_count, gpu_count=_detect_local_gpu_count())
[docs] class RequestedPlan(BaseModel): """Contains the requested Ray DAG plan for the batch ingest.""" model_config = ConfigDict(frozen=True) # Embedder resources requested to satisfy DAG plan embed_initial_actors: int embed_min_actors: int embed_max_actors: int embed_gpus_per_actor: float embed_batch_size: int # Nemotron Parse resources requested to satisfy DAG plan nemotron_parse_initial_actors: int nemotron_parse_min_actors: int nemotron_parse_max_actors: int nemotron_parse_gpus_per_actor: float nemotron_parse_batch_size: int # OCR resources requested to satisfy DAG plan ocr_initial_actors: int ocr_min_actors: int ocr_max_actors: int ocr_gpus_per_actor: float ocr_batch_size: int # Page Elements resources requested to satisfy DAG plan page_elements_initial_actors: int page_elements_min_actors: int page_elements_max_actors: int page_elements_gpus_per_actor: float page_elements_batch_size: int # Table Structure resources requested to satisfy DAG plan table_structure_initial_actors: int table_structure_min_actors: int table_structure_max_actors: int table_structure_gpus_per_actor: float table_structure_batch_size: int # Graphic Elements resources requested to satisfy DAG plan graphic_elements_initial_actors: int graphic_elements_min_actors: int graphic_elements_max_actors: int graphic_elements_gpus_per_actor: float graphic_elements_batch_size: int # Caption resources requested to satisfy DAG plan caption_gpus_per_actor: float # PDF Extraction resources requested to satisfy DAG plan pdf_extract_batch_size: int pdf_extract_cpus_per_task: float pdf_extract_tasks: int
[docs] def get_embed_initial_actors(self) -> int: return self.embed_initial_actors
[docs] def get_embed_min_actors(self) -> int: return self.embed_min_actors
[docs] def get_embed_max_actors(self) -> int: return self.embed_max_actors
[docs] def get_embed_gpus_per_actor(self) -> float: return self.embed_gpus_per_actor
[docs] def get_embed_batch_size(self) -> int: return self.embed_batch_size
[docs] def get_nemotron_parse_initial_actors(self) -> int: return self.nemotron_parse_initial_actors
[docs] def get_nemotron_parse_min_actors(self) -> int: return self.nemotron_parse_min_actors
[docs] def get_nemotron_parse_max_actors(self) -> int: return self.nemotron_parse_max_actors
[docs] def get_nemotron_parse_gpus_per_actor(self) -> float: return self.nemotron_parse_gpus_per_actor
[docs] def get_nemotron_parse_batch_size(self) -> int: return self.nemotron_parse_batch_size
[docs] def get_ocr_initial_actors(self) -> int: return self.ocr_initial_actors
[docs] def get_ocr_min_actors(self) -> int: return self.ocr_min_actors
[docs] def get_ocr_max_actors(self) -> int: return self.ocr_max_actors
[docs] def get_ocr_gpus_per_actor(self) -> float: return self.ocr_gpus_per_actor
[docs] def get_ocr_batch_size(self) -> int: return self.ocr_batch_size
[docs] def get_page_elements_initial_actors(self) -> int: return self.page_elements_initial_actors
[docs] def get_page_elements_min_actors(self) -> int: return self.page_elements_min_actors
[docs] def get_page_elements_max_actors(self) -> int: return self.page_elements_max_actors
[docs] def get_page_elements_gpus_per_actor(self) -> float: return self.page_elements_gpus_per_actor
[docs] def get_page_elements_batch_size(self) -> int: return self.page_elements_batch_size
[docs] def get_table_structure_initial_actors(self) -> int: return self.table_structure_initial_actors
[docs] def get_table_structure_min_actors(self) -> int: return self.table_structure_min_actors
[docs] def get_table_structure_max_actors(self) -> int: return self.table_structure_max_actors
[docs] def get_table_structure_gpus_per_actor(self) -> float: return self.table_structure_gpus_per_actor
[docs] def get_table_structure_batch_size(self) -> int: return self.table_structure_batch_size
[docs] def get_graphic_elements_initial_actors(self) -> int: return self.graphic_elements_initial_actors
[docs] def get_graphic_elements_min_actors(self) -> int: return self.graphic_elements_min_actors
[docs] def get_graphic_elements_max_actors(self) -> int: return self.graphic_elements_max_actors
[docs] def get_graphic_elements_gpus_per_actor(self) -> float: return self.graphic_elements_gpus_per_actor
[docs] def get_graphic_elements_batch_size(self) -> int: return self.graphic_elements_batch_size
[docs] def get_pdf_extract_batch_size(self) -> int: return self.pdf_extract_batch_size
[docs] def get_pdf_extract_cpus_per_task(self) -> float: return self.pdf_extract_cpus_per_task
[docs] def get_pdf_extract_tasks(self) -> int: return self.pdf_extract_tasks
def __str__(self) -> str: return f"RequestedPlan(embed_initial_actors={self.embed_initial_actors}, embed_min_actors={self.embed_min_actors}, embed_max_actors={self.embed_max_actors}, embed_gpus_per_actor={self.embed_gpus_per_actor}, embed_batch_size={self.embed_batch_size}, nemotron_parse_initial_actors={self.nemotron_parse_initial_actors}, nemotron_parse_min_actors={self.nemotron_parse_min_actors}, nemotron_parse_max_actors={self.nemotron_parse_max_actors}, nemotron_parse_gpus_per_actor={self.nemotron_parse_gpus_per_actor}, nemotron_parse_batch_size={self.nemotron_parse_batch_size}, ocr_initial_actors={self.ocr_initial_actors}, ocr_min_actors={self.ocr_min_actors}, caption_gpus_per_actor={self.caption_gpus_per_actor}, ocr_max_actors={self.ocr_max_actors}, ocr_gpus_per_actor={self.ocr_gpus_per_actor}, ocr_batch_size={self.ocr_batch_size}, page_elements_initial_actors={self.page_elements_initial_actors}, page_elements_min_actors={self.page_elements_min_actors}, page_elements_max_actors={self.page_elements_max_actors}, page_elements_gpus_per_actor={self.page_elements_gpus_per_actor}, page_elements_batch_size={self.page_elements_batch_size}, table_structure_initial_actors={self.table_structure_initial_actors}, table_structure_min_actors={self.table_structure_min_actors}, table_structure_max_actors={self.table_structure_max_actors}, table_structure_gpus_per_actor={self.table_structure_gpus_per_actor}, table_structure_batch_size={self.table_structure_batch_size}, graphic_elements_initial_actors={self.graphic_elements_initial_actors}, graphic_elements_min_actors={self.graphic_elements_min_actors}, graphic_elements_max_actors={self.graphic_elements_max_actors}, graphic_elements_gpus_per_actor={self.graphic_elements_gpus_per_actor}, graphic_elements_batch_size={self.graphic_elements_batch_size}, pdf_extract_batch_size={self.pdf_extract_batch_size}, pdf_extract_cpus_per_task={self.pdf_extract_cpus_per_task}, pdf_extract_tasks={self.pdf_extract_tasks})" # noqa: E501 def __repr__(self) -> str: return self.__str__() def __hash__(self) -> int: return hash( ( self.embed_initial_actors, self.embed_min_actors, self.embed_max_actors, self.embed_gpus_per_actor, self.embed_batch_size, self.nemotron_parse_initial_actors, self.nemotron_parse_min_actors, self.nemotron_parse_max_actors, self.nemotron_parse_gpus_per_actor, self.nemotron_parse_batch_size, self.caption_gpus_per_actor, self.ocr_initial_actors, self.ocr_min_actors, self.ocr_max_actors, self.ocr_gpus_per_actor, self.ocr_batch_size, self.page_elements_initial_actors, self.page_elements_min_actors, self.page_elements_max_actors, self.page_elements_gpus_per_actor, self.page_elements_batch_size, self.table_structure_initial_actors, self.table_structure_min_actors, self.table_structure_max_actors, self.table_structure_gpus_per_actor, self.table_structure_batch_size, self.graphic_elements_initial_actors, self.graphic_elements_min_actors, self.graphic_elements_max_actors, self.graphic_elements_gpus_per_actor, self.graphic_elements_batch_size, self.pdf_extract_batch_size, self.pdf_extract_cpus_per_task, self.pdf_extract_tasks, ) ) def __eq__(self, other: object) -> bool: if not isinstance(other, RequestedPlan): return False return ( self.embed_initial_actors == other.embed_initial_actors and self.embed_min_actors == other.embed_min_actors and self.embed_max_actors == other.embed_max_actors and self.embed_gpus_per_actor == other.embed_gpus_per_actor and self.embed_batch_size == other.embed_batch_size and self.nemotron_parse_initial_actors == other.nemotron_parse_initial_actors and self.nemotron_parse_min_actors == other.nemotron_parse_min_actors and self.nemotron_parse_max_actors == other.nemotron_parse_max_actors and self.nemotron_parse_gpus_per_actor == other.nemotron_parse_gpus_per_actor and self.nemotron_parse_batch_size == other.nemotron_parse_batch_size and self.caption_gpus_per_actor == other.caption_gpus_per_actor and self.ocr_initial_actors == other.ocr_initial_actors and self.ocr_min_actors == other.ocr_min_actors and self.ocr_max_actors == other.ocr_max_actors and self.ocr_gpus_per_actor == other.ocr_gpus_per_actor and self.ocr_batch_size == other.ocr_batch_size and self.page_elements_initial_actors == other.page_elements_initial_actors and self.page_elements_min_actors == other.page_elements_min_actors and self.page_elements_max_actors == other.page_elements_max_actors and self.page_elements_gpus_per_actor == other.page_elements_gpus_per_actor and self.page_elements_batch_size == other.page_elements_batch_size and self.table_structure_initial_actors == other.table_structure_initial_actors and self.table_structure_min_actors == other.table_structure_min_actors and self.table_structure_max_actors == other.table_structure_max_actors and self.table_structure_gpus_per_actor == other.table_structure_gpus_per_actor and self.table_structure_batch_size == other.table_structure_batch_size and self.graphic_elements_initial_actors == other.graphic_elements_initial_actors and self.graphic_elements_min_actors == other.graphic_elements_min_actors and self.graphic_elements_max_actors == other.graphic_elements_max_actors and self.graphic_elements_gpus_per_actor == other.graphic_elements_gpus_per_actor and self.graphic_elements_batch_size == other.graphic_elements_batch_size and self.pdf_extract_batch_size == other.pdf_extract_batch_size and self.pdf_extract_cpus_per_task == other.pdf_extract_cpus_per_task and self.pdf_extract_tasks == other.pdf_extract_tasks ) def __ne__(self, other: object) -> bool: return not self.__eq__(other)
[docs] def resolve_requested_plan( *, cluster_resources: ClusterResources, override_embed_initial_actors: Optional[int] = None, override_embed_min_actors: Optional[int] = None, override_embed_max_actors: Optional[int] = None, override_embed_gpus_per_actor: Optional[float] = None, override_embed_batch_size: Optional[int] = None, override_nemotron_parse_initial_actors: Optional[int] = None, override_nemotron_parse_min_actors: Optional[int] = None, override_nemotron_parse_max_actors: Optional[int] = None, override_nemotron_parse_gpus_per_actor: Optional[float] = None, override_nemotron_parse_batch_size: Optional[int] = None, override_ocr_initial_actors: Optional[int] = None, override_ocr_min_actors: Optional[int] = None, override_ocr_max_actors: Optional[int] = None, override_ocr_gpus_per_actor: Optional[float] = None, override_ocr_batch_size: Optional[int] = None, override_page_elements_initial_actors: Optional[int] = None, override_page_elements_min_actors: Optional[int] = None, override_page_elements_max_actors: Optional[int] = None, override_page_elements_gpus_per_actor: Optional[float] = None, override_page_elements_batch_size: Optional[int] = None, override_table_structure_initial_actors: Optional[int] = None, override_table_structure_min_actors: Optional[int] = None, override_table_structure_max_actors: Optional[int] = None, override_table_structure_gpus_per_actor: Optional[float] = None, override_table_structure_batch_size: Optional[int] = None, override_graphic_elements_initial_actors: Optional[int] = None, override_graphic_elements_min_actors: Optional[int] = None, override_graphic_elements_max_actors: Optional[int] = None, override_graphic_elements_gpus_per_actor: Optional[float] = None, override_graphic_elements_batch_size: Optional[int] = None, override_pdf_extract_batch_size: Optional[int] = None, override_pdf_extract_cpus_per_task: Optional[float] = None, override_pdf_extract_tasks: Optional[int] = None, allow_no_gpu: bool = False, caption_enabled: bool = False, override_caption_gpus_per_actor: Optional[float] = None, ) -> RequestedPlan: available_gpu_count = max(0, int(cluster_resources.available_gpu_count())) if available_gpu_count == 0 and not allow_no_gpu: raise ValueError("No GPUs available") def _resolve_int_actors(override: Optional[int], default: int, multiply_by_available_num_gpu: bool) -> int: if override is not None and override > 0: return int(override) if available_gpu_count == 0: return 1 if multiply_by_available_num_gpu: return int(default * available_gpu_count) return int(default) def _resolve_float_actors(override: Optional[float], default: float, multiply_by_available_num_gpu: bool) -> float: if override is not None and override > 0.0: return float(override) if available_gpu_count == 0: return 0.0 if multiply_by_available_num_gpu: return float(default * available_gpu_count) return float(default) def _resolve_int(override: Optional[int], default: int, multiply_by_available_num_gpu: bool) -> int: if override is not None and override > 0: return int(override) if multiply_by_available_num_gpu: return int(default * available_gpu_count) return int(default) def _resolve_float(override: Optional[float], default: float, multiply_by_available_num_gpu: bool) -> float: if override is not None and override > 0.0: return float(override) if multiply_by_available_num_gpu: return float(default * available_gpu_count) return float(default) embed_initial_actors = _resolve_int_actors(override_embed_initial_actors, EMBED_INITIAL_ACTORS, True) embed_min_actors = _resolve_int_actors(override_embed_min_actors, EMBED_MIN_ACTORS, True) embed_max_actors = _resolve_int_actors(override_embed_max_actors, EMBED_MAX_ACTORS, True) embed_gpus_per_actor = _resolve_float_actors(override_embed_gpus_per_actor, EMBED_GPUS_PER_ACTOR, False) embed_batch_size = _resolve_int(override_embed_batch_size, EMBED_BATCH_SIZE, False) nemotron_parse_initial_actors = _resolve_int_actors( override_nemotron_parse_initial_actors, NEMOTRON_PARSE_INITIAL_ACTORS, True ) nemotron_parse_min_actors = _resolve_int_actors(override_nemotron_parse_min_actors, NEMOTRON_PARSE_MIN_ACTORS, True) nemotron_parse_max_actors = _resolve_int_actors(override_nemotron_parse_max_actors, NEMOTRON_PARSE_MAX_ACTORS, True) nemotron_parse_gpus_per_actor = _resolve_float_actors( override_nemotron_parse_gpus_per_actor, VLLM_GPUS_PER_ACTOR, False ) nemotron_parse_batch_size = _resolve_int(override_nemotron_parse_batch_size, NEMOTRON_PARSE_BATCH_SIZE, False) ocr_initial_actors = _resolve_int_actors(override_ocr_initial_actors, OCR_INITIAL_ACTORS, True) ocr_min_actors = _resolve_int_actors(override_ocr_min_actors, OCR_MIN_ACTORS, True) ocr_max_actors = _resolve_int_actors(override_ocr_max_actors, OCR_MAX_ACTORS, True) ocr_gpus_per_actor = _resolve_float_actors(override_ocr_gpus_per_actor, OCR_GPUS_PER_ACTOR, False) ocr_batch_size = _resolve_int(override_ocr_batch_size, OCR_BATCH_SIZE, False) page_elements_initial_actors = _resolve_int_actors( override_page_elements_initial_actors, PAGE_ELEMENTS_INITIAL_ACTORS, True ) page_elements_min_actors = _resolve_int_actors(override_page_elements_min_actors, PAGE_ELEMENTS_MIN_ACTORS, True) page_elements_max_actors = _resolve_int_actors(override_page_elements_max_actors, PAGE_ELEMENTS_MAX_ACTORS, True) page_elements_gpus_per_actor = _resolve_float_actors( override_page_elements_gpus_per_actor, PAGE_ELEMENTS_GPUS_PER_ACTOR, False ) page_elements_batch_size = _resolve_int(override_page_elements_batch_size, PAGE_ELEMENTS_BATCH_SIZE, False) table_structure_initial_actors = _resolve_int_actors( override_table_structure_initial_actors, TABLE_STRUCTURE_INITIAL_ACTORS, True ) table_structure_min_actors = _resolve_int_actors( override_table_structure_min_actors, TABLE_STRUCTURE_MIN_ACTORS, True ) table_structure_max_actors = _resolve_int_actors( override_table_structure_max_actors, TABLE_STRUCTURE_MAX_ACTORS, True ) table_structure_gpus_per_actor = _resolve_float_actors( override_table_structure_gpus_per_actor, TABLE_STRUCTURE_GPUS_PER_ACTOR, False ) table_structure_batch_size = _resolve_int(override_table_structure_batch_size, TABLE_STRUCTURE_BATCH_SIZE, False) graphic_elements_initial_actors = _resolve_int_actors( override_graphic_elements_initial_actors, GRAPHIC_ELEMENTS_INITIAL_ACTORS, True ) graphic_elements_min_actors = _resolve_int_actors( override_graphic_elements_min_actors, GRAPHIC_ELEMENTS_MIN_ACTORS, True ) graphic_elements_max_actors = _resolve_int_actors( override_graphic_elements_max_actors, GRAPHIC_ELEMENTS_MAX_ACTORS, True ) graphic_elements_gpus_per_actor = _resolve_float_actors( override_graphic_elements_gpus_per_actor, GRAPHIC_ELEMENTS_GPUS_PER_ACTOR, False ) graphic_elements_batch_size = _resolve_int(override_graphic_elements_batch_size, GRAPHIC_ELEMENTS_BATCH_SIZE, False) pdf_extract_batch_size = _resolve_int(override_pdf_extract_batch_size, PDF_EXTRACT_BATCH_SIZE, False) pdf_extract_cpus_per_task = _resolve_float(override_pdf_extract_cpus_per_task, PDF_EXTRACT_CPUS_PER_TASK, False) pdf_extract_tasks = _resolve_int_actors(override_pdf_extract_tasks, PDF_EXTRACT_TASKS, True) # Caption GPU budget. On a single GPU the caption actor (vLLM) must share # with OCR / page-elements / embed, so we halve its reservation and drop # embed to CPU-only. On 2+ GPUs caption gets a dedicated GPU. if caption_enabled: if override_caption_gpus_per_actor is not None: caption_gpus_per_actor = override_caption_gpus_per_actor elif available_gpu_count == 0: caption_gpus_per_actor = 0.0 elif available_gpu_count <= 1: caption_gpus_per_actor = 0.5 else: caption_gpus_per_actor = VLLM_GPUS_PER_ACTOR # On a single GPU, reduce embed from its default (0.5) so all actors # fit within the GPU budget while still retaining CUDA access. if override_embed_gpus_per_actor is None and available_gpu_count <= 1: embed_gpus_per_actor = 0.1 else: caption_gpus_per_actor = 0.0 return RequestedPlan( embed_initial_actors=embed_initial_actors, embed_min_actors=embed_min_actors, embed_max_actors=embed_max_actors, embed_gpus_per_actor=embed_gpus_per_actor, embed_batch_size=embed_batch_size, nemotron_parse_initial_actors=nemotron_parse_initial_actors, nemotron_parse_min_actors=nemotron_parse_min_actors, nemotron_parse_max_actors=nemotron_parse_max_actors, nemotron_parse_gpus_per_actor=nemotron_parse_gpus_per_actor, nemotron_parse_batch_size=nemotron_parse_batch_size, caption_gpus_per_actor=caption_gpus_per_actor, ocr_initial_actors=ocr_initial_actors, ocr_min_actors=ocr_min_actors, ocr_max_actors=ocr_max_actors, ocr_gpus_per_actor=ocr_gpus_per_actor, ocr_batch_size=ocr_batch_size, page_elements_initial_actors=page_elements_initial_actors, page_elements_min_actors=page_elements_min_actors, page_elements_max_actors=page_elements_max_actors, page_elements_gpus_per_actor=page_elements_gpus_per_actor, page_elements_batch_size=page_elements_batch_size, table_structure_initial_actors=table_structure_initial_actors, table_structure_min_actors=table_structure_min_actors, table_structure_max_actors=table_structure_max_actors, table_structure_gpus_per_actor=table_structure_gpus_per_actor, table_structure_batch_size=table_structure_batch_size, graphic_elements_initial_actors=graphic_elements_initial_actors, graphic_elements_min_actors=graphic_elements_min_actors, graphic_elements_max_actors=graphic_elements_max_actors, graphic_elements_gpus_per_actor=graphic_elements_gpus_per_actor, graphic_elements_batch_size=graphic_elements_batch_size, pdf_extract_batch_size=pdf_extract_batch_size, pdf_extract_cpus_per_task=pdf_extract_cpus_per_task, pdf_extract_tasks=pdf_extract_tasks, )