Building a Low Latency Speech-To-Speech RAG Bot#

In this section, we will build a replica of the RAG Sample Bot packaged as part of this release. The RAG sample bot uses the NVIDIA RAG Examples pipeline to answer questions based on the uploaded documents, and it showcases a number of ACE Agent features.

Similar to the tutorial for LangChain-based Bots, we will use a Plugin to interact with the RAG server. This plugin can be connected to the Chat Controller or the Chat Engine. We will then use Colang to add always-on barge-in support, improve end-to-end speech latency for bot responses, and add an option for long pause handling. The same features can be added to LangChain agents or custom RAG pipelines in similar ways to improve the user experience.

The minimal file structure of the RAG bot looks like this:

samples
└── rag_tutorial_bot
    ├── plugin
    │   ├── rag.py
    │   └── schemas.py
    ├── colang
    │   ├── main.co
    │   └── speech.co
    ├── plugin_config.yaml
    ├── bot_config.yaml
    ├── speech_config.yaml
    └── model_config.yaml

Create an empty directory called samples/rag_tutorial_bot and perform the following steps to update the bot configurations.

Creating the RAG Plugin#

In this section, let’s build the custom RAG plugin and test it as a standalone component.

  1. Deploy the RAG server by following the instructions in the RAG Sample Bot or the NVIDIA Generative AI examples repository.

  2. Define the input and output API schemas used by ACE Agent for communication with the Plugin server and Chat Controller. Copy samples/rag_bot/plugin/schemas.py to samples/rag_tutorial_bot/plugin/schemas.py.

    Let’s take a closer look at these APIs and their schemas. The Chat Controller microservice uses two APIs: /chat and /event.

    1. Events such as pipeline creation and deletion are sent by the Chat Controller to the /event endpoint. In most cases, you will not need to modify the default /event endpoint that we will define in the next step.

    2. User questions are sent by the Chat Controller to the /chat endpoint, along with the UserId and an optional QueryId. The response schema for this API contains a Response attribute, which contains the details of the response. In this tutorial, we only need to manage two sub-fields: Response.Text (which contains the chunk that we are streaming) and Response.IsFinal (which indicates whether the stream is complete).
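
    For illustration, here is a minimal sketch of the relevant part of these schemas. The field names follow the description above, but ResponseDetails is a hypothetical name used only for this sketch; the authoritative definitions live in schemas.py.

      from pydantic import BaseModel

      class ResponseDetails(BaseModel):  # hypothetical name, for illustration only
          Text: str = ""          # chunk of the streamed bot response
          CleanedText: str = ""   # cleaned copy of the response text
          IsFinal: bool = False   # True on the last chunk of the stream

      class ChatResponse(BaseModel):
          UserId: str = ""
          Response: ResponseDetails = ResponseDetails()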

  3. Create the actual custom Plugin with the RAG Server API call and the /chat and /event APIs. Copy samples/rag_bot/plugin/rag.py to samples/rag_tutorial_bot/plugin/rag.py.

    1. The rag_stream function takes the query and additional request parameters, performs the API call to the RAG server, and returns a streaming response whose generator formats each RAG response chunk into the ChatResponse schema and pushes it into the response stream. If you are using a custom RAG solution, or want to change how the RAG server is called, you only need to modify the generator method.

      async def rag_stream(
          question: Optional[str] = "",
          chat_history: Optional[List] = [],
          num_tokens: Optional[int] = MAX_TOKENS,
      ) -> StreamingResponse:
          """
          Call the RAG chain server and return the streaming response.
          """

          request_json = {
              "messages": chat_history + [{"role": "user", "content": question}],
              "use_knowledge_base": True,
              "temperature": TEMPERATURE,
              "top_p": TOP_P,
              "max_tokens": num_tokens,
              "seed": 42,
              "bad": [],
              "stop": STOP_WORDS,
              "stream": True,
          }

          # Generator that forwards the RAG stream to the Chat Controller
          async def generator():

              full_response = ""
              if question:
                  async with aiohttp.ClientSession() as session:
                      async with session.post(GENERATION_URL, json=request_json) as resp:
                          async for chunk, _ in resp.content.iter_chunks():
                              try:
                                  chunk = chunk.decode("utf-8")
                                  chunk = chunk.strip("\n")

                                  try:
                                      # Server-sent event chunks carry a "data: " prefix; skip it before parsing
                                      if len(chunk) > 6:
                                          parsed = json.loads(chunk[6:])
                                          message = parsed["choices"][0]["message"]["content"]
                                      else:
                                          logger.debug(f"Received empty RAG response chunk '{chunk}'.")
                                          message = ""
                                  except Exception as e:
                                      logger.warning(f"Parsing RAG response chunk '{chunk}' failed. {e}")
                                      message = ""

                                  if not message:
                                      continue

                                  full_response += message

                                  json_chunk = ChatResponse()
                                  json_chunk.Response.Text = message
                                  json_chunk.Response.CleanedText = message
                                  json_chunk = json.dumps(json_chunk.dict())
                                  yield json_chunk
                              except Exception as e:
                                  yield f"Internal error in RAG stream: {e}"
                                  break

              logger.info(f"Full RAG response for query `{question}` : {full_response}")
              # Mark the end of the stream with a final chunk
              json_chunk = ChatResponse()
              json_chunk.Response.IsFinal = True
              json_chunk = json.dumps(json_chunk.dict())
              yield json_chunk

          return StreamingResponse(generator(), media_type="text/event-stream")
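
      Note that each chunk yielded by the generator is a complete JSON-serialized ChatResponse, and the stream always ends with a final chunk whose Response.IsFinal is True; this is how the consumer knows the response is complete.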
      
    2. The /chat endpoint invokes rag_stream with the user query and the chat history, if available, and forwards the streaming RAG response.

      @router.post(
          "/chat",
          status_code=status.HTTP_200_OK,
      )
      async def chat(
          request: Annotated[
              ChatRequest,
              Body(
                  description="Chat Engine Request JSON. All the fields populated as part of this JSON are also available as part of the request JSON."
              ),
          ],
          response: Response,
      ) -> StreamingResponse:
          """
          This endpoint can be used to provide a response to a query-driven user request.
          """
      
          req = request.dict(exclude_none=True)
          logger.info(f"Received request JSON at /chat endpoint for RAG : {json.dumps(req, indent=4)}")
      
          try:
              chat_history = []
              if "Metadata" in req:
                  chat_history = req["Metadata"].get("ChatHistory", [])
              resp = await rag_stream(question=req["Query"], chat_history=chat_history)
              return resp
          except Exception as e:
              response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
              return {"StatusMessage": str(e)}
      
    3. The /event endpoint is only invoked if the Chat Controller is directly connected via the Plugin Server Architecture; it is optional for this tutorial. A minimal sketch is shown below.
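
      If you want to define it explicitly, a no-op /event endpoint might look like the following sketch. The EventRequest and EventResponse names are illustrative assumptions; use the actual schemas defined in schemas.py.

        @router.post(
            "/event",
            status_code=status.HTTP_200_OK,
        )
        async def event(
            request: Annotated[EventRequest, Body(description="Chat Controller Event JSON")],  # EventRequest assumed from schemas.py
            response: Response,
        ) -> EventResponse:
            """
            Acknowledge pipeline creation and deletion events; no custom handling is needed for this tutorial.
            """
            logger.info(f"Received event request JSON at /event endpoint: {request.dict(exclude_none=True)}")
            return EventResponse()  # assumes the response schema has sensible defaults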

  4. Register this plugin. Add the following to plugin_config.yaml.

    config:
        workers: 1
        timeout: 30
    
    plugins:
        - name: rag
          path: ./plugin/rag.py
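
    Here, name determines the URL prefix under which the plugin’s routes are mounted (rag, which is why the chat endpoint is reachable at rag/chat), and path points to the plugin implementation. The workers and timeout values presumably control the number of server workers and the request timeout for the Plugin server.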
    
  5. Deploy the Plugin server for testing. Run the bot using the Docker Environment.

    export BOT_PATH=./samples/rag_tutorial_bot
    source deploy/docker/docker_init.sh
    docker compose -f deploy/docker/docker-compose.yml up --build plugin-server -d
    
  6. Ingest documents as required for your use case by visiting http://<your-ip>:8090/kb. You can then test the endpoint with queries relevant to your documents using the following curl command.

    curl -X 'POST' \
        'http://localhost:9002/rag/chat' \
        -H 'accept: application/json' \
        -H 'Content-Type: application/json' \
        -d '{
        "Query": "Who is the president of the United States?",
        "UserId": "user1"
    }'
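
    The response is streamed as a sequence of JSON-serialized ChatResponse chunks. As a rough illustration, here is a minimal Python client that consumes the stream and prints the response text. It assumes the plugin server is running locally and that each transport chunk carries exactly one JSON payload, as produced by the generator above.

      import json

      import requests

      payload = {"Query": "Who is the president of the United States?", "UserId": "user1"}
      with requests.post("http://localhost:9002/rag/chat", json=payload, stream=True) as resp:
          for raw in resp.iter_content(chunk_size=None):
              chunk = json.loads(raw.decode("utf-8"))
              if chunk["Response"].get("IsFinal"):
                  break  # the final chunk carries no text, only the end-of-stream marker
              print(chunk["Response"]["Text"], end="", flush=True)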
    
  7. After you are done testing the plugin, stop the server.

    docker compose -f deploy/docker/docker-compose.yml down
    

Connecting Chat Controller to the RAG Plugin#

[Diagram: LangChain UCS App]
  1. Copy the model_config.yaml and speech_config.yaml files from samples/chitchat_bot. They represent the common settings for a speech pipeline.

  2. Update the server URL in the dialog_manager component in the speech_config.yaml file to point to the pre-built RAG Plugin we defined in the previous step.

    dialog_manager:
      DialogManager:
          server: "http://localhost:9002/rag"
          use_streaming: true
    

    With this change, the Chat Controller will directly call the /chat and /event endpoints of the Plugin server.

  3. Deploy the bot.

    1. Set the environment variables required for the docker-compose.yml file.

      export BOT_PATH=./samples/rag_tutorial_bot/
      source deploy/docker/docker_init.sh
      
    2. For Plugin Server Architecture based bots, we need to use the speech_lite pipeline configuration for the Chat Controller microservice. Update the PIPELINE variable in deploy/docker/docker_init.sh or override it by setting the PIPELINE environment variable manually.

      export PIPELINE=speech_lite
      
    3. Deploy the Riva ASR and TTS speech models.

      docker compose -f deploy/docker/docker-compose.yml up model-utils-speech
      
    4. Deploy the Plugin server with RAG plugin.

      docker compose -f deploy/docker/docker-compose.yml up --build plugin-server -d
      
    5. Deploy the Chat Controller microservice with the gRPC interface.

      docker compose -f deploy/docker/docker-compose.yml up chat-controller -d
      
  4. Interact with the bot using the Speech WebUI application.

    docker compose -f deploy/docker/docker-compose.yml up bot-web-ui-client bot-web-ui-speech -d
    

    Notice that we are not deploying the Chat Engine container at all.

    You can interact with the bot using your browser at http://<YOUR_IP_ADDRESS>:7006.

Here’s an example:

[Image: Speech Based Sample App]

Connecting the Plugin to the Chat Engine#

You can add guardrails to the bot, or add any custom logic in Colang, by creating the configurations needed for the Chat Engine and connecting the Plugin server to the Chat Engine using the Event Interface Architecture or the Chat Engine Server Architecture.

[Diagram: Speech Bot Web App UCS App]

Creating the Bot and Colang Configurations#

bot_config.yaml is the configuration entry point for any bot. Let’s create this file and add a few important configuration parameters.

  1. Give the bot a name. In bot_config.yaml, you need to add a unique name for the bot. Let’s name the bot nvidia_rag_bot. We will use Colang 2.0-beta syntax for this tutorial.

    bot: nvidia_rag_bot
    
    colang_version: "2.x"
    
    storage:
      name: cache
    
    configs:
      use_stateful_guardrails: True
    
  2. All the intelligence in our bot will be present in the RAG server. Since our Colang configs are only meant to route queries to the plugin, let’s keep the models section empty.

    models: []
    
  3. Create a folder named colang and a Colang file called main.co, which will contain all the Colang logic. Let’s update main.co to route all queries to the RAG plugin.

    import core

    flow technical helper
      # Helper flows for notifying errors
      activate notification of undefined flow start "I have encountered some technical issue!"
      activate notification of colang errors "I have encountered some technical issue!"

    flow generate rag response $transcript
      # Invoke the /chat endpoint of the Plugin server
      $started = await InvokeStreamingChatAction(question=$transcript, endpoint="rag/chat", chat_history=True)
      if $started
        # Get the first sentence from the RAG response
        $response = await StreamingResponseChatAction(endpoint="rag/chat")
        while $response
          bot say $response
          # Check for the next sentence
          $response = await StreamingResponseChatAction(endpoint="rag/chat")

    flow rag
      # Wait for user queries
      user said something as $ref
      # Generate the RAG response when a user query is received
      generate rag response $ref.transcript

    flow main
      activate technical helper
      activate rag
    

    The flows above route all user utterances to a POST endpoint called /rag/chat on the Plugin server, passing the user’s question as well as session information to the endpoint as request parameters.

    Note

    If you want to add more complicated logic in Colang, you must update main.co and possibly the bot config file according to your use case. Refer to Using Colang for more information.

Testing the Bot#

  1. Run the bot in gRPC Interface using the Docker Environment.

  2. Set the environment variables required for the docker-compose.yml file.

    export BOT_PATH=./samples/rag_tutorial_bot/
    source deploy/docker/docker_init.sh
    
  3. For Event Interface Architecture based bots, we need to use the speech_umim pipeline configuration for the Chat Controller microservice. Update the PIPELINE variable in deploy/docker/docker_init.sh or override it by setting the PIPELINE environment variable manually.

    export PIPELINE=speech_umim
    
  4. Deploy the Riva ASR and TTS speech models.

    docker compose -f deploy/docker/docker-compose.yml up model-utils-speech
    
  5. Deploy the ACE Agent microservices: the Chat Controller, Chat Engine, Redis, Plugin server, and NLP server. The NLP server will not have any models deployed for this bot.

    docker compose -f deploy/docker/docker-compose.yml up --build speech-event-bot -d
    
  6. Interact with the bot using the Sample WebUI application in your browser at http://<YOUR_IP_ADDRESS>:7006.

Improving the User Experience for Speech-To-Speech Conversations#

As humans, when we talk, we pause during the conversation, start thinking before others stop speaking, and even interrupt each other. Most agents and bots today wait for the user to finish a query before processing it. Detecting the end of user speech is also subjective, as some people take long pauses between words.

The Riva Automatic Speech Recognition (ASR) pipeline, by default, waits for 800 ms of silence in the user audio before marking the end of user speech. Therefore, the user-perceived latency will always be at least 800 ms plus the agent processing latency (RAG latency in this tutorial). On top of this, there is ASR processing latency, text-to-speech generation latency, network latency, and so on.

The Riva ASR pipeline has multiple components, and it might not be optimal to run the full pipeline for every audio chunk. The ASR pipeline returns a greedy-decoded partial transcript for every user audio chunk, and a final transcript after 800 ms of silence that includes beam LM decoding, punctuation, and custom vocabulary. You can optionally configure it to also return an interim transcript after a shorter silence (240 ms, for example) with the same processing as the final transcript.

In this section, we will focus on the following user experience improvements:

  • Reduce latency by 560 ms (800 ms - 240 ms) by using the interim transcript after 240 ms of silence as the end of user speech (EOU) to trigger the RAG API, instead of waiting for the full 800 ms

  • Always-on barge-in support, that is, stopping the bot response when the user interrupts

  • An option to configure the end of user speech threshold (currently 800 ms) to handle long pauses in user audio. Refer to Customizing ASR Recognition for Long Pause Handling for more information.

Updating Bot Configurations#

  1. Copy speech.co from the RAG sample bot directory samples/rag_bot/. speech.co implements Colang flows for utilizing partial/interim transcripts and supporting always-on barge-in. Follow the diagram below to understand the flows in interim transcript mode.

[Diagram: Interim Transcript with Barge In]

Note

We send an early trigger to the RAG server and might need to retrigger if the user pauses for more than 240 ms between words. On average, you might make two extra RAG calls for each user query, which requires extra compute and cost when deploying at scale. If your use case involves external API calls or any action that is non-reversible or costly, you should not use the two-pass EOU approach, or you will need extra handling to deal with the pipeline being triggered multiple times for a single user query.

  2. Update main.co to utilize the flows from the speech.co Colang file. The handle user transcript with interruption flow listens to every partial, interim, and final transcript and decides whether to interrupt the bot response, set a new active transcript, or ignore a spurious transcript. The user partially said something flow returns when a new active transcript is set.

    import core

    flow technical helper
      activate notification of undefined flow start "I have encountered some technical issue!"
      activate notification of colang errors "I have encountered some technical issue!"
      # Activate interim transcript mode and add the list of flows to stop on user interruption / barge-in
      activate handle user transcript with interruption $mode="interim" $stop_flows_list=["_bot_say","generate rag response"]

    flow generate rag response $transcript
      $started = await InvokeStreamingChatAction(question=$transcript, endpoint="rag/chat", chat_history=True)
      if $started
        $response = await StreamingResponseChatAction(endpoint="rag/chat")
        log "response from RAG: {$response}"
        while $response
          bot say $response
          $response = await StreamingResponseChatAction(endpoint="rag/chat")
          log "response from RAG: {$response}"

    flow rag
      # Wait for a new active transcript
      user partially said something as $ref
      generate rag response $ref.transcript

    flow main
      activate technical helper
      activate rag
    

    We have added generate rag response to stop_flows_list to interrupt active RAG calls. Make sure that any flows added to stop_flows_list do not use the activate keyword. In this example, we set the interim transcript mode, but you can also choose to work with partial transcripts or to only use the final transcript.

  3. Riva ASR might detect a few spurious transcripts, which can interrupt the bot response unnecessarily. Create actions.py with a spurious transcript filter action, as shown below.

    import logging
    
    from nemoguardrails.actions.actions import action
    
    logger = logging.getLogger("nemoguardrails")
    
    # Transcripts matching these filler words are treated as spurious. In addition, any transcript shorter than 3 characters is removed unless it is listed in INCLUDE_WORDS
    FILTER_WORDS = [
        "yeah",
        "okay",
        "right",
        "yes",
        "yum",
        "and",
        "one",
        "all",
        "when",
        "thank",
        "but",
        "next",
        "what",
        "i see",
        "the",
        "hmm",
        "mmm",
        "so that",
        "why",
        "that",
        "well",
    ]
    
    INCLUDE_WORDS = ["hi"]
    
    @action(name="IsSpuriousAction")
    async def is_spurious(query):
        """
        Filter out transcripts that are in the FILTER_WORDS list, or that are shorter than
        3 characters and not explicitly allowed in INCLUDE_WORDS, to avoid spurious
        transcripts and filler words.
        """
        normalized = query.strip().lower()
        return normalized in FILTER_WORDS or (len(query) < 3 and normalized not in INCLUDE_WORDS)
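
    For example, "okay" and "the" are filtered as spurious, while "hi" is kept even though it is shorter than 3 characters, because it is listed in INCLUDE_WORDS.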
    
  4. Update the speech_config.yaml parameters to send an interim transcript after 240 ms of user silence. Refer to the Customizing ASR Recognition for Long Pause Handling section for more information. Update the riva_asr component config as shown below.

    riva_asr:
        RivaASR:
            server: "localhost:50051"
            # Update interim and final transcript silence threshold
            endpointing_stop_history: 800 # Second pass End of User Speech
            endpointing_stop_history_eou: 240 # First pass End of User Speech
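
    With this configuration, an interim transcript is emitted after 240 ms of silence and triggers the RAG call early (the first pass), while the final transcript is only produced after 800 ms of silence (the second pass). If the user resumes speaking in between, the early RAG call is retriggered with the updated transcript.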
    
  5. With a higher value for endpointing_stop_history, we can avoid breaking a user query into multiple queries when there are long pauses in the user audio. A value of at least 800 ms is recommended to avoid ASR transcript quality issues.

Testing the Bot#

  1. Run the bot in gRPC Interface using the Docker Environment.

  2. Set the environment variables required for the docker-compose.yml file.

    export BOT_PATH=./samples/rag_tutorial_bot/
    source deploy/docker/docker_init.sh
    
  3. For Event Interface Architecture based bots, we need to use the speech_umim pipeline configuration for the Chat Controller microservice. Update the PIPELINE variable in deploy/docker/docker_init.sh or override it by setting the PIPELINE environment variable manually.

    export PIPELINE=speech_umim
    
  4. Deploy the Riva ASR and TTS speech models.

    docker compose -f deploy/docker/docker-compose.yml up model-utils-speech
    
  5. Deploy the ACE Agent microservices: the Chat Controller, Chat Engine, Redis, Plugin server, and NLP server. The NLP server will not have any models deployed for this bot.

    docker compose -f deploy/docker/docker-compose.yml up --build speech-event-bot -d
    
  6. Interact with the bot using the Sample WebUI application in your browser at http://<YOUR_IP_ADDRESS>:7006. You should observe, on average, a 500-600 ms improvement in latency, and you can barge in at any time to stop the current bot response.