Building a Low Latency Speech-To-Speech RAG Bot#
In this section, we will build a replica of the RAG Sample Bot packaged as part of this release. The RAG sample bot utilizes the NVIDIA RAG Examples pipeline to answer questions based on the uploaded documents, and it showcases the following ACE Agent features:
- Integrating a RAG example from NVIDIA’s Generative AI Examples
- Low latency using ASR two-pass End of Utterance (EOU)
- Always-on barge-in support
- Handling conversation history with plugins
- Support for deployment using the Event Interface Architecture
Similar to the tutorial for LangChain-based Bots, we will use a Plugin to interact with the RAG server. This plugin can be connected to the Chat Controller or the Chat Engine. We will then utilize Colang to add always-on barge-in support, improve end-to-end speech latency for bot responses, and add an option for long pause handling. The same features can be added to LangChain agents or custom RAG pipelines in a similar way to improve the user experience.
The minimal file structure of the RAG bot will look like this:
```
samples
└── rag_tutorial_bot
    └── plugin
        └── rag.py
        └── schemas.py
    └── colang
        └── main.co
        └── speech.co
    └── plugin_config.yaml
    └── bot_config.yaml
    └── speech_config.yaml
    └── model_config.yaml
```
Create an empty directory called `samples/rag_tutorial_bot` and perform the following steps to set up the bot configurations.
Creating the RAG Plugin#
In this section, let’s build the custom RAG plugin and test it as a standalone component.
Deploy the RAG server by following the instructions in the RAG Sample Bot or the NVIDIA Generative AI examples repository.
Define the input and output API schemas used by ACE Agent for communication with the Plugin server and Chat Controller. Copy `samples/rag_bot/plugin/schemas.py` to `samples/rag_tutorial_bot/plugin/schemas.py`.

Let’s understand the APIs and their schemas a little. There are two APIs used by the Chat Controller microservice: `/chat` and `/event`.

Events such as pipeline creation and deletion are sent by the Chat Controller to the `/event` endpoint. In most cases, you will not need to modify the default `/event` endpoint that we will define in the next step.

User questions are sent by the Chat Controller to the `/chat` endpoint, along with the `UserId` and an optional `QueryId`. The response schema for this API contains a `Response` attribute, which contains the details of the response. In this tutorial, we only need to manage two sub-fields: `Response.Text` (which contains the chunk that we are streaming) and `Response.IsFinal` (which indicates whether the stream is complete).
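Purely as an illustration of the shape of these schemas, a simplified sketch based only on the fields referenced in this tutorial might look like the following. The packaged `schemas.py` is the authoritative definition and contains additional fields, so copy that file rather than writing your own.

```python
# Simplified, illustrative sketch of the Plugin server request/response schemas.
# Only the fields referenced in this tutorial are shown; the packaged
# samples/rag_bot/plugin/schemas.py is the authoritative definition.
from typing import Any, Dict, Optional

from pydantic import BaseModel, Field


class ChatRequest(BaseModel):
    Query: Optional[str] = ""                   # User question sent by the Chat Controller
    UserId: str                                 # Identifies the user/session
    QueryId: Optional[str] = None               # Optional identifier for this query
    Metadata: Optional[Dict[str, Any]] = None   # For example, {"ChatHistory": [...]}


class ResponseDetails(BaseModel):
    Text: str = ""          # The chunk currently being streamed
    CleanedText: str = ""   # Chunk text with any markup removed
    IsFinal: bool = False   # True on the last chunk of the stream


class ChatResponse(BaseModel):
    Response: ResponseDetails = Field(default_factory=ResponseDetails)
```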
Create the actual custom Plugin with the RAG Server API call and the `/chat` and `/event` APIs. Copy `samples/rag_bot/plugin/rag.py` to `samples/rag_tutorial_bot/plugin/rag.py`.

The `rag_stream` function takes the query and additional request parameters, performs the API call to the RAG server, and returns a streaming response generator that formats the RAG response into the `ChatResponse` schema and pushes each chunk into the response stream. If you are using a custom RAG solution, or you want to modify the way you call the RAG, you will only need to make changes in the `generator` method.

```python
async def rag_stream(
    question: Optional[str] = "",
    chat_history: Optional[List] = [],
    num_tokens: Optional[int] = MAX_TOKENS,
) -> StreamingResponse:
    """
    Call the RAG chain server and return the streaming response.
    """

    request_json = {
        "messages": chat_history + [{"role": "user", "content": question}],
        "use_knowledge_base": True,
        "temperature": TEMPERATURE,
        "top_p": TOP_P,
        "max_tokens": num_tokens,
        "seed": 42,
        "bad": [],
        "stop": STOP_WORDS,
        "stream": True,
    }

    # Method that forwards the stream to the Chat controller
    async def generator():
        full_response = ""
        if question:
            async with aiohttp.ClientSession() as session:
                async with session.post(GENERATION_URL, json=request_json) as resp:
                    async for chunk, _ in resp.content.iter_chunks():
                        try:
                            chunk = chunk.decode("utf-8")
                            chunk = chunk.strip("\n")
                            try:
                                if len(chunk) > 6:
                                    parsed = json.loads(chunk[6:])
                                    message = parsed["choices"][0]["message"]["content"]
                                else:
                                    logger.debug(f"Received empty RAG response chunk '{chunk}'.")
                                    message = ""
                            except Exception as e:
                                logger.warning(f"Parsing RAG response chunk '{chunk}' failed. {e}")
                                message = ""

                            if not message:
                                continue
                            full_response += message

                            json_chunk = ChatResponse()
                            json_chunk.Response.Text = message
                            json_chunk.Response.CleanedText = message
                            json_chunk = json.dumps(json_chunk.dict())
                            yield json_chunk
                        except Exception as e:
                            yield f"Internal error in RAG stream: {e}"
                            break

        logger.info(f"Full RAG response for query `{question}` : {full_response}")
        json_chunk = ChatResponse()
        json_chunk.Response.IsFinal = True
        json_chunk = json.dumps(json_chunk.dict())
        yield json_chunk

    return StreamingResponse(generator(), media_type="text/event-stream")
```
The `/chat` endpoint invokes `rag_stream` with the user query and the chat history, if available, and forwards the streaming RAG response.

```python
@router.post(
    "/chat",
    status_code=status.HTTP_200_OK,
)
async def chat(
    request: Annotated[
        ChatRequest,
        Body(
            description="Chat Engine Request JSON. All the fields populated as part of this JSON is also available as part of request JSON."
        ),
    ],
    response: Response,
) -> StreamingResponse:
    """
    This endpoint can be used to provide response to query driven user request.
    """

    req = request.dict(exclude_none=True)
    logger.info(f"Received request JSON at /chat_stream endpoint for RAG : {json.dumps(req, indent=4)}")

    try:
        chat_history = []
        if "Metadata" in req:
            chat_history = req["Metadata"].get("ChatHistory", [])
        resp = await rag_stream(question=req["Query"], chat_history=chat_history)
        return resp
    except Exception as e:
        response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
        return {"StatusMessage": str(e)}
```
The `/event` endpoint is only invoked if the Chat Controller is directly connected via the Plugin Server Architecture, and it is optional for our tutorial.
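In most cases you can use the `/event` endpoint from the packaged `samples/rag_bot/plugin/rag.py` as-is. Purely as an illustration of its shape, a no-op endpoint that simply acknowledges pipeline events might look like the sketch below; the `EventRequest` schema name, its fields, and the logger/router setup are assumptions here, not the packaged definitions.

```python
# Illustrative sketch only: a no-op /event endpoint that acknowledges pipeline
# creation/deletion events from the Chat Controller. The EventRequest schema and
# the router/logger setup are assumptions; prefer the packaged implementation.
import json
import logging
from typing import Annotated, Any, Dict

from fastapi import APIRouter, Body, status

from .schemas import EventRequest  # assumed to be defined in the copied schemas.py

logger = logging.getLogger("plugin")  # in the packaged rag.py, router and logger already exist
router = APIRouter()


@router.post("/event", status_code=status.HTTP_200_OK)
async def event(
    request: Annotated[
        EventRequest,
        Body(description="Chat Controller pipeline event, for example pipeline creation or deletion."),
    ],
) -> Dict[str, Any]:
    """Acknowledge pipeline events; no per-pipeline state is needed for this tutorial."""
    req = request.dict(exclude_none=True)
    logger.info(f"Received event at /event endpoint: {json.dumps(req, indent=4)}")
    return {"StatusMessage": "success"}
```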
Register this plugin. Add the following to `plugin_config.yaml`.

```yaml
config:
  workers: 1
  timeout: 30

plugins:
  - name: rag
    path: ./plugin/rag.py
```
Deploy the Plugin server for testing. Run the bot using the Docker Environment.
```bash
export BOT_PATH=./samples/rag_tutorial_bot
source deploy/docker/docker_init.sh
docker compose -f deploy/docker/docker-compose.yml up --build plugin-server -d
```
Ingest documents as required for your use case by visiting `http://<your-ip>:8090/kb`. You can test the endpoint with queries relevant to your documents using the following `curl` command.

```bash
curl -X 'POST' \
  'http://localhost:9002/rag/chat' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "Query": "Who is the president of the United States?",
  "UserId": "user1"
  }'
```
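If you prefer scripting the check, a small Python snippet such as the one below exercises the same endpoint. Because chunk framing depends on your HTTP client, this sketch simply prints the raw JSON chunks as they arrive rather than parsing them.

```python
# Quick test client for the streaming /rag/chat endpoint (same payload as the curl example above).
import requests

payload = {"Query": "Who is the president of the United States?", "UserId": "user1"}

with requests.post("http://localhost:9002/rag/chat", json=payload, stream=True, timeout=120) as resp:
    resp.raise_for_status()
    # Each streamed chunk is a JSON-serialized ChatResponse; print the raw chunks as they arrive.
    for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
        print(chunk, flush=True)
```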
After you are done testing the plugin, stop the server.
```bash
docker compose -f deploy/docker/docker-compose.yml down
```
Connecting Chat Controller to the RAG Plugin#
Copy the `model_config.yaml` and `speech_config.yaml` files from `samples/chitchat_bot`. They represent the common settings for a speech pipeline.

Update the `server` URL in the `dialog_manager` component in the `speech_config.yaml` file to point to the RAG Plugin we defined in the previous section.

```yaml
dialog_manager:
  DialogManager:
    server: "http://localhost:9002/rag"
    use_streaming: true
```
With this change, the Chat Controller will directly call the `/chat` and `/event` endpoints of the Plugin server.

Deploy the bot.
Set the environment variables required for the `docker-compose.yml` file.

```bash
export BOT_PATH=./samples/rag_tutorial_bot/
source deploy/docker/docker_init.sh
```
For Plugin Server Architecture based bots, we need to use the `speech_lite` pipeline configuration for the Chat Controller microservice. Update the `PIPELINE` variable in `deploy/docker/docker_init.sh` or override it by setting the `PIPELINE` environment variable manually.

```bash
export PIPELINE=speech_lite
```
Deploy the Riva ASR and TTS speech models.
```bash
docker compose -f deploy/docker/docker-compose.yml up model-utils-speech
```
Deploy the Plugin server with the RAG plugin.

```bash
docker compose -f deploy/docker/docker-compose.yml up --build plugin-server -d
```
Deploy the Chat Controller microservice with the gRPC interface.
```bash
docker compose -f deploy/docker/docker-compose.yml up chat-controller -d
```
Interact with the bot using the Speech WebUI application.
```bash
docker compose -f deploy/docker/docker-compose.yml up bot-web-ui-client bot-web-ui-speech -d
```
Notice that we are not deploying the Chat Engine container at all.
You can interact with the bot using your browser at `http://<YOUR_IP_ADDRESS>:7006`.
Connecting the Plugin to the Chat Engine#
You can add guardrails to the bot or add any custom logic in Colang by creating the configurations needed for the Chat Engine and connecting the Plugin server to the Chat Engine using the Event Interface Architecture or Chat Engine Server Architecture.
Creating the Bot and Colang Configurations#
`bot_config.yaml` is the configuration entry point for any bot. Let’s create this file and add a few important configuration parameters.
Give the bot a name. In `bot_config.yaml`, you need to add a unique name for the bot. Let’s name the bot `nvidia_rag_bot`. We will use the Colang 2.0-beta syntax for this tutorial.

```yaml
bot: nvidia_rag_bot

colang_version: "2.x"

storage:
  name: cache

configs:
  use_stateful_guardrails: True
```
All the intelligence in our bot will be present in the RAG server. Since our Colang configs are only meant to route the queries to the plugin, let’s keep the model section empty.
```yaml
models: []
```
Create a folder named `colang` and a Colang file called `main.co`, which will contain all the Colang logic. Let’s update `main.co` to route all queries to the RAG plugin.

```colang
import core

flow technical helper
  # Helper flows for notifying errors
  activate notification of undefined flow start "I have encountered some technical issue!"
  activate notification of colang errors "I have encountered some technical issue!"

flow generate rag response $transcript
  # Invoke /chat endpoint from plugin
  $started = await InvokeStreamingChatAction(question=$transcript,endpoint="rag/chat",chat_history=True)
  if $started
    # Get first sentence from RAG response
    $response = await StreamingResponseChatAction(endpoint="rag/chat")
    while $response
      bot say $response
      # Check for next sentence
      $response = await StreamingResponseChatAction(endpoint="rag/chat")

flow rag
  # Wait for user queries
  user said something as $ref
  # Generate RAG response when user query received
  generate rag response $ref.transcript

flow main
  activate technical helper
  activate rag
```
The above flow routes all user utterances to a POST endpoint called `/rag/chat` in the Plugin server. It passes the user’s question as well as a `session_id` to the endpoint as request parameters.

Note

If you want to add more complicated logic in Colang, you must update `main.co` and possibly the bot config file according to your use case. Refer to Using Colang for more information.
Testing the Bot#
Run the bot with the gRPC interface using the Docker Environment.
Set the environment variables required for the `docker-compose.yml` file.

```bash
export BOT_PATH=./samples/rag_tutorial_bot/
source deploy/docker/docker_init.sh
```
For Event Interface Architecture based bots, we need to use the `speech_umim` pipeline configuration for the Chat Controller microservice. Update the `PIPELINE` variable in `deploy/docker/docker_init.sh` or override it by setting the `PIPELINE` environment variable manually.

```bash
export PIPELINE=speech_umim
```
Deploy the Riva ASR and TTS speech models.
```bash
docker compose -f deploy/docker/docker-compose.yml up model-utils-speech
```
Deploy the ACE Agent microservices: the Chat Controller, Chat Engine, Redis, Plugin server, and NLP server. The NLP server will not have any models deployed for this bot.

```bash
docker compose -f deploy/docker/docker-compose.yml up --build speech-event-bot -d
```
Interact with the bot using the Sample WebUI application in your browser at `http://<YOUR_IP_ADDRESS>:7006`.
Improving the User Experience for Speech-To-Speech Conversations#
As humans, when we talk, we pause during conversation, start thinking before others stop speaking, and even interrupt each other. Most agents and bots today wait for the user to finish before processing the query. Detecting the end of user speech is also subjective, as some people take long pauses between words.
The Riva Automatic Speech Recognition (ASR) pipeline waits, by default, for 800 ms of silence in the user audio before marking the end of user speech. Therefore, the user-perceived latency will always be at least 800 ms plus the agent processing latency (RAG latency in this tutorial). On top of this, we have ASR processing latency, text-to-speech generation latency, network latency, and so on.
The Riva ASR pipeline has multiple components, and it might not be optimal to run the full pipeline for every audio chunk. The ASR pipeline returns a greedy-decoded partial transcript for every user audio chunk, and after 800 ms of silence, the final generated transcript with beam LM decoding, punctuation, and custom vocabulary. You can optionally configure it to return an interim transcript after a silence value lower than 800 ms (240 ms, for example) with the same processing as the final transcript.
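To make the latency math concrete, here is a rough back-of-the-envelope sketch of where the 560 ms figure used below comes from; the processing latency value is a hypothetical placeholder, not a measured number.

```python
# Rough user-perceived latency model: EOU silence threshold + downstream processing.
PROCESSING_MS = 400  # hypothetical ASR finalization + RAG + TTS time-to-first-audio

default_eou_ms = 800  # default Riva end-of-utterance silence threshold
interim_eou_ms = 240  # first-pass (interim transcript) silence threshold

print("single-pass EOU latency:", default_eou_ms + PROCESSING_MS, "ms")
print("two-pass EOU latency:   ", interim_eou_ms + PROCESSING_MS, "ms")
print("savings:                ", default_eou_ms - interim_eou_ms, "ms")  # 560 ms
```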
In this section, we will focus on the following user experience improvements:
- Reduce latency by 560 ms by using the interim transcript with 240 ms of silence as EOU, instead of 800 ms, for triggering the RAG API
- Always-on barge-in support, that is, stopping the bot response when the user interrupts
- An option to configure the end of user speech threshold (currently 800 ms) for long pauses in user audio. Refer to Customizing ASR Recognition for Long Pause Handling for more information.
Updating Bot Configurations#
Copy `speech.co` from the RAG sample bot directory `samples/rag_bot/`. The `speech.co` file implements Colang flows for utilizing partial/interim transcripts and supporting always-on barge-in. Follow the diagram below to understand the flows in interim transcript mode.
Note
We send an early trigger to the RAG server and might need to retrigger if the user pauses for more than 240 ms between words. On average, you might make two extra RAG calls for each user query, which requires extra compute/cost when deploying at scale. If your use case involves external API calls or any action that is non-reversible or costly, you should not use the two-pass EOU approach, or you will need extra handling to manage the consequences of triggering the pipeline multiple times for a single user query.
Update `main.co` to utilize the flows from the `speech.co` Colang file. The `handle user transcript with interruption` flow listens to every partial, interim, and final transcript and decides whether to interrupt the bot response, set the new active transcript, or ignore a spurious transcript. The `user partially said something` flow returns when a new active transcript is set.

```colang
import core

flow technical helper
  activate notification of undefined flow start "I have encountered some technical issue!"
  activate notification of colang errors "I have encountered some technical issue!"
  # activate interim transcript mode and add list of flows to stop during user interruption / Barge In
  activate handle user transcript with interruption $mode="interim" $stop_flows_list=["_bot_say","generate rag response"]

flow generate rag response $transcript
  $started = await InvokeStreamingChatAction(question=$transcript,endpoint="rag/chat",chat_history=True)
  if $started
    $response = await StreamingResponseChatAction(endpoint="rag/chat")
    log "response from RAG: {$response}"
    while $response
      bot say $response
      $response = await StreamingResponseChatAction(endpoint="rag/chat")
      log "response from RAG: {$response}"

flow rag
  # wait for new active transcript
  user partially said something as $ref
  generate rag response $ref.transcript

flow main
  activate technical helper
  activate rag
```
We have added `generate rag response` to `stop_flows_list` to interrupt active RAG calls. Make sure any flows added to `stop_flows_list` do not use the `activate` keyword. In this example, we set the interim transcript mode, but you can also decide to work with partial transcripts, or only use the final transcript.

Riva ASR might detect a few spurious transcripts, which can interrupt the bot response unnecessarily. Create `actions.py` with a spurious filter action.

```python
import logging

from nemoguardrails.actions.actions import action

logger = logging.getLogger("nemoguardrails")

# Transcript filtering for spurious transcripts and filler words. Along with this, any transcript shorter than 3 chars is removed
FILTER_WORDS = [
    "yeah", "okay", "right", "yes", "yum", "and", "one", "all", "when", "thank",
    "but", "next", "what", "i see", "the", "hmm", "mmm", "so that", "why", "that", "well",
]
INCLUDE_WORDS = ["hi"]


@action(name="IsSpuriousAction")
async def is_spurious(query):
    """
    Filter transcripts shorter than 3 chars or in the FILTER_WORDS list to avoid spurious transcripts and filler words.
    """
    if query.strip().lower() in FILTER_WORDS or (len(query) < 3 and query.strip().lower() not in INCLUDE_WORDS):
        return True
    else:
        return False
```
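You can sanity-check the filter on its own before wiring it into the bot. A quick standalone check might look like the following, assuming `actions.py` is importable on your Python path and `nemoguardrails` is installed in your environment.

```python
# Standalone check of the spurious-transcript filter (assumes actions.py is on the Python path).
import asyncio

from actions import is_spurious


async def main():
    for query in ["okay", "hi", "what is the leave policy?"]:
        verdict = "spurious" if await is_spurious(query) else "valid"
        print(f"{query!r} -> {verdict}")


asyncio.run(main())
```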
Update the `speech_config.yaml` parameters to send an interim transcript after 240 ms of user silence. Refer to the Customizing ASR Recognition for Long Pause Handling section for more information. Update the `riva_asr` component config as shown below.

```yaml
riva_asr:
  RivaASR:
    server: "localhost:50051"

    # Update interim and final transcript silence threshold
    endpointing_stop_history: 800 # Second pass End of User Speech
    endpointing_stop_history_eou: 240 # First pass End of User Speech
```
With a higher value for `endpointing_stop_history`, we can avoid breaking the user query into multiple queries when there are long pauses in the user audio. It is recommended to use a value of at least 800 ms to avoid ASR transcript quality issues.
Testing the Bot#
Run the bot with the gRPC interface using the Docker Environment.
Set the environment variables required for the `docker-compose.yml` file.

```bash
export BOT_PATH=./samples/rag_tutorial_bot/
source deploy/docker/docker_init.sh
```
For Event Interface Architecture based bots, we need to use the `speech_umim` pipeline configuration for the Chat Controller microservice. Update the `PIPELINE` variable in `deploy/docker/docker_init.sh` or override it by setting the `PIPELINE` environment variable manually.

```bash
export PIPELINE=speech_umim
```
Deploy the Riva ASR and TTS speech models.
```bash
docker compose -f deploy/docker/docker-compose.yml up model-utils-speech
```
Deploy the ACE Agent microservices: the Chat Controller, Chat Engine, Redis, Plugin server, and NLP server. The NLP server will not have any models deployed for this bot.

```bash
docker compose -f deploy/docker/docker-compose.yml up --build speech-event-bot -d
```
Interact with the bot using the Sample WebUI application in your browser at `http://<YOUR_IP_ADDRESS>:7006`. You should observe, on average, a 500-600 ms improvement in latency, and you can barge in at any time to stop the current bot response.