Additional Resources

Jail Stream

View as Markdown

Overview

The JailedStream is a standalone implementation for handling “jail” detection in token streams. It provides a clean, builder-based API for accumulating tokens when certain sequences are detected, then releasing them as a single chunk when the jail ends.

Key Features

  • Builder Pattern: Clean configuration API using the builder pattern
  • Configurable Sequences: Support for multiple start/end jail sequences
  • Tool Call Parsing: Integrated tool call detection and parsing
  • Stream Macro: Uses async-stream::stream! for clean async implementation
  • Standalone: Completely independent of existing code
  • Annotations: Preserves annotations for observability

Implementation

Location

  • Main implementation: lib/llm/src/protocols/openai/chat_completions/jail.rs
  • Examples: lib/llm/src/protocols/openai/chat_completions/jail_example.rs

Usage

1use crate::protocols::openai::chat_completions::jail::JailedStream;
2use dynamo_runtime::engine::{AsyncEngineContextProvider, ResponseStream};
3
4// Get your ResponseStream with context
5let response_stream: Pin<Box<ResponseStream<_>>> = get_stream_from_engine();
6
7// Extract context BEFORE passing to apply
8let context = response_stream.context();
9
10// Apply jail transformation (ResponseStream implements Stream)
11let jail = JailedStream::builder()
12 .tool_call_parser("nemotron_deci")
13 .build();
14
15let jailed_stream = jail.apply(response_stream);
16
17// Re-wrap with context when needed for engine consumption
18let final_stream = ResponseStream::new(Box::pin(jailed_stream), context);

Advanced Configuration

1// With custom jail sequences
2let jail = JailedStream::builder()
3 .jail_start_sequence("<TOOLCALL>")
4 .jail_end_sequence("</TOOLCALL>")
5 .tool_call_parser("nemotron_deci")
6 .build();
7
8// With multiple sequences
9let jail = JailedStream::builder()
10 .jail_start_sequences(vec!["<TOOLCALL>", "<FUNCTION>"])
11 .jail_end_sequences(vec!["</TOOLCALL>", "</FUNCTION>"])
12 .tool_call_parser("harmony")
13 .build();

How It Works

  1. Detection: When a jail start sequence (or tool call start) is detected, the stream enters “jail” mode
  2. Accumulation: While jailed, tokens are accumulated in memory instead of being yielded
  3. Annotations: Empty chunks with annotations are sent downstream for observability
  4. Release: When a jail end sequence is detected OR the stream ends:
    • Accumulated content is parsed for tool calls
    • A single chunk with the parsed content is yielded
  5. Pass-through: Non-jailed content passes through unchanged

Testing

The implementation includes comprehensive tests:

  • test_jailed_stream_with_start_end_sequences: Tests explicit jail sequences
  • test_jailed_stream_with_tool_calls: Tests tool call detection and parsing
  • test_jailed_stream_no_jailing: Tests normal pass-through behavior

Run tests with:

$cargo test -p dynamo-llm jail --lib

Benefits

  1. Standalone: No modifications to existing code required
  2. Clean API: Builder pattern makes configuration intuitive
  3. Flexible: Supports multiple jail detection strategies
  4. Maintainable: Uses stream! macro for cleaner async code
  5. Testable: Comprehensive test suite with shared utilities
  6. Efficient: No unnecessary boxing or context handling in the library
  7. Composable: Can chain multiple stream transformers before re-adding context

Performance Optimizations

  • No Boxing in Library: Returns impl Stream instead of Pin<Box<ResponseStream>>
  • Stack Pinning: Uses tokio::pin!() instead of Box::pin() for better performance
  • No Context Overhead: JailedStream doesn’t manage AsyncEngineContext
  • Lazy Evaluation: Only processes what’s needed
  • Efficient State Management: Minimal cloning, only when entering jail state

Integration Options

To replace the existing apply_tool_calling_jail_internal function:

1// In preprocessor.rs
2pub fn apply_tool_calling_jail_with_parser(
3 &self,
4 stream: ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
5) -> ManyOut<Annotated<NvCreateChatCompletionStreamResponse>> {
6 let jail = JailedStream::builder()
7 .tool_call_parser(self.tool_call_parser.clone())
8 .build();
9
10 jail.apply(stream)
11}

Future Enhancements

  • Add support for regex patterns for jail sequences
  • Add metrics/telemetry for jail detection
  • Support for partial sequence matching across chunk boundaries
  • Configurable accumulation limits
  • Support for nested jails