Working with Streaming Output#

The microservice supports streaming output for both chat completions and completions.

Configuration#

You must enable streaming support in your guardrails configuration. The key fields are streaming.enabled, which must be set to True, and the configurable chunk_size, context_size, and stream_first fields.

YAML

rails:
  output:
    flows:
      - ...
    streaming:
      enabled: True
      chunk_size: 200
      context_size: 50
      stream_first: True

JSON

"rails": {
  "output": {
    "flows": [
      "..."
    ],
    "streaming": {
      "enabled": true,
      "chunk_size": 200,
      "context_size": 50,
      "stream_first": true
    }
  }
}

For information about the fields, refer to streaming output configuration in the NeMo Guardrails Toolkit documentation.

For information about managing guardrails configurations, refer to the demonstration guardrail configuration, demo-self-check-output, described in create a guardrail configuration.

Performance Comparison#

The primary purpose of streaming output is to reduce the time-to-first-token (TTFT) of the LLM response. The following table shows timing results for 20 requests to the OpenAI GPT-4o model, with and without streaming output, measured with a very basic timing script.

Configuration         Mean TTFT   Median TTFT   Stdev    Min TTFT   Max TTFT
Streaming enabled     0.5475      0.5208        0.1248   0.4241     0.9287
Streaming disabled    3.6834      3.6127        1.6949   0.4487     7.4227

The streaming-enabled configuration is faster by 85.14% on average and has more consistent performance, as shown by the lower standard deviation (0.1248 versus 1.6949).

The streaming configuration used the default configuration values: chunk_size: 200, context_size: 50, and stream_first: True.
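
The timing script is not part of the microservice. The following minimal sketch shows one way to measure TTFT, assuming the chat completions endpoint, model, and demo-self-check-input-output configuration used in the examples later on this page; your endpoint, model, and configuration ID can differ.

import os
import statistics
import time

import requests

url = f"{os.environ['GUARDRAILS_BASE_URL']}/v1/guardrail/chat/completions"
headers = {"Accept": "application/json", "Content-Type": "application/json"}


def time_to_first_token(stream: bool) -> float:
    """Return the seconds elapsed until the first token of the response is available."""
    body = {
        "model": "meta/llama-3.1-8b-instruct",
        "messages": [{
            "role": "user",
            "content": "Tell me about Cape Hatteras National Seashore in 50 words or less."
        }],
        "guardrails": {"config_id": "demo-self-check-input-output"},
        "max_tokens": 200,
        "stream": stream,
    }
    start = time.perf_counter()
    with requests.post(url, headers=headers, json=body, stream=stream) as response:
        if not stream:
            # Without streaming, the first token arrives with the complete response.
            response.json()
            return time.perf_counter() - start
        for chunk in response.iter_lines(decode_unicode=True):
            # The first server-sent event marks the arrival of the first token.
            if chunk and chunk.startswith("data: ") and chunk != "data: [DONE]":
                return time.perf_counter() - start
    return time.perf_counter() - start


for label, stream in (("Streaming enabled", True), ("Streaming disabled", False)):
    samples = [time_to_first_token(stream) for _ in range(20)]
    print(f"{label}: mean={statistics.mean(samples):.4f} median={statistics.median(samples):.4f} "
          f"stdev={statistics.stdev(samples):.4f} min={min(samples):.4f} max={max(samples):.4f}")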

Detecting Blocked Content in Streaming Output#

The microservice applies guardrail checks on chunks of tokens as they are streamed from the LLM. If a chunk of tokens is blocked, the microservice returns a response in the following format:

{"error":{"message":"Blocked by <rail-name>","type":"guardrails_violation","param":"<rails-name>","code":"content_blocked"}}

Example Output

{
  "error": {
    "message": "Blocked by self check output rails.",
    "type": "guardrails_violation",
    "param": "self check output",
    "code": "content_blocked"
  }
}
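
When you consume the stream manually, check each parsed chunk for this error payload before reading the message deltas. The following minimal sketch, written to plug into the requests-based example in the next section, shows one way to perform the check; the helper name is illustrative.

import json


def is_blocked(chunk: str) -> bool:
    """Return True if a streamed chunk carries a guardrails content_blocked error."""
    try:
        event = json.loads(chunk.removeprefix("data: "))
    except json.JSONDecodeError:
        return False
    error = event.get("error", {})
    if error.get("code") == "content_blocked":
        print(f"\n{error.get('message')}")
        return True
    return False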

Chat Completions with Streaming#

  • Perform a POST request to the /v1/guardrail/chat/completions endpoint and specify "stream": True in the request body. The following examples show the same streaming request with the requests library, the OpenAI Python SDK, and LangChain.

    import os
    import json
    import requests
    
    url = f"{os.environ['GUARDRAILS_BASE_URL']}/v1/guardrail/chat/completions"
    
    headers = {"Accept": "application/json", "Content-Type": "application/json"}
    
    data = {
        "model": "meta/llama-3.1-8b-instruct",
        "messages": [{
            "role": "user",
            "content": "Tell me about Cape Hatteras National Seashore in 50 words or less."
        }],
        "guardrails": {
            "config_id": "demo-self-check-input-output",
        },
        "top_p": 1,
        "max_tokens": 200,
        "stream": True
    }
    
    with requests.post(url, headers=headers, json=data, stream=True) as response:
        response.encoding = "utf-8"
        for chunk in response.iter_lines(decode_unicode=True):
            if not chunk:
                continue
            try:
                event = json.loads(chunk.removeprefix("data: "))
            except json.JSONDecodeError:
                continue
            # Add a check here for blocked content, such as {"error": {"message": "Blocked by <rail-name>", ...}}.
            if event.get("choices", [{}])[0].get("delta", {}).get("content") is not None:
                print(event["choices"][0]["delta"]["content"], end="", flush=True)
    
    # Alternative: the same streaming chat completions request using the OpenAI Python SDK.
    import os
    
    from openai import OpenAI
    
    x_model_authorization = {"X-Model-Authorization": os.environ["NVIDIA_API_KEY"]}
    
    url = f"{os.environ['GUARDRAILS_BASE_URL']}/v1/guardrail"
    
    # The api_key argument is required by the client but is not used;
    # the model API key is passed in the X-Model-Authorization header.
    client = OpenAI(
        base_url=url,
        api_key="dummy-value",
        default_headers=x_model_authorization,
    )
    
    stream = client.chat.completions.create(
        model = "meta/llama-3.1-8b-instruct",
        messages = [
            {
                "role": "user",
                "content": "Tell me about Cape Hatteras National Seashore in 50 words or less."
            }
        ],
        extra_body = {
            "guardrails": {
                "config_id": "demo-self-check-input-output"
            },
        },
        max_tokens=200,
        stream=True
    )
    
    for chunk in stream:
        if chunk.choices[0].delta.content is not None:
            # Add a check if content includes {"error": {"message": "Blocked by <rail-name>"...
            print(chunk.choices[0].delta.content, end="", flush=True)
    
    # Alternative: the same streaming chat completions request using the LangChain ChatOpenAI client.
    import os
    
    from langchain_openai import ChatOpenAI
    
    x_model_authorization = {"X-Model-Authorization": os.environ["NVIDIA_API_KEY"]}
    
    model = ChatOpenAI(
        model_name = "meta/llama-3.1-8b-instruct",
        openai_api_base = f"{os.environ['GUARDRAILS_BASE_URL']}/v1/guardrail",
        api_key = "dummy-value",
        default_headers = x_model_authorization,
        extra_body = {
            "guardrails": {
                "config_id": "demo-self-check-input-output"
            }
        },
        max_tokens=200
    )
    
    for chunk in model.stream("Tell me about Cape Hatteras National Seashore in 50 words or less."):
        # Add a check if chunk.content includes {"error": {"message": "Blocked by <rail-name>"...
        print(chunk.content, end="", flush=True)
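
Streaming works the same way for the completions endpoint. The following sketch is an assumption-based example: it assumes the endpoint is /v1/guardrail/completions, takes a prompt field in place of messages, and returns the generated text in choices[0].text. Refer to the API reference for the exact request schema.

import os
import json

import requests

# Hypothetical example: assumes the completions endpoint mirrors the
# chat completions endpoint shown above.
url = f"{os.environ['GUARDRAILS_BASE_URL']}/v1/guardrail/completions"

headers = {"Accept": "application/json", "Content-Type": "application/json"}

data = {
    "model": "meta/llama-3.1-8b-instruct",
    "prompt": "Tell me about Cape Hatteras National Seashore in 50 words or less.",
    "guardrails": {"config_id": "demo-self-check-input-output"},
    "max_tokens": 200,
    "stream": True,
}

with requests.post(url, headers=headers, json=data, stream=True) as response:
    response.encoding = "utf-8"
    for chunk in response.iter_lines(decode_unicode=True):
        if not chunk:
            continue
        try:
            event = json.loads(chunk.removeprefix("data: "))
        except json.JSONDecodeError:
            continue
        # Completions chunks carry the generated text in choices[0].text.
        text = event.get("choices", [{}])[0].get("text")
        if text is not None:
            print(text, end="", flush=True)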