JailedStream Implementation#
Overview#
The JailedStream is a standalone implementation for handling “jail” detection in token streams. It provides a clean, builder-based API for accumulating tokens when certain sequences are detected, then releasing them as a single chunk when the jail ends.
Key Features#
Builder Pattern: Clean configuration API using the builder pattern
Configurable Sequences: Support for multiple start/end jail sequences
Tool Call Parsing: Integrated tool call detection and parsing
Stream Macro: Uses
async-stream::stream!for clean async implementationStandalone: Completely independent of existing code
Annotations: Preserves annotations for observability
Implementation#
Location#
Main implementation:
lib/llm/src/protocols/openai/chat_completions/jail.rsExamples:
lib/llm/src/protocols/openai/chat_completions/jail_example.rs
Usage#
use crate::protocols::openai::chat_completions::jail::JailedStream;
use dynamo_runtime::engine::{AsyncEngineContextProvider, ResponseStream};
// Get your ResponseStream with context
let response_stream: Pin<Box<ResponseStream<_>>> = get_stream_from_engine();
// Extract context BEFORE passing to apply
let context = response_stream.context();
// Apply jail transformation (ResponseStream implements Stream)
let jail = JailedStream::builder()
.tool_call_parser("nemotron_deci")
.build();
let jailed_stream = jail.apply(response_stream);
// Re-wrap with context when needed for engine consumption
let final_stream = ResponseStream::new(Box::pin(jailed_stream), context);
Advanced Configuration#
// With custom jail sequences
let jail = JailedStream::builder()
.jail_start_sequence("<TOOLCALL>")
.jail_end_sequence("</TOOLCALL>")
.tool_call_parser("nemotron_deci")
.build();
// With multiple sequences
let jail = JailedStream::builder()
.jail_start_sequences(vec!["<TOOLCALL>", "<FUNCTION>"])
.jail_end_sequences(vec!["</TOOLCALL>", "</FUNCTION>"])
.tool_call_parser("harmony")
.build();
How It Works#
Detection: When a jail start sequence (or tool call start) is detected, the stream enters “jail” mode
Accumulation: While jailed, tokens are accumulated in memory instead of being yielded
Annotations: Empty chunks with annotations are sent downstream for observability
Release: When a jail end sequence is detected OR the stream ends:
Accumulated content is parsed for tool calls
A single chunk with the parsed content is yielded
Pass-through: Non-jailed content passes through unchanged
Testing#
The implementation includes comprehensive tests:
test_jailed_stream_with_start_end_sequences: Tests explicit jail sequencestest_jailed_stream_with_tool_calls: Tests tool call detection and parsingtest_jailed_stream_no_jailing: Tests normal pass-through behavior
Run tests with:
cargo test -p dynamo-llm jail --lib
Benefits#
Standalone: No modifications to existing code required
Clean API: Builder pattern makes configuration intuitive
Flexible: Supports multiple jail detection strategies
Maintainable: Uses
stream!macro for cleaner async codeTestable: Comprehensive test suite with shared utilities
Efficient: No unnecessary boxing or context handling in the library
Composable: Can chain multiple stream transformers before re-adding context
Performance Optimizations#
No Boxing in Library: Returns
impl Streaminstead ofPin<Box<ResponseStream>>Stack Pinning: Uses
tokio::pin!()instead ofBox::pin()for better performanceNo Context Overhead: JailedStream doesn’t manage AsyncEngineContext
Lazy Evaluation: Only processes what’s needed
Efficient State Management: Minimal cloning, only when entering jail state
Integration Options#
To replace the existing apply_tool_calling_jail_internal function:
// In preprocessor.rs
pub fn apply_tool_calling_jail_with_parser(
&self,
stream: ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
) -> ManyOut<Annotated<NvCreateChatCompletionStreamResponse>> {
let jail = JailedStream::builder()
.tool_call_parser(self.tool_call_parser.clone())
.build();
jail.apply(stream)
}
Future Enhancements#
Add support for regex patterns for jail sequences
Add metrics/telemetry for jail detection
Support for partial sequence matching across chunk boundaries
Configurable accumulation limits
Support for nested jails