# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import logging
import os
from typing import Optional
from pydantic import BaseModel, ConfigDict
logger = logging.getLogger(__name__)
# Global constants for controlling Ray resource hueristic calculations when
# the user does not specify a requested override.
# EMBEDDING Actor constants (PER-GPU)
EMBED_INITIAL_ACTORS = 2 # Hueristic initial num actors per GPU (initial_size of ActorPoolStrategy). Ray starts up this many actors on start-up. # noqa: E501
EMBED_MIN_ACTORS = 1 # Hueristic minimum num actors per GPU (min_size of ActorPoolStrategy). Ray tries to never let running actors fall below this number. # noqa: E501
EMBED_MAX_ACTORS = 4 # Hueristic baseline num actors per GPU (max_size of ActorPoolStrategy). Ray will grow to this size when resources are available. # noqa: E501
EMBED_GPUS_PER_ACTOR = (
0.5 # Hueristic baseline num GPUs per actor. Used to determine which GPU to schedule the actor on.
)
EMBED_BATCH_SIZE = 256 # Ray batch size AND EMBEDDING inference batch size
# Nemotron Parse Actor constants (PER-GPU)
NEMOTRON_PARSE_INITIAL_ACTORS = 1 # vLLM manages batching internally; one actor is sufficient. # noqa: E501
NEMOTRON_PARSE_MIN_ACTORS = 1 # vLLM manages batching internally; one actor is sufficient. # noqa: E501
NEMOTRON_PARSE_MAX_ACTORS = 1 # vLLM manages batching internally; one actor is sufficient. # noqa: E501
VLLM_GPUS_PER_ACTOR = 1.0 # vLLM owns the full GPU for KV-cache management and continuous batching. # noqa: E501
NEMOTRON_PARSE_BATCH_SIZE = 64 # Ray batch size AND Nemotron Parse inference batch size
# OCR Actor constants (PER-GPU)
OCR_INITIAL_ACTORS = 3 # Hueristic initial num actors per GPU (initial_size of ActorPoolStrategy). Ray starts up this many actors on start-up. # noqa: E501
OCR_MIN_ACTORS = 1 # Hueristic minimum num actors per GPU (min_size of ActorPoolStrategy). Ray tries to never let running actors fall below this number. # noqa: E501
OCR_MAX_ACTORS = 10 # Hueristic baseline num actors per GPU (max_size of ActorPoolStrategy). Ray will grow to this size when resources are available. # noqa: E501
OCR_GPUS_PER_ACTOR = 0.1 # Hueristic baseline num GPUs per actor. Used to determine which GPU to schedule the actor on.
OCR_BATCH_SIZE = 32 # Ray batch size AND OCR inference batch size
# PAGE-ELEMENTS Actor constants (PER-GPU)
PAGE_ELEMENTS_INITIAL_ACTORS = 3 # Hueristic initial num actors per GPU (initial_size of ActorPoolStrategy). Ray starts up this many actors on start-up. # noqa: E501
PAGE_ELEMENTS_MIN_ACTORS = 1 # Hueristic minimum num actors per GPU (min_size of ActorPoolStrategy). Ray tries to never let running actors fall below this number. # noqa: E501
PAGE_ELEMENTS_MAX_ACTORS = 10 # Hueristic baseline num actors per GPU (max_size of ActorPoolStrategy). Ray will grow to this size when resources are available. # noqa: E501
PAGE_ELEMENTS_GPUS_PER_ACTOR = (
0.1 # Hueristic baseline num GPUs per actor. Used to determine which GPU to schedule the actor on. # noqa: E501
)
PAGE_ELEMENTS_BATCH_SIZE = 32 # Ray batch size AND PAGE-ELEMENTS inference batch size
# TABLE-STRUCTURE Actor constants – mirrors page-elements since NIMs are similar weight
TABLE_STRUCTURE_INITIAL_ACTORS = 2
TABLE_STRUCTURE_MIN_ACTORS = 1
TABLE_STRUCTURE_MAX_ACTORS = 6
TABLE_STRUCTURE_GPUS_PER_ACTOR = 0.1
TABLE_STRUCTURE_BATCH_SIZE = 16
# GRAPHIC-ELEMENTS Actor constants – mirrors page-elements since NIMs are similar weight
GRAPHIC_ELEMENTS_INITIAL_ACTORS = 2
GRAPHIC_ELEMENTS_MIN_ACTORS = 1
GRAPHIC_ELEMENTS_MAX_ACTORS = 6
GRAPHIC_ELEMENTS_GPUS_PER_ACTOR = 0.1
GRAPHIC_ELEMENTS_BATCH_SIZE = 16
# PDF EXTRACTOR constants (PER-GPU) - reason being more GPU means more CPU needed to feed the models and keep up
PDF_EXTRACT_BATCH_SIZE = 8 # Ray batch size AND PDF extraction batch size
PDF_EXTRACT_CPUS_PER_TASK = (
2.0 # Hueristic baseline num CPUs per task. Used to determine which CPU to schedule the task on.
)
PDF_EXTRACT_TASKS = 16 # Hueristic baseline num tasks. Used to determine how many CPU tasks to run in parallel.
[docs]
class GpuInfo(BaseModel):
driver_version: str
gpu_name: str
gpu_uuid: str
gpu_brand: str
total_mib: int
used_mib: int
free_mib: int
[docs]
class NodeGpuInfo(BaseModel):
gpus: dict[int, GpuInfo]
def _get_gpu_memory_info() -> NodeGpuInfo:
"""Get the memory information for each GPU."""
from pynvml import (
nvmlInit,
nvmlSystemGetDriverVersion,
nvmlDeviceGetCount,
nvmlDeviceGetHandleByIndex,
nvmlDeviceGetMemoryInfo,
nvmlShutdown,
nvmlDeviceGetName,
nvmlDeviceGetUUID,
nvmlDeviceGetBrand,
)
nvmlInit()
driver_version = nvmlSystemGetDriverVersion()
device_count = nvmlDeviceGetCount()
gpu_info: dict[int, GpuInfo] = {}
for i in range(device_count):
handle = nvmlDeviceGetHandleByIndex(i)
info = nvmlDeviceGetMemoryInfo(handle)
gpu_name = nvmlDeviceGetName(handle)
gpu_uuid = nvmlDeviceGetUUID(handle)
gpu_info[i] = GpuInfo(
driver_version=(
driver_version.decode("utf-8", errors="replace")
if isinstance(driver_version, (bytes, bytearray))
else str(driver_version)
),
gpu_name=(
gpu_name.decode("utf-8", errors="replace")
if isinstance(gpu_name, (bytes, bytearray))
else str(gpu_name)
),
gpu_uuid=(
gpu_uuid.decode("utf-8", errors="replace")
if isinstance(gpu_uuid, (bytes, bytearray))
else str(gpu_uuid)
),
gpu_brand=str(nvmlDeviceGetBrand(handle)),
total_mib=int(info.total // (1024**2)),
used_mib=int(info.used // (1024**2)),
free_mib=int(info.free // (1024**2)),
)
nvmlShutdown()
return NodeGpuInfo(gpus=gpu_info)
[docs]
def get_gpu_memory_info_remote() -> object:
"""Return a Ray ObjectRef for ``_get_gpu_memory_info`` executed remotely."""
import ray
return ray.remote(_get_gpu_memory_info).remote()
[docs]
class Resources(BaseModel):
"""Resources and where they came from."""
model_config = ConfigDict(frozen=True)
cpu_count: int
gpu_count: int
def __str__(self) -> str:
return f"Resources(cpu_count={self.cpu_count}, gpu_count={self.gpu_count})"
def __repr__(self) -> str:
return self.__str__()
def __hash__(self) -> int:
return hash((self.cpu_count, self.gpu_count))
def __eq__(self, other: object) -> bool:
if not isinstance(other, Resources):
return False
return self.cpu_count == other.cpu_count and self.gpu_count == other.gpu_count
def __ne__(self, other: object) -> bool:
return not self.__eq__(other)
[docs]
class ClusterResources(BaseModel):
"""Detected compute resources and where they came from."""
model_config = ConfigDict(frozen=True)
total_resources: Resources # Total resources available to the cluster
available_resources: Resources # Available resources to the cluster (not in use currently)
[docs]
def total_cpu_count(self) -> int:
return self.total_resources.cpu_count
[docs]
def total_gpu_count(self) -> int:
return self.total_resources.gpu_count
[docs]
def available_cpu_count(self) -> int:
return self.available_resources.cpu_count
[docs]
def available_gpu_count(self) -> int:
return self.available_resources.gpu_count
def __str__(self) -> str:
return (
f"ClusterResources(total_resources={self.total_resources}, available_resources={self.available_resources})"
)
def __repr__(self) -> str:
return self.__str__()
def __hash__(self) -> int:
return hash((self.total_resources, self.available_resources))
def __eq__(self, other: object) -> bool:
if not isinstance(other, ClusterResources):
return False
return self.total_resources == other.total_resources and self.available_resources == other.available_resources
def __ne__(self, other: object) -> bool:
return not self.__eq__(other)
[docs]
def gather_cluster_resources(ray: object) -> ClusterResources:
"""Gather total and available CPU/GPU resources from a Ray cluster."""
if not ray.is_initialized():
raise ValueError("Ray is not initialized")
def _coerce_count(value: object) -> int:
try:
parsed = float(value) # Ray may report fractional available resources.
except (TypeError, ValueError):
return 0
if parsed <= 0:
return 0
return int(parsed)
total_resources: dict[str, object] = ray.cluster_resources()
available_resources: dict[str, object] = ray.available_resources()
return ClusterResources(
total_resources=Resources(
cpu_count=_coerce_count(total_resources.get("CPU", 0)),
gpu_count=_coerce_count(total_resources.get("GPU", 0)),
),
available_resources=Resources(
cpu_count=_coerce_count(available_resources.get("CPU", 0)),
gpu_count=_coerce_count(available_resources.get("GPU", 0)),
),
)
def _detect_local_gpu_count() -> int:
"""Best-effort local GPU count without a hard pynvml dep.
Tried in order: pynvml (rich detail when available), torch.cuda (already in
the core deps), then ``CUDA_VISIBLE_DEVICES`` (purely env-based, used by the
skill-eval harness to pin runs to a single GPU). Returns 0 only when every
probe fails or reports no GPUs — never raises.
Historical bug this guards against: pynvml is an optional dep; when missing,
GPU count came back 0, which silently flipped the rerank ArchetypeOperator
to its CPU variant. That variant auto-fills the build.nvidia.com endpoint,
so a caller asking for a local GPU reranker silently turned into a remote
HTTP request — and a 401 / cost spike when the call eventually failed.
"""
try:
return int(len(_get_gpu_memory_info().gpus))
except Exception as exc: # pynvml missing, NVML runtime unavailable, etc.
logger.debug("pynvml GPU detection failed (%s); trying torch.cuda", exc)
try:
import torch
if torch.cuda.is_available():
return int(torch.cuda.device_count())
except Exception as exc:
logger.debug("torch.cuda GPU detection failed (%s); trying CUDA_VISIBLE_DEVICES", exc)
visible = os.environ.get("CUDA_VISIBLE_DEVICES")
if visible is not None:
# ``""`` is a valid "no GPUs visible" signal; ``"0,1"`` etc means N visible.
ids = [tok for tok in visible.split(",") if tok.strip()]
return len(ids)
return 0
[docs]
def gather_local_resources() -> Resources:
"""Gather local CPU/GPU resources without requiring Ray."""
cpu_count = int(os.cpu_count() or 0)
return Resources(cpu_count=cpu_count, gpu_count=_detect_local_gpu_count())
[docs]
class RequestedPlan(BaseModel):
"""Contains the requested Ray DAG plan for the batch ingest."""
model_config = ConfigDict(frozen=True)
# Embedder resources requested to satisfy DAG plan
embed_initial_actors: int
embed_min_actors: int
embed_max_actors: int
embed_gpus_per_actor: float
embed_batch_size: int
# Nemotron Parse resources requested to satisfy DAG plan
nemotron_parse_initial_actors: int
nemotron_parse_min_actors: int
nemotron_parse_max_actors: int
nemotron_parse_gpus_per_actor: float
nemotron_parse_batch_size: int
# OCR resources requested to satisfy DAG plan
ocr_initial_actors: int
ocr_min_actors: int
ocr_max_actors: int
ocr_gpus_per_actor: float
ocr_batch_size: int
# Page Elements resources requested to satisfy DAG plan
page_elements_initial_actors: int
page_elements_min_actors: int
page_elements_max_actors: int
page_elements_gpus_per_actor: float
page_elements_batch_size: int
# Table Structure resources requested to satisfy DAG plan
table_structure_initial_actors: int
table_structure_min_actors: int
table_structure_max_actors: int
table_structure_gpus_per_actor: float
table_structure_batch_size: int
# Graphic Elements resources requested to satisfy DAG plan
graphic_elements_initial_actors: int
graphic_elements_min_actors: int
graphic_elements_max_actors: int
graphic_elements_gpus_per_actor: float
graphic_elements_batch_size: int
# Caption resources requested to satisfy DAG plan
caption_gpus_per_actor: float
# PDF Extraction resources requested to satisfy DAG plan
pdf_extract_batch_size: int
pdf_extract_cpus_per_task: float
pdf_extract_tasks: int
[docs]
def get_embed_initial_actors(self) -> int:
return self.embed_initial_actors
[docs]
def get_embed_min_actors(self) -> int:
return self.embed_min_actors
[docs]
def get_embed_max_actors(self) -> int:
return self.embed_max_actors
[docs]
def get_embed_gpus_per_actor(self) -> float:
return self.embed_gpus_per_actor
[docs]
def get_embed_batch_size(self) -> int:
return self.embed_batch_size
[docs]
def get_nemotron_parse_initial_actors(self) -> int:
return self.nemotron_parse_initial_actors
[docs]
def get_nemotron_parse_min_actors(self) -> int:
return self.nemotron_parse_min_actors
[docs]
def get_nemotron_parse_max_actors(self) -> int:
return self.nemotron_parse_max_actors
[docs]
def get_nemotron_parse_gpus_per_actor(self) -> float:
return self.nemotron_parse_gpus_per_actor
[docs]
def get_nemotron_parse_batch_size(self) -> int:
return self.nemotron_parse_batch_size
[docs]
def get_ocr_initial_actors(self) -> int:
return self.ocr_initial_actors
[docs]
def get_ocr_min_actors(self) -> int:
return self.ocr_min_actors
[docs]
def get_ocr_max_actors(self) -> int:
return self.ocr_max_actors
[docs]
def get_ocr_gpus_per_actor(self) -> float:
return self.ocr_gpus_per_actor
[docs]
def get_ocr_batch_size(self) -> int:
return self.ocr_batch_size
[docs]
def get_page_elements_initial_actors(self) -> int:
return self.page_elements_initial_actors
[docs]
def get_page_elements_min_actors(self) -> int:
return self.page_elements_min_actors
[docs]
def get_page_elements_max_actors(self) -> int:
return self.page_elements_max_actors
[docs]
def get_page_elements_gpus_per_actor(self) -> float:
return self.page_elements_gpus_per_actor
[docs]
def get_page_elements_batch_size(self) -> int:
return self.page_elements_batch_size
[docs]
def get_table_structure_initial_actors(self) -> int:
return self.table_structure_initial_actors
[docs]
def get_table_structure_min_actors(self) -> int:
return self.table_structure_min_actors
[docs]
def get_table_structure_max_actors(self) -> int:
return self.table_structure_max_actors
[docs]
def get_table_structure_gpus_per_actor(self) -> float:
return self.table_structure_gpus_per_actor
[docs]
def get_table_structure_batch_size(self) -> int:
return self.table_structure_batch_size
[docs]
def get_graphic_elements_initial_actors(self) -> int:
return self.graphic_elements_initial_actors
[docs]
def get_graphic_elements_min_actors(self) -> int:
return self.graphic_elements_min_actors
[docs]
def get_graphic_elements_max_actors(self) -> int:
return self.graphic_elements_max_actors
[docs]
def get_graphic_elements_gpus_per_actor(self) -> float:
return self.graphic_elements_gpus_per_actor
[docs]
def get_graphic_elements_batch_size(self) -> int:
return self.graphic_elements_batch_size
def __str__(self) -> str:
return f"RequestedPlan(embed_initial_actors={self.embed_initial_actors}, embed_min_actors={self.embed_min_actors}, embed_max_actors={self.embed_max_actors}, embed_gpus_per_actor={self.embed_gpus_per_actor}, embed_batch_size={self.embed_batch_size}, nemotron_parse_initial_actors={self.nemotron_parse_initial_actors}, nemotron_parse_min_actors={self.nemotron_parse_min_actors}, nemotron_parse_max_actors={self.nemotron_parse_max_actors}, nemotron_parse_gpus_per_actor={self.nemotron_parse_gpus_per_actor}, nemotron_parse_batch_size={self.nemotron_parse_batch_size}, ocr_initial_actors={self.ocr_initial_actors}, ocr_min_actors={self.ocr_min_actors}, caption_gpus_per_actor={self.caption_gpus_per_actor}, ocr_max_actors={self.ocr_max_actors}, ocr_gpus_per_actor={self.ocr_gpus_per_actor}, ocr_batch_size={self.ocr_batch_size}, page_elements_initial_actors={self.page_elements_initial_actors}, page_elements_min_actors={self.page_elements_min_actors}, page_elements_max_actors={self.page_elements_max_actors}, page_elements_gpus_per_actor={self.page_elements_gpus_per_actor}, page_elements_batch_size={self.page_elements_batch_size}, table_structure_initial_actors={self.table_structure_initial_actors}, table_structure_min_actors={self.table_structure_min_actors}, table_structure_max_actors={self.table_structure_max_actors}, table_structure_gpus_per_actor={self.table_structure_gpus_per_actor}, table_structure_batch_size={self.table_structure_batch_size}, graphic_elements_initial_actors={self.graphic_elements_initial_actors}, graphic_elements_min_actors={self.graphic_elements_min_actors}, graphic_elements_max_actors={self.graphic_elements_max_actors}, graphic_elements_gpus_per_actor={self.graphic_elements_gpus_per_actor}, graphic_elements_batch_size={self.graphic_elements_batch_size}, pdf_extract_batch_size={self.pdf_extract_batch_size}, pdf_extract_cpus_per_task={self.pdf_extract_cpus_per_task}, pdf_extract_tasks={self.pdf_extract_tasks})" # noqa: E501
def __repr__(self) -> str:
return self.__str__()
def __hash__(self) -> int:
return hash(
(
self.embed_initial_actors,
self.embed_min_actors,
self.embed_max_actors,
self.embed_gpus_per_actor,
self.embed_batch_size,
self.nemotron_parse_initial_actors,
self.nemotron_parse_min_actors,
self.nemotron_parse_max_actors,
self.nemotron_parse_gpus_per_actor,
self.nemotron_parse_batch_size,
self.caption_gpus_per_actor,
self.ocr_initial_actors,
self.ocr_min_actors,
self.ocr_max_actors,
self.ocr_gpus_per_actor,
self.ocr_batch_size,
self.page_elements_initial_actors,
self.page_elements_min_actors,
self.page_elements_max_actors,
self.page_elements_gpus_per_actor,
self.page_elements_batch_size,
self.table_structure_initial_actors,
self.table_structure_min_actors,
self.table_structure_max_actors,
self.table_structure_gpus_per_actor,
self.table_structure_batch_size,
self.graphic_elements_initial_actors,
self.graphic_elements_min_actors,
self.graphic_elements_max_actors,
self.graphic_elements_gpus_per_actor,
self.graphic_elements_batch_size,
self.pdf_extract_batch_size,
self.pdf_extract_cpus_per_task,
self.pdf_extract_tasks,
)
)
def __eq__(self, other: object) -> bool:
if not isinstance(other, RequestedPlan):
return False
return (
self.embed_initial_actors == other.embed_initial_actors
and self.embed_min_actors == other.embed_min_actors
and self.embed_max_actors == other.embed_max_actors
and self.embed_gpus_per_actor == other.embed_gpus_per_actor
and self.embed_batch_size == other.embed_batch_size
and self.nemotron_parse_initial_actors == other.nemotron_parse_initial_actors
and self.nemotron_parse_min_actors == other.nemotron_parse_min_actors
and self.nemotron_parse_max_actors == other.nemotron_parse_max_actors
and self.nemotron_parse_gpus_per_actor == other.nemotron_parse_gpus_per_actor
and self.nemotron_parse_batch_size == other.nemotron_parse_batch_size
and self.caption_gpus_per_actor == other.caption_gpus_per_actor
and self.ocr_initial_actors == other.ocr_initial_actors
and self.ocr_min_actors == other.ocr_min_actors
and self.ocr_max_actors == other.ocr_max_actors
and self.ocr_gpus_per_actor == other.ocr_gpus_per_actor
and self.ocr_batch_size == other.ocr_batch_size
and self.page_elements_initial_actors == other.page_elements_initial_actors
and self.page_elements_min_actors == other.page_elements_min_actors
and self.page_elements_max_actors == other.page_elements_max_actors
and self.page_elements_gpus_per_actor == other.page_elements_gpus_per_actor
and self.page_elements_batch_size == other.page_elements_batch_size
and self.table_structure_initial_actors == other.table_structure_initial_actors
and self.table_structure_min_actors == other.table_structure_min_actors
and self.table_structure_max_actors == other.table_structure_max_actors
and self.table_structure_gpus_per_actor == other.table_structure_gpus_per_actor
and self.table_structure_batch_size == other.table_structure_batch_size
and self.graphic_elements_initial_actors == other.graphic_elements_initial_actors
and self.graphic_elements_min_actors == other.graphic_elements_min_actors
and self.graphic_elements_max_actors == other.graphic_elements_max_actors
and self.graphic_elements_gpus_per_actor == other.graphic_elements_gpus_per_actor
and self.graphic_elements_batch_size == other.graphic_elements_batch_size
and self.pdf_extract_batch_size == other.pdf_extract_batch_size
and self.pdf_extract_cpus_per_task == other.pdf_extract_cpus_per_task
and self.pdf_extract_tasks == other.pdf_extract_tasks
)
def __ne__(self, other: object) -> bool:
return not self.__eq__(other)
[docs]
def resolve_requested_plan(
*,
cluster_resources: ClusterResources,
override_embed_initial_actors: Optional[int] = None,
override_embed_min_actors: Optional[int] = None,
override_embed_max_actors: Optional[int] = None,
override_embed_gpus_per_actor: Optional[float] = None,
override_embed_batch_size: Optional[int] = None,
override_nemotron_parse_initial_actors: Optional[int] = None,
override_nemotron_parse_min_actors: Optional[int] = None,
override_nemotron_parse_max_actors: Optional[int] = None,
override_nemotron_parse_gpus_per_actor: Optional[float] = None,
override_nemotron_parse_batch_size: Optional[int] = None,
override_ocr_initial_actors: Optional[int] = None,
override_ocr_min_actors: Optional[int] = None,
override_ocr_max_actors: Optional[int] = None,
override_ocr_gpus_per_actor: Optional[float] = None,
override_ocr_batch_size: Optional[int] = None,
override_page_elements_initial_actors: Optional[int] = None,
override_page_elements_min_actors: Optional[int] = None,
override_page_elements_max_actors: Optional[int] = None,
override_page_elements_gpus_per_actor: Optional[float] = None,
override_page_elements_batch_size: Optional[int] = None,
override_table_structure_initial_actors: Optional[int] = None,
override_table_structure_min_actors: Optional[int] = None,
override_table_structure_max_actors: Optional[int] = None,
override_table_structure_gpus_per_actor: Optional[float] = None,
override_table_structure_batch_size: Optional[int] = None,
override_graphic_elements_initial_actors: Optional[int] = None,
override_graphic_elements_min_actors: Optional[int] = None,
override_graphic_elements_max_actors: Optional[int] = None,
override_graphic_elements_gpus_per_actor: Optional[float] = None,
override_graphic_elements_batch_size: Optional[int] = None,
override_pdf_extract_batch_size: Optional[int] = None,
override_pdf_extract_cpus_per_task: Optional[float] = None,
override_pdf_extract_tasks: Optional[int] = None,
allow_no_gpu: bool = False,
caption_enabled: bool = False,
override_caption_gpus_per_actor: Optional[float] = None,
) -> RequestedPlan:
available_gpu_count = max(0, int(cluster_resources.available_gpu_count()))
if available_gpu_count == 0 and not allow_no_gpu:
raise ValueError("No GPUs available")
def _resolve_int_actors(override: Optional[int], default: int, multiply_by_available_num_gpu: bool) -> int:
if override is not None and override > 0:
return int(override)
if available_gpu_count == 0:
return 1
if multiply_by_available_num_gpu:
return int(default * available_gpu_count)
return int(default)
def _resolve_float_actors(override: Optional[float], default: float, multiply_by_available_num_gpu: bool) -> float:
if override is not None and override > 0.0:
return float(override)
if available_gpu_count == 0:
return 0.0
if multiply_by_available_num_gpu:
return float(default * available_gpu_count)
return float(default)
def _resolve_int(override: Optional[int], default: int, multiply_by_available_num_gpu: bool) -> int:
if override is not None and override > 0:
return int(override)
if multiply_by_available_num_gpu:
return int(default * available_gpu_count)
return int(default)
def _resolve_float(override: Optional[float], default: float, multiply_by_available_num_gpu: bool) -> float:
if override is not None and override > 0.0:
return float(override)
if multiply_by_available_num_gpu:
return float(default * available_gpu_count)
return float(default)
embed_initial_actors = _resolve_int_actors(override_embed_initial_actors, EMBED_INITIAL_ACTORS, True)
embed_min_actors = _resolve_int_actors(override_embed_min_actors, EMBED_MIN_ACTORS, True)
embed_max_actors = _resolve_int_actors(override_embed_max_actors, EMBED_MAX_ACTORS, True)
embed_gpus_per_actor = _resolve_float_actors(override_embed_gpus_per_actor, EMBED_GPUS_PER_ACTOR, False)
embed_batch_size = _resolve_int(override_embed_batch_size, EMBED_BATCH_SIZE, False)
nemotron_parse_initial_actors = _resolve_int_actors(
override_nemotron_parse_initial_actors, NEMOTRON_PARSE_INITIAL_ACTORS, True
)
nemotron_parse_min_actors = _resolve_int_actors(override_nemotron_parse_min_actors, NEMOTRON_PARSE_MIN_ACTORS, True)
nemotron_parse_max_actors = _resolve_int_actors(override_nemotron_parse_max_actors, NEMOTRON_PARSE_MAX_ACTORS, True)
nemotron_parse_gpus_per_actor = _resolve_float_actors(
override_nemotron_parse_gpus_per_actor, VLLM_GPUS_PER_ACTOR, False
)
nemotron_parse_batch_size = _resolve_int(override_nemotron_parse_batch_size, NEMOTRON_PARSE_BATCH_SIZE, False)
ocr_initial_actors = _resolve_int_actors(override_ocr_initial_actors, OCR_INITIAL_ACTORS, True)
ocr_min_actors = _resolve_int_actors(override_ocr_min_actors, OCR_MIN_ACTORS, True)
ocr_max_actors = _resolve_int_actors(override_ocr_max_actors, OCR_MAX_ACTORS, True)
ocr_gpus_per_actor = _resolve_float_actors(override_ocr_gpus_per_actor, OCR_GPUS_PER_ACTOR, False)
ocr_batch_size = _resolve_int(override_ocr_batch_size, OCR_BATCH_SIZE, False)
page_elements_initial_actors = _resolve_int_actors(
override_page_elements_initial_actors, PAGE_ELEMENTS_INITIAL_ACTORS, True
)
page_elements_min_actors = _resolve_int_actors(override_page_elements_min_actors, PAGE_ELEMENTS_MIN_ACTORS, True)
page_elements_max_actors = _resolve_int_actors(override_page_elements_max_actors, PAGE_ELEMENTS_MAX_ACTORS, True)
page_elements_gpus_per_actor = _resolve_float_actors(
override_page_elements_gpus_per_actor, PAGE_ELEMENTS_GPUS_PER_ACTOR, False
)
page_elements_batch_size = _resolve_int(override_page_elements_batch_size, PAGE_ELEMENTS_BATCH_SIZE, False)
table_structure_initial_actors = _resolve_int_actors(
override_table_structure_initial_actors, TABLE_STRUCTURE_INITIAL_ACTORS, True
)
table_structure_min_actors = _resolve_int_actors(
override_table_structure_min_actors, TABLE_STRUCTURE_MIN_ACTORS, True
)
table_structure_max_actors = _resolve_int_actors(
override_table_structure_max_actors, TABLE_STRUCTURE_MAX_ACTORS, True
)
table_structure_gpus_per_actor = _resolve_float_actors(
override_table_structure_gpus_per_actor, TABLE_STRUCTURE_GPUS_PER_ACTOR, False
)
table_structure_batch_size = _resolve_int(override_table_structure_batch_size, TABLE_STRUCTURE_BATCH_SIZE, False)
graphic_elements_initial_actors = _resolve_int_actors(
override_graphic_elements_initial_actors, GRAPHIC_ELEMENTS_INITIAL_ACTORS, True
)
graphic_elements_min_actors = _resolve_int_actors(
override_graphic_elements_min_actors, GRAPHIC_ELEMENTS_MIN_ACTORS, True
)
graphic_elements_max_actors = _resolve_int_actors(
override_graphic_elements_max_actors, GRAPHIC_ELEMENTS_MAX_ACTORS, True
)
graphic_elements_gpus_per_actor = _resolve_float_actors(
override_graphic_elements_gpus_per_actor, GRAPHIC_ELEMENTS_GPUS_PER_ACTOR, False
)
graphic_elements_batch_size = _resolve_int(override_graphic_elements_batch_size, GRAPHIC_ELEMENTS_BATCH_SIZE, False)
pdf_extract_batch_size = _resolve_int(override_pdf_extract_batch_size, PDF_EXTRACT_BATCH_SIZE, False)
pdf_extract_cpus_per_task = _resolve_float(override_pdf_extract_cpus_per_task, PDF_EXTRACT_CPUS_PER_TASK, False)
pdf_extract_tasks = _resolve_int_actors(override_pdf_extract_tasks, PDF_EXTRACT_TASKS, True)
# Caption GPU budget. On a single GPU the caption actor (vLLM) must share
# with OCR / page-elements / embed, so we halve its reservation and drop
# embed to CPU-only. On 2+ GPUs caption gets a dedicated GPU.
if caption_enabled:
if override_caption_gpus_per_actor is not None:
caption_gpus_per_actor = override_caption_gpus_per_actor
elif available_gpu_count == 0:
caption_gpus_per_actor = 0.0
elif available_gpu_count <= 1:
caption_gpus_per_actor = 0.5
else:
caption_gpus_per_actor = VLLM_GPUS_PER_ACTOR
# On a single GPU, reduce embed from its default (0.5) so all actors
# fit within the GPU budget while still retaining CUDA access.
if override_embed_gpus_per_actor is None and available_gpu_count <= 1:
embed_gpus_per_actor = 0.1
else:
caption_gpus_per_actor = 0.0
return RequestedPlan(
embed_initial_actors=embed_initial_actors,
embed_min_actors=embed_min_actors,
embed_max_actors=embed_max_actors,
embed_gpus_per_actor=embed_gpus_per_actor,
embed_batch_size=embed_batch_size,
nemotron_parse_initial_actors=nemotron_parse_initial_actors,
nemotron_parse_min_actors=nemotron_parse_min_actors,
nemotron_parse_max_actors=nemotron_parse_max_actors,
nemotron_parse_gpus_per_actor=nemotron_parse_gpus_per_actor,
nemotron_parse_batch_size=nemotron_parse_batch_size,
caption_gpus_per_actor=caption_gpus_per_actor,
ocr_initial_actors=ocr_initial_actors,
ocr_min_actors=ocr_min_actors,
ocr_max_actors=ocr_max_actors,
ocr_gpus_per_actor=ocr_gpus_per_actor,
ocr_batch_size=ocr_batch_size,
page_elements_initial_actors=page_elements_initial_actors,
page_elements_min_actors=page_elements_min_actors,
page_elements_max_actors=page_elements_max_actors,
page_elements_gpus_per_actor=page_elements_gpus_per_actor,
page_elements_batch_size=page_elements_batch_size,
table_structure_initial_actors=table_structure_initial_actors,
table_structure_min_actors=table_structure_min_actors,
table_structure_max_actors=table_structure_max_actors,
table_structure_gpus_per_actor=table_structure_gpus_per_actor,
table_structure_batch_size=table_structure_batch_size,
graphic_elements_initial_actors=graphic_elements_initial_actors,
graphic_elements_min_actors=graphic_elements_min_actors,
graphic_elements_max_actors=graphic_elements_max_actors,
graphic_elements_gpus_per_actor=graphic_elements_gpus_per_actor,
graphic_elements_batch_size=graphic_elements_batch_size,
pdf_extract_batch_size=pdf_extract_batch_size,
pdf_extract_cpus_per_task=pdf_extract_cpus_per_task,
pdf_extract_tasks=pdf_extract_tasks,
)