nemoguardrails.streaming

Module Contents

Classes

Name | Description
StreamingHandler | Provider-agnostic streaming handler with prefix/suffix/stop handling.

Data

END_OF_STREAM

log

API

class nemoguardrails.streaming.StreamingHandler(
enable_print: bool = False,
enable_buffer: bool = False,
include_metadata: typing.Optional[bool] = False,
include_generation_metadata: typing.Optional[bool] = None
)

Bases: AsyncIterator

Provider-agnostic streaming handler with prefix/suffix/stop handling.

Implements the AsyncIterator interface, so it can be used directly to stream the response back. Chunks are pushed via push_chunk() and consumed via async iteration.
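
The push/consume pattern described above can be sketched with a plain asyncio.Queue. This is a minimal illustration, not the library's implementation: MiniStreamHandler, _END, and demo are hypothetical names, and the sketch leaves out prefix/suffix/stop handling, buffering, and metadata.

```python
import asyncio
from typing import AsyncIterator, List, Optional

# Hypothetical end-of-stream marker, compared by identity.
_END = object()


class MiniStreamHandler(AsyncIterator[str]):
    """Toy handler: chunks go in via push_chunk() and come out via `async for`."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    def __aiter__(self) -> "MiniStreamHandler":
        return self

    async def __anext__(self) -> str:
        item = await self.queue.get()
        if item is _END:
            # The sentinel ends the `async for` loop on the consumer side.
            raise StopAsyncIteration
        return item

    async def push_chunk(self, chunk: Optional[str]) -> None:
        # In this sketch, None marks the end of the stream.
        await self.queue.put(_END if chunk is None else chunk)


async def demo() -> List[str]:
    handler = MiniStreamHandler()

    async def producer() -> None:
        for piece in ["Hello", ", ", "world"]:
            await handler.push_chunk(piece)
        await handler.push_chunk(None)

    task = asyncio.create_task(producer())
    received = [chunk async for chunk in handler]
    await task
    return received
```

Calling asyncio.run(demo()) collects the chunks in the order they were pushed. The queue decouples the producer (for example, an LLM callback) from the consumer (for example, an HTTP response writer).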

_stop = []
buffer = ''
completion = ''
current_chunk = ''
current_metadata = {}
k = 0
queue = asyncio.Queue()
stop: List[str]
streaming_finished_event = asyncio.Event()
top_k_nonempty_lines_event = asyncio.Event()
uid = new_uuid()

nemoguardrails.streaming.StreamingHandler.__aiter__()

nemoguardrails.streaming.StreamingHandler.__anext__()
async

nemoguardrails.streaming.StreamingHandler._process(
chunk: typing.Union[str, object],
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None
)
async

Process a chunk of text.

If we’re in buffering mode, record the text. Otherwise, update the full completion, check for stop tokens, and enqueue the chunk.
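
The branching described above can be sketched as a pure function over explicit state. This is an assumption-laden illustration rather than the actual _process body: process_chunk and its tuple return value are hypothetical. When a stop token appears, the sketch truncates the completion at the token and drops the chunk that completed it; the real handler may treat that partial chunk differently.

```python
from typing import List, Optional, Tuple


def process_chunk(
    chunk: str,
    completion: str,
    buffer: str,
    buffering: bool,
    stop: List[str],
) -> Tuple[str, str, Optional[str], bool]:
    """Return (completion, buffer, chunk_to_emit, finished)."""
    if buffering:
        # Buffering mode: only record the text, emit nothing.
        return completion, buffer + chunk, None, False

    completion += chunk
    for token in stop:
        if token and token in completion:
            # Truncate the completion at the stop token and end the stream.
            completion = completion[: completion.index(token)]
            return completion, buffer, None, True

    # No stop token seen: the chunk would be enqueued for the consumer.
    return completion, buffer, chunk, False
```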

nemoguardrails.streaming.StreamingHandler.disable_buffering()
async

Disables buffering and processes the buffered text as a single chunk.

nemoguardrails.streaming.StreamingHandler.enable_buffering()
async

nemoguardrails.streaming.StreamingHandler.finish()
async

Signal end of stream.

nemoguardrails.streaming.StreamingHandler.push_chunk(
chunk: typing.Union[str, None],
metadata: typing.Optional[typing.Dict[str, typing.Any]] = None
)
async

Push a new string chunk to the stream.

Parameters:

chunk
Union[str, None]

String chunk to push, None to signal end of stream, or END_OF_STREAM sentinel.

metadata
Optional[Dict[str, Any]], defaults to None

Optional metadata about the generation.

nemoguardrails.streaming.StreamingHandler.set_pattern(
prefix: typing.Optional[str] = None,
suffix: typing.Optional[str] = None
)

Sets the pattern that is expected.

If a prefix or suffix is specified, it is removed from the output.
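
As a rough illustration of the effect, the sketch below strips an expected prefix and suffix from a complete string. strip_pattern is a hypothetical helper, not part of the library. In a real stream the prefix or suffix may be split across chunks, which this simplified version does not handle.

```python
from typing import Optional


def strip_pattern(
    text: str,
    prefix: Optional[str] = None,
    suffix: Optional[str] = None,
) -> str:
    """Remove the expected prefix and suffix from text when they are present."""
    if prefix and text.startswith(prefix):
        text = text[len(prefix):]
    if suffix and text.endswith(suffix):
        text = text[: -len(suffix)]
    return text
```

For example, with prefix 'Bot message: "' and suffix '"', the text 'Bot message: "Hello!"' reduces to 'Hello!'.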

nemoguardrails.streaming.StreamingHandler.wait()
async

Waits until the stream finishes and returns the full completion.
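
One way to picture this is an asyncio.Event that is set when the stream ends, with the waiter returning the text accumulated so far. The sketch below assumes that design; CompletionTracker and demo_wait are hypothetical names and do not mirror the handler's internals.

```python
import asyncio


class CompletionTracker:
    """Toy tracker: accumulate chunks, then release waiters on finish()."""

    def __init__(self) -> None:
        self.completion = ""
        self.finished = asyncio.Event()

    def add(self, chunk: str) -> None:
        self.completion += chunk

    def finish(self) -> None:
        self.finished.set()

    async def wait(self) -> str:
        # Block until finish() is called, then return everything received.
        await self.finished.wait()
        return self.completion


async def demo_wait() -> str:
    tracker = CompletionTracker()

    async def producer() -> None:
        for piece in ["Hello", " ", "world"]:
            tracker.add(piece)
            await asyncio.sleep(0)  # yield control between chunks
        tracker.finish()

    task = asyncio.create_task(producer())
    result = await tracker.wait()
    await task
    return result
```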

nemoguardrails.streaming.StreamingHandler.wait_top_k_nonempty_lines(
k: int
)
async

Waits for the top k non-empty lines from the LLM.

Once k lines have been received (and line k+1 has started), it returns them and removes them from the buffer.
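
The buffer manipulation can be sketched as a pure function that splits a buffer into the first k non-empty lines and the remainder. take_top_k_nonempty_lines is a hypothetical helper; the waiting on top_k_nonempty_lines_event is left out, and the real method may filter lines differently.

```python
from typing import List, Tuple


def take_top_k_nonempty_lines(buffer: str, k: int) -> Tuple[str, str]:
    """Return (first k non-empty lines joined by newlines, remaining buffer)."""
    lines = buffer.split("\n")
    taken: List[str] = []
    consumed = 0  # number of leading lines removed from the buffer
    for index, line in enumerate(lines):
        if len(taken) == k:
            break
        consumed = index + 1
        if line.strip():
            taken.append(line)
    return "\n".join(taken), "\n".join(lines[consumed:])
```

With the buffer "first\n\nsecond\nthird par" and k = 2, the function returns "first\nsecond" and leaves "third par" as the new buffer.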

nemoguardrails.streaming.END_OF_STREAM = object()
nemoguardrails.streaming.log = logging.getLogger(__name__)