nemoguardrails.guardrails.async_work_queue


Module Contents

Classes

Name | Description
AsyncWorkQueue | Async Work Queue with static concurrency and queue size
WorkItem | Dataclass with async function, args, kwargs, and a future to return result

Data

T

log

API

class nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue(
name: str,
max_queue_size: int,
max_concurrency: int,
reject_on_full: bool = False
)

Bases: Generic[T]

Async Work Queue with static concurrency and queue size

WORKER_ERROR_BACKOFF_SECONDS: float = 0.1
_busy_count = 0
_queue: Queue[WorkItem[T]] = asyncio.Queue(maxsize=max_queue_size)
_workers: List[Task] = []

nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue.__aenter__()
async

Async context manager entry. Intended for testing rather than for a long-lived instance.

nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue.__aexit__(
exc_type,
exc_val,
exc_tb
)
async

Async context manager exit. Intended for testing rather than for a long-lived instance.

nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue._worker_loop() -> None
async
nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue.is_busy() -> bool

Returns True if any worker is currently executing a task.

nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue.is_queue_empty() -> bool

Returns True if the queue has zero pending items.

nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue.is_queue_full() -> bool

Returns True if the queue is currently full.

nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue.num_busy_workers() -> int

Returns the number of workers currently executing a task.

nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue.num_pending() -> int

Returns the number of items buffered in the queue, waiting for a worker. Pairs with num_busy_workers(): together they cover the two states a WorkItem can be in.
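
The distinction between pending and busy items can be illustrated with a plain asyncio.Queue and a single worker. This is a minimal sketch, not the AsyncWorkQueue implementation: the names demo_states, busy_count, and release are hypothetical, and the way the counter is updated is an assumption about how such a queue might track state.

```python
import asyncio


async def demo_states() -> tuple:
    """Track 'pending' (buffered) vs 'busy' (executing) with one worker."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=4)
    busy_count = 0  # hypothetical counter, analogous to a busy-worker count
    release = asyncio.Event()

    async def worker() -> None:
        nonlocal busy_count
        while True:
            job = await queue.get()
            busy_count += 1  # item moved from pending to busy
            try:
                await job()
            finally:
                busy_count -= 1
                queue.task_done()

    async def job() -> None:
        await release.wait()  # hold the worker until released

    worker_task = asyncio.create_task(worker())
    for _ in range(3):
        queue.put_nowait(job)
    await asyncio.sleep(0)  # let the worker pick up the first job

    snapshot = (queue.qsize(), busy_count)  # (pending, busy)

    release.set()
    await queue.join()  # wait until all three jobs have finished
    final = (queue.qsize(), busy_count)
    worker_task.cancel()
    return snapshot, final


snapshot, final = asyncio.run(demo_states())
```

With three jobs submitted and one worker, the snapshot shows one item executing and two still buffered; once everything drains, both counts return to zero.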

nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue.start() -> None
async

Starts the worker pool. Call this during service startup.

nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue.stop(
wait_for_completion: bool = True
) -> None
async

Stops the worker pool. Call this during service shutdown.
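
A start/stop lifecycle of this kind could look roughly like the sketch below. SketchPool and its put method are hypothetical stand-ins written with asyncio only; in particular, draining the queue with join() when wait_for_completion is True is an assumption, since the reference above does not describe what the real stop() does internally.

```python
import asyncio
from typing import List


class SketchPool:
    """Hypothetical worker pool; not the nemoguardrails implementation."""

    def __init__(self, max_queue_size: int, max_concurrency: int) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
        self._max_concurrency = max_concurrency
        self._workers: List[asyncio.Task] = []
        self.completed: List[int] = []

    async def _worker_loop(self) -> None:
        while True:
            value = await self._queue.get()
            try:
                await asyncio.sleep(0)  # stand-in for real async work
                self.completed.append(value)
            finally:
                self._queue.task_done()

    async def start(self) -> None:
        if self._workers:
            return  # already started
        self._workers = [
            asyncio.create_task(self._worker_loop())
            for _ in range(self._max_concurrency)
        ]

    async def stop(self, wait_for_completion: bool = True) -> None:
        if wait_for_completion:
            await self._queue.join()  # assumed: drain queued work first
        for task in self._workers:
            task.cancel()
        # Collect the cancellations so no task is left dangling.
        await asyncio.gather(*self._workers, return_exceptions=True)
        self._workers = []

    async def put(self, value: int) -> None:
        await self._queue.put(value)


async def main() -> List[int]:
    pool = SketchPool(max_queue_size=8, max_concurrency=2)
    await pool.start()  # e.g. during service startup
    for n in range(5):
        await pool.put(n)
    await pool.stop(wait_for_completion=True)  # e.g. during shutdown
    return sorted(pool.completed)


done = asyncio.run(main())
```

The fixed number of worker tasks created in start() is what keeps concurrency static: no more than max_concurrency items run at once, regardless of how many are queued.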

nemoguardrails.guardrails.async_work_queue.AsyncWorkQueue.submit(
func: typing.Callable[..., typing.Awaitable[nemoguardrails.guardrails.async_work_queue.T]],
args: typing.Any = (),
kwargs: typing.Any = {}
) -> nemoguardrails.guardrails.async_work_queue.T
async

Submit a task. If the queue is full:

  • self._reject_on_full=True -> Raises asyncio.QueueFull
  • self._reject_on_full=False -> Blocks caller until slot opens

Note: Automatically starts the queue on first submission (lazy initialization).
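
The two full-queue behaviours correspond to the two ways of putting an item on a standard asyncio.Queue. The sketch below uses asyncio.Queue directly rather than AsyncWorkQueue, so it only illustrates the semantics; demo_full_queue and the outcome keys are hypothetical names.

```python
import asyncio


async def demo_full_queue() -> dict:
    """Show reject-style and block-style behaviour on a full asyncio.Queue."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    queue.put_nowait("first")  # fills the single slot

    outcome = {}

    # Reject-style: put_nowait raises asyncio.QueueFull immediately.
    try:
        queue.put_nowait("second")
        outcome["rejected"] = False
    except asyncio.QueueFull:
        outcome["rejected"] = True

    # Block-style: put() waits until a consumer frees a slot.
    blocked_put = asyncio.create_task(queue.put("second"))
    await asyncio.sleep(0)  # let the put start waiting
    outcome["blocked_while_full"] = not blocked_put.done()

    queue.get_nowait()  # a consumer takes "first", opening a slot
    await blocked_put
    outcome["completed_after_slot_opened"] = queue.qsize() == 1
    return outcome


result = asyncio.run(demo_full_queue())
```

Rejecting gives callers immediate backpressure they can turn into an error response, while blocking trades latency for eventual acceptance; which one suits a service depends on how callers handle overload.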

class nemoguardrails.guardrails.async_work_queue.WorkItem(
func: typing.Callable[..., typing.Awaitable[nemoguardrails.guardrails.async_work_queue.T]],
args: typing.Tuple[typing.Any, ...],
kwargs: typing.Dict[str, typing.Any],
future: asyncio.Future[nemoguardrails.guardrails.async_work_queue.T]
)
Dataclass

Bases: Generic[T]

Dataclass with async function, args, kwargs, and a future to return result

args: Tuple[Any, ...]
func: Callable[..., Awaitable[T]]
future: Future[T]
kwargs: Dict[str, Any]

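
To show how these four fields fit together, here is a minimal sketch of a work item being executed and its future settled. SketchWorkItem, run_item, and add are hypothetical names, and the worker step shown is an assumption about how a future-carrying item is typically resolved, not the library's _worker_loop.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, Generic, Tuple, TypeVar

T = TypeVar("T")


@dataclass
class SketchWorkItem(Generic[T]):
    """Hypothetical stand-in mirroring the documented WorkItem fields."""

    func: Callable[..., Awaitable[T]]
    args: Tuple[Any, ...]
    kwargs: Dict[str, Any]
    future: "asyncio.Future[T]"


async def run_item(item: SketchWorkItem) -> None:
    """Assumed worker step: run the call, then settle the future."""
    try:
        result = await item.func(*item.args, **item.kwargs)
    except Exception as exc:
        if not item.future.done():
            item.future.set_exception(exc)  # caller sees the error
    else:
        if not item.future.done():
            item.future.set_result(result)  # caller receives the value


async def add(a: int, b: int = 0) -> int:
    return a + b


async def main() -> int:
    loop = asyncio.get_running_loop()
    item = SketchWorkItem(
        func=add, args=(2,), kwargs={"b": 3}, future=loop.create_future()
    )
    await run_item(item)
    return await item.future  # the submitter awaits this for the result


total = asyncio.run(main())
```

The future is what lets the submitting coroutine wait for a result that is produced later by a different task.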
nemoguardrails.guardrails.async_work_queue.T = TypeVar('T')
nemoguardrails.guardrails.async_work_queue.log = logging.getLogger(__name__)