nemoguardrails.guardrails.iorails

Optimized IORails Engine for specific guardrail configurations.

This module provides an optimized inference path for guardrail configurations that only use specific supported flows (input/output content safety). For configurations outside this supported set, the standard LLMRails engine should be used instead.

Module Contents

Classes

Name: Description
IORails: Workflow engine for accelerated Input/Output rails inference.

Functions

Name: Description
_build_assistant_message: Build the assistant message returned by generate.
_serialize_tool_calls: Serialize ToolCall objects to OpenAI /chat/completions shape.

Data

NONSTREAM_MAX_CONCURRENCY

NONSTREAM_QUEUE_DEPTH

REFUSAL_MESSAGE

STREAM_MAX_CONCURRENCY

_GENERATION_ERROR_TYPE

log

API

class nemoguardrails.guardrails.iorails.IORails(
config: nemoguardrails.rails.llm.config.RailsConfig,
_report_usage: bool = True
)

Bases: BaseGuardrails

Workflow engine for accelerated Input/Output rails inference.

SUPPORTED_INPUT_FLOWS
SUPPORTED_OUTPUT_FLOWS = frozenset({'content safety check output'})
SUPPORTED_RAILS = frozenset({'input', 'output', 'config'})
_content_capture_enabled
_generate_async_queue
_has_streaming_output_rails
bool

True when output rails are configured and streaming is enabled for them.

_metrics_enabled = are_metrics_enabled(config.metrics)
_speculative_generation = config.rails.input.speculative_generation or False
_stream_semaphore = asyncio.Semaphore(STREAM_MAX_CONCURRENCY)
_tracer = get_tracer() if self._tracing_enabled else None
_tracing_enabled = is_tracing_enabled(config.tracing)
engine_registry
rails_manager
nemoguardrails.guardrails.iorails.IORails.__aenter__()
async

Enter the async context manager. Intended for tests rather than for a long-lived instance.

nemoguardrails.guardrails.iorails.IORails.__aexit__(
exc_type,
exc_val,
exc_tb
)
async

Exit the async context manager. Intended for tests rather than for a long-lived instance.

nemoguardrails.guardrails.iorails.IORails._do_generate(
messages: nemoguardrails.guardrails.guardrails_types.LLMMessages,
req_id: str,
request_span: typing.Optional[opentelemetry.trace.Span] = None,
kwargs = {}
) -> nemoguardrails.guardrails.guardrails_types.LLMMessage
async

Core pipeline: input rails -> LLM call -> output rails.
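
The ordering of the pipeline can be illustrated with a minimal sketch. This is not the IORails implementation: check_input, call_llm, and check_output are hypothetical stand-ins, and returning REFUSAL_MESSAGE when a rail blocks is an assumption based on the module-level constant.

```python
import asyncio

REFUSAL_MESSAGE = "I'm sorry, I can't respond to that."


async def check_input(messages: list) -> bool:
    # Hypothetical input rail: block one keyword.
    return "forbidden" not in messages[-1]["content"]


async def call_llm(messages: list) -> str:
    # Hypothetical LLM: echo the last user message.
    return "echo: " + messages[-1]["content"]


async def check_output(text: str) -> bool:
    # Hypothetical output rail: block one keyword.
    return "secret" not in text


async def pipeline(messages: list) -> dict:
    """Input rails, then the LLM call, then output rails."""
    if not await check_input(messages):
        return {"role": "assistant", "content": REFUSAL_MESSAGE}
    text = await call_llm(messages)
    if not await check_output(text):
        return {"role": "assistant", "content": REFUSAL_MESSAGE}
    return {"role": "assistant", "content": text}
```

Calling asyncio.run(pipeline([{"role": "user", "content": "hello"}])) exercises the happy path; the LLM stand-in is never called when the input check fails.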

nemoguardrails.guardrails.iorails.IORails._do_generate_sequential(
messages: nemoguardrails.guardrails.guardrails_types.LLMMessages,
req_id: str,
llm_kwargs: dict
) -> typing.Optional[nemoguardrails.types.LLMResponse]
async

Sequential path: input rails block before LLM generation starts.

nemoguardrails.guardrails.iorails.IORails._do_generate_speculative(
messages: nemoguardrails.guardrails.guardrails_types.LLMMessages,
req_id: str,
llm_kwargs: dict,
request_span: typing.Optional[opentelemetry.trace.Span] = None
) -> typing.Optional[nemoguardrails.types.LLMResponse]
async

Speculative path: input rails and LLM generation race concurrently.

nemoguardrails.guardrails.iorails.IORails._parallel_input_rail_and_response_generation(
rails_task: asyncio.Task,
gen_task: asyncio.Task,
req_id: str,
request_span: typing.Optional[opentelemetry.trace.Span] = None
) -> typing.Optional[nemoguardrails.types.LLMResponse]
async

Race input rails against LLM generation, return LLMResponse or None (rejected).
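
A simplified sketch of the racing idea, assuming the rails task resolves to a boolean verdict and the generation task resolves to the response. The race and demo helpers are hypothetical, and the real method also takes req_id and request_span, which are omitted here.

```python
import asyncio
from typing import Optional


async def race(rails_task: asyncio.Task, gen_task: asyncio.Task) -> Optional[str]:
    """Return the generated text, or None if the input rails reject."""
    # Both tasks are already running; generation proceeds while we wait here.
    safe = await rails_task
    if not safe:
        # Rejected: stop paying for a response that will be discarded.
        gen_task.cancel()
        await asyncio.gather(gen_task, return_exceptions=True)
        return None
    return await gen_task


async def demo(safe: bool) -> Optional[str]:
    async def rails() -> bool:
        await asyncio.sleep(0.01)
        return safe

    async def generate() -> str:
        await asyncio.sleep(0.02)
        return "answer"

    return await race(asyncio.create_task(rails()), asyncio.create_task(generate()))
```

Compared with the sequential path, the latency of a safe request is roughly the slower of the two tasks rather than their sum, at the cost of wasted generation work when the input is rejected.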

nemoguardrails.guardrails.iorails.IORails._run_generate(
messages: nemoguardrails.guardrails.guardrails_types.LLMMessages,
kwargs = {}
) -> nemoguardrails.guardrails.guardrails_types.LLMMessage
async

Runs inside a queue worker task. Wraps the pipeline in traced_request so each request gets its own span and request ID, then delegates to _do_generate for the actual input rails → LLM → output rails flow. Metrics are emitted at the outer lifecycle scope by generate_async, not here.

nemoguardrails.guardrails.iorails.IORails._run_output_rails_in_streaming(
streaming_handler: collections.abc.AsyncIterator[typing.Union[str, dict]],
messages: nemoguardrails.guardrails.guardrails_types.LLMMessages
) -> collections.abc.AsyncGenerator[typing.Union[str, dict], None]
async

Buffer streamed chunks and run output rails on each batch.

Uses the same RollingBuffer and stream_first semantics as LLMRails:

  • stream_first=True: yield chunks immediately, then run output rails. If unsafe, inject an error and stop.
  • stream_first=False: run output rails first, only yield chunks if safe.
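
The two modes above can be sketched with fixed-size batches. This is a simplification, not the RollingBuffer implementation: guarded_stream, collect, ERROR_CHUNK, and the batch_size parameter are hypothetical names introduced for illustration.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List

ERROR_CHUNK = "[blocked by output rails]"  # hypothetical error marker


async def guarded_stream(
    chunks: AsyncIterator[str],
    is_safe: Callable[[str], Awaitable[bool]],
    batch_size: int = 3,
    stream_first: bool = True,
) -> AsyncIterator[str]:
    batch: List[str] = []
    async for chunk in chunks:
        batch.append(chunk)
        if stream_first:
            yield chunk  # emitted before the batch has been checked
        if len(batch) < batch_size:
            continue
        if not await is_safe("".join(batch)):
            yield ERROR_CHUNK  # inject an error and stop
            return
        if not stream_first:
            for held in batch:
                yield held  # released only after the check passed
        batch.clear()
    # Check whatever is left when the stream ends mid-batch.
    if batch:
        if not await is_safe("".join(batch)):
            yield ERROR_CHUNK
            return
        if not stream_first:
            for held in batch:
                yield held


async def collect(parts: List[str], stream_first: bool) -> List[str]:
    async def source() -> AsyncIterator[str]:
        for part in parts:
            yield part

    async def is_safe(text: str) -> bool:
        return "bad" not in text

    stream = guarded_stream(source(), is_safe, batch_size=2, stream_first=stream_first)
    return [chunk async for chunk in stream]
```

With stream_first=True an unsafe chunk can reach the client before the error marker does, which is the trade-off for lower time to first token.
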
nemoguardrails.guardrails.iorails.IORails._validate_streaming_with_output_rails() -> None

Raise if output rails exist but streaming is not enabled for them.

nemoguardrails.guardrails.iorails.IORails.can_handle(
config: nemoguardrails.rails.llm.config.RailsConfig,
llm: typing.Optional[nemoguardrails.types.LLMModel] = None
) -> bool
classmethod

Return True iff IORails can handle the given config and llm argument.

nemoguardrails.guardrails.iorails.IORails.generate(
messages: nemoguardrails.guardrails.guardrails_types.LLMMessages,
kwargs = {}
) -> nemoguardrails.guardrails.guardrails_types.LLMMessage

Synchronous version of generate_async.

Telemetry is disabled for the ephemeral IORails object used for the generate() call. For production use, use the asynchronous generate_async() and stream_async() methods for non-streaming and streaming requests respectively.

nemoguardrails.guardrails.iorails.IORails.generate_async(
messages: nemoguardrails.guardrails.guardrails_types.LLMMessages,
kwargs = {}
) -> nemoguardrails.guardrails.guardrails_types.LLMMessage
async

Public entry: submit the request to the internal work queue.

The queue enforces non-streaming concurrency limits: NONSTREAM_MAX_CONCURRENCY workers drain up to NONSTREAM_QUEUE_DEPTH pending items. When the admission buffer is full, the caller receives asyncio.QueueFull and, if metrics are enabled, guardrails.nonstream.rejections is incremented.

Request-level metrics (guardrails.requests, guardrails.request.duration, guardrails.requests.errors) wrap the queue submission, so duration includes queue-wait time (following the OTEL HTTP semantic conventions). A QueueFull rejection is counted in both requests.errors{error.type=QueueFull} and nonstream.rejections; the dual reporting is intentional.
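
The admission behavior can be sketched with a bounded asyncio.Queue and a fixed pool of workers. BoundedWorkQueue, its rejections counter, and demo are hypothetical names; this is an illustration of the pattern under those assumptions, not the internal queue class.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class BoundedWorkQueue:
    def __init__(self, max_concurrency: int, queue_depth: int) -> None:
        self._max_concurrency = max_concurrency
        self._queue_depth = queue_depth
        self._queue: Optional[asyncio.Queue] = None
        self._workers: List[asyncio.Task] = []
        self.rejections = 0  # stand-in for a rejections metric

    async def start(self) -> None:
        self._queue = asyncio.Queue(maxsize=self._queue_depth)
        self._workers = [
            asyncio.create_task(self._worker()) for _ in range(self._max_concurrency)
        ]

    async def stop(self) -> None:
        for worker in self._workers:
            worker.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)
        self._workers = []

    async def _worker(self) -> None:
        while True:
            job, future = await self._queue.get()
            try:
                result = await job()
            except Exception as exc:
                if not future.done():
                    future.set_exception(exc)
            else:
                if not future.done():
                    future.set_result(result)

    async def submit(self, job: Callable[[], Awaitable[str]]) -> str:
        future = asyncio.get_running_loop().create_future()
        try:
            # put_nowait never waits: a full queue rejects the request at once.
            self._queue.put_nowait((job, future))
        except asyncio.QueueFull:
            self.rejections += 1
            raise
        return await future


async def demo():
    queue = BoundedWorkQueue(max_concurrency=1, queue_depth=1)
    await queue.start()
    release = asyncio.Event()

    async def job() -> str:
        await release.wait()
        return "done"

    first = asyncio.create_task(queue.submit(job))   # picked up by the only worker
    await asyncio.sleep(0.01)
    second = asyncio.create_task(queue.submit(job))  # waits in the queue
    await asyncio.sleep(0.01)
    try:
        await queue.submit(job)                      # no room left
        shed = False
    except asyncio.QueueFull:
        shed = True
    release.set()
    results = await asyncio.gather(first, second)
    await queue.stop()
    return shed, results, queue.rejections
```

A caller in a web service would typically translate asyncio.QueueFull into an overload response rather than retrying immediately.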

nemoguardrails.guardrails.iorails.IORails.start() -> None
async

Start the IORails engine. Call this during service startup.

nemoguardrails.guardrails.iorails.IORails.stop() -> None
async

Stop the IORails engine. Call this during service shutdown.

nemoguardrails.guardrails.iorails.IORails.stream_async(
messages: nemoguardrails.guardrails.guardrails_types.LLMMessages,
options: typing.Optional[typing.Union[dict, nemoguardrails.rails.llm.options.GenerationOptions]] = None,
include_metadata: typing.Optional[bool] = False
) -> collections.abc.AsyncIterator[typing.Union[str, dict]]

Stream LLM response tokens with input/output rails applied.

Returns an async iterator that yields string chunks (or dicts when include_metadata=True). Input rails run before any tokens are streamed. If output rails are configured and streaming is enabled, tokens are buffered and checked using the same RollingBuffer / stream_first semantics as LLMRails.

Parameters:

messages
LLMMessages

Conversation messages in OpenAI format.

options
Optional[Union[dict, GenerationOptions]], defaults to None

Optional GenerationOptions (llm_params are forwarded to the main LLM call).

include_metadata
Optional[bool], defaults to False

When True, chunks are dicts with text and metadata keys instead of plain strings.

Returns: AsyncIterator[Union[str, dict]]

An async iterator of string chunks (or dicts).

Raises:

  • StreamingNotSupportedError: If output rails are present but rails.output.streaming.enabled is False.
  • ValueError: If include_metadata=True with output rails streaming enabled (BufferStrategy requires plain string chunks).
  • asyncio.QueueFull: If the streaming concurrency limit is reached (load shedding).
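
The load-shedding case can be sketched with a semaphore that is never waited on. How IORails uses _stream_semaphore internally is not shown in this reference, so the locked() check below is an assumption; shed_or_stream and demo are hypothetical names.

```python
import asyncio
from typing import AsyncIterator


async def shed_or_stream(
    semaphore: asyncio.Semaphore, source: AsyncIterator[str]
) -> AsyncIterator[str]:
    # Reject immediately instead of queueing behind other streams.
    if semaphore.locked():
        raise asyncio.QueueFull("streaming concurrency limit reached")
    async with semaphore:  # held for the lifetime of the stream
        async for chunk in source:
            yield chunk


async def demo():
    semaphore = asyncio.Semaphore(1)

    async def tokens() -> AsyncIterator[str]:
        for token in ("a", "b"):
            yield token

    first = shed_or_stream(semaphore, tokens())
    got = [await first.__anext__()]  # the first stream now holds the only slot
    try:
        async for _ in shed_or_stream(semaphore, tokens()):
            pass
        rejected = False
    except asyncio.QueueFull:
        rejected = True
    got += [chunk async for chunk in first]
    return got, rejected, semaphore.locked()
```

Because the wrapper is an async generator, the exception surfaces on the first iteration rather than when the iterator is created.
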
nemoguardrails.guardrails.iorails.IORails.unsupported_reason(
config: nemoguardrails.rails.llm.config.RailsConfig,
llm: typing.Optional[nemoguardrails.types.LLMModel] = None
) -> typing.Optional[str]
classmethod

Return None if IORails can handle (config, llm), else a human-readable reason.
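
The two classmethods read as a pair: one explains, the other answers yes or no. A minimal sketch of that relationship, assuming can_handle is equivalent to unsupported_reason returning None and checking output flow names only; the simplified signatures and the exact-match comparison are assumptions, not the library's logic.

```python
from typing import Optional, Sequence

SUPPORTED_OUTPUT_FLOWS = frozenset({"content safety check output"})


def unsupported_reason(output_flows: Sequence[str]) -> Optional[str]:
    """Return None when every flow is supported, else a readable reason."""
    unknown = sorted(set(output_flows) - SUPPORTED_OUTPUT_FLOWS)
    if unknown:
        return "unsupported output flows: " + ", ".join(unknown)
    return None


def can_handle(output_flows: Sequence[str]) -> bool:
    # Assumed relationship: "can handle" means "no reason to refuse".
    return unsupported_reason(output_flows) is None
```

A caller could log the reason string before falling back to the standard LLMRails engine.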

nemoguardrails.guardrails.iorails._build_assistant_message(
content: str,
tool_calls: typing.Optional[list[nemoguardrails.types.ToolCall]]
) -> nemoguardrails.guardrails.guardrails_types.LLMMessage

Build the assistant message returned by generate.

Without tool calls this is the existing {"role", "content"} shape. With tool calls present, the calls are serialized to OpenAI shape and content is set to None when empty, matching the OpenAI assistant-message contract.
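
A sketch of the two message shapes described above. It takes already-serialized tool calls as plain dicts, which differs from the real signature (a list of ToolCall objects); the tool_calls key name follows the OpenAI assistant-message format.

```python
from typing import Any, Dict, List, Optional


def build_assistant_message(
    content: str, tool_calls: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
    if not tool_calls:
        # Plain reply: only role and content.
        return {"role": "assistant", "content": content}
    # Tool-call reply: empty content becomes None.
    return {"role": "assistant", "content": content or None, "tool_calls": tool_calls}
```

For example, build_assistant_message("", [call]) yields a message whose content is None, while build_assistant_message("hi") keeps the two-key shape.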

nemoguardrails.guardrails.iorails._serialize_tool_calls(
tool_calls: list[nemoguardrails.types.ToolCall]
) -> list[dict]

Serialize ToolCall objects to OpenAI /chat/completions shape.

function.arguments is emitted as a JSON string (OpenAI-native) rather than the canonical dict carried internally, so the output round-trips through OpenAI-compatible clients.
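
A sketch of the serialization, using a hypothetical ToolCall dataclass as a stand-in for nemoguardrails.types.ToolCall (whose actual fields are not listed in this reference). The output layout follows the OpenAI /chat/completions tool_calls format.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ToolCall:
    # Hypothetical stand-in; field names are assumptions.
    id: str
    name: str
    arguments: Dict[str, Any] = field(default_factory=dict)


def serialize_tool_calls(tool_calls: List[ToolCall]) -> List[Dict[str, Any]]:
    return [
        {
            "id": call.id,
            "type": "function",
            "function": {
                "name": call.name,
                # OpenAI clients expect a JSON string here, not a dict.
                "arguments": json.dumps(call.arguments),
            },
        }
        for call in tool_calls
    ]
```

A client recovers the original dict with json.loads(item["function"]["arguments"]).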

nemoguardrails.guardrails.iorails.NONSTREAM_MAX_CONCURRENCY = 256
nemoguardrails.guardrails.iorails.NONSTREAM_QUEUE_DEPTH = 256
nemoguardrails.guardrails.iorails.REFUSAL_MESSAGE = "I'm sorry, I can't respond to that."
nemoguardrails.guardrails.iorails.STREAM_MAX_CONCURRENCY = 256
nemoguardrails.guardrails.iorails._GENERATION_ERROR_TYPE = 'generation_error'
nemoguardrails.guardrails.iorails.log = logging.getLogger(__name__)