Morpheus LLM Agents Pipeline

Purpose

The Morpheus LLM Agents pipeline is designed to seamlessly integrate Large Language Model (LLM) agents into the Morpheus framework. This implementation focuses on efficiently executing multiple LLM queries using the ReAct agent type, which is tailored for versatile task handling. The use of the Langchain library streamlines the process, minimizing the need to migrate existing systems.

Within the Morpheus LLM Agents context, these agents act as intermediaries, facilitating communication between users and the LLM service. Their primary role is to execute tools and manage multiple LLM queries, enhancing the LLM’s capabilities in solving complex tasks. Agents utilize various tools, such as internet searches, VDB retrievers, calculators, and more, to assist in resolving inquiries, enabling seamless execution of tasks and efficient handling of diverse queries.

LLM Service

This pipeline supports various LLM services compatible with our LLMService interface, including OpenAI, NeMo, and local execution using llama-cpp-python. In this example, we’ll focus on using OpenAI, chosen for its compatibility with the ReAct agent architecture.

Agent type

The pipeline supports different agent types, each influencing the pattern for interacting with the LLM. For this example, we’ll use the ReAct agent type—a popular and reliable choice.

Agent tools

Depending on the problem at hand, various tools can be provided to LLM agents, such as internet searches, VDB retrievers, calculators, Wikipedia, etc. In this example, we’ll use the internet search tool and an llm-math tool, allowing the LLM agent to perform Google searches and solve math equations.
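
For illustration, a ReAct agent executor wired with these two tools can be built with Langchain’s agent API. This is a minimal sketch rather than the exact code used by the example; the serpapi and llm-math tool names and the OpenAI wrapper shown here are assumptions tied to the legacy (pre-0.2) Langchain API:

from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.llms import OpenAI

# Reads OPENAI_API_KEY from the environment (see Getting Started below).
llm = OpenAI(model_name="gpt-3.5-turbo-instruct", temperature=0.0)

# "serpapi" performs Google searches (needs SERPAPI_API_KEY);
# "llm-math" lets the LLM delegate arithmetic to a calculator chain.
tools = load_tools(["serpapi", "llm-math"], llm=llm)

# ZERO_SHOT_REACT_DESCRIPTION selects Langchain's ReAct-style agent,
# which picks tools based on their text descriptions.
agent_executor = initialize_agent(tools,
                                  llm,
                                  agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
                                  verbose=True)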

LLM Library

The pipeline utilizes the Langchain library to run LLM agents, enabling their execution directly within a Morpheus pipeline. This approach reduces the overhead of migrating existing systems to Morpheus and eliminates the need to replicate work done by popular LLM libraries like llama-index and Haystack.

Pipeline Implementation

  • InMemorySourceStage: Manages LLM queries in a DataFrame.

  • KafkaSourceStage: Consumes LLM queries from a Kafka topic.

  • DeserializationStage: Converts MessageMeta objects into ControlMessages required by the LLMEngine.

  • LLMEngineStage: Encompasses the core LLMEngine functionality.

    • An ExtracterNode extracts the questions from the DataFrame.

    • A LangChainAgentNode runs the Langchain agent executor for all provided input. This node uses the agent’s run interface to execute the agents asynchronously.

    • Finally, the responses are incorporated back into the ControlMessage using a SimpleTaskHandler.

  • InMemorySinkStage: Stores the results, as shown in the sketch below.
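
The sketch below illustrates how these pieces might be assembled in Python. The import paths, class names (e.g. DeserializeStage for the deserialization stage), and constructor arguments are assumptions based on the Morpheus examples/llm code and may differ between releases; treat examples/llm/agents in the repository as the authoritative implementation.

import cudf

from morpheus.config import Config
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage


def build_pipeline(config: Config, agent_executor) -> LinearPipeline:
    # Wire the LLMEngine: extract questions, run the agent, attach responses.
    engine = LLMEngine()
    engine.add_node("extracter", node=ExtracterNode())
    engine.add_node("agent",
                    inputs=["/extracter"],
                    node=LangChainAgentNode(agent_executor=agent_executor))
    engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler())

    # Questions enter the pipeline as a DataFrame column.
    source_df = cudf.DataFrame({"questions": ["Who is the current leader of Japan?"]})

    pipe = LinearPipeline(config)
    pipe.set_source(InMemorySourceStage(config, dataframes=[source_df]))
    # Attach an llm_engine task telling the ExtracterNode which column to read.
    pipe.add_stage(DeserializeStage(config,
                                    task_type="llm_engine",
                                    task_payload={"task_type": "completion",
                                                  "task_dict": {"input_keys": ["questions"]}}))
    pipe.add_stage(LLMEngineStage(config, engine=engine))
    pipe.add_stage(InMemorySinkStage(config))
    return pipe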

Getting Started

Prerequisites

Set Environment Variables

Before running the project, ensure that you set the required environment variables. Follow the steps below to obtain and set the API keys for OpenAI and SerpApi.

OpenAI API Key

Visit OpenAI and create an account. Navigate to your account settings to obtain your OpenAI API key. Copy the key and set it as an environment variable using the following command:

export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"

SerpApi API Key

Go to SerpApi to register and create an account. Once registered, obtain your SerpApi API key. Set the API key as an environment variable using the following command:

export SERPAPI_API_KEY="<YOUR_SERPAPI_API_KEY>"

Install Dependencies

Install the required dependencies.

mamba env update \
  -n ${CONDA_DEFAULT_ENV} \
  --file ./conda/environments/examples_cuda-121_arch-x86_64.yaml

Running the Morpheus Pipeline

The top-level entrypoint to each of the LLM example pipelines is examples/llm/main.py. This script accepts a set of options and a pipeline to run. Baseline options are listed with each example below, and for the purposes of this document we’ll assume a pipeline option of agents.

Run example (Simple Pipeline):

This example demonstrates a basic implementation of a Morpheus pipeline, showcasing the process of executing LLM queries and managing the generated responses. It uses stages such as InMemorySourceStage, DeserializationStage, ExtracterNode, LangChainAgentNode, SimpleTaskHandler, and InMemorySinkStage within the pipeline to handle various aspects of query processing and response management.

  • Utilizes stages such as InMemorySourceStage and DeserializationStage for consuming and batching LLM queries.

  • Incorporates an ExtracterNode for extracting questions and a LangChainAgentNode for executing the Langchain agent executor.

  • Uses a SimpleTaskHandler to manage the responses generated by the LLMs.

  • Stores and manages the results within the pipeline using an InMemorySinkStage.

python examples/llm/main.py agents simple [OPTIONS]

Options:

  • --num_threads INTEGER RANGE

    • Description: Number of internal pipeline threads to use.

    • Default: 12

  • --pipeline_batch_size INTEGER RANGE

    • Description: Internal batch size for the pipeline. Can be much larger than the model batch size. Also used for Kafka consumers.

    • Default: 1024

  • --model_max_batch_size INTEGER RANGE

    • Description: Max batch size to use for the model.

    • Default: 64

  • --model_name TEXT

    • Description: The name of the model to use in OpenAI.

    • Default: gpt-3.5-turbo-instruct

  • --repeat_count INTEGER RANGE

    • Description: Number of times to repeat the input query. Useful for testing performance.

    • Default: 1

  • --help

    • Description: Show the help message with details on options and commands.
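
For example, the following invocation (the flag values here are purely illustrative) runs the simple pipeline with eight threads and repeats each input query five times:

python examples/llm/main.py agents simple --num_threads=8 --repeat_count=5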

Run example (Kafka Pipeline):

The Kafka example in the Morpheus LLM Agents demonstrates a streaming implementation, utilizing Kafka messages to facilitate near real-time processing of LLM queries. This example is similar to the simple example but makes use of a KafkaSourceStage to stream and retrieve messages from a Kafka topic.

First, to run the Kafka example, you need to create a Kafka cluster that enables the persistent pipeline to accept queries for the LLM agents. You can create the Kafka cluster using the following guide: Quick Launch Kafka Cluster Guide

Once the Kafka cluster is running, create a Kafka topic to produce input for the pipeline.

# Set the bootstrap server variable
export BOOTSTRAP_SERVER=$(broker-list.sh)

# Create the input topic
kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --create --topic input

# Update the partitions
kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --alter --topic input --partitions 3

Now the Kafka example can be run using the following command with the options listed below:

python examples/llm/main.py agents kafka [OPTIONS]

Options:

  • --num_threads INTEGER RANGE

    • Description: Number of internal pipeline threads to use.

    • Default: 12

  • --pipeline_batch_size INTEGER RANGE

    • Description: Internal batch size for the pipeline. Can be much larger than the model batch size. Also used for Kafka consumers.

    • Default: 1024

  • --model_max_batch_size INTEGER RANGE

    • Description: Max batch size to use for the model.

    • Default: 64

  • --model_name TEXT

    • Description: The name of the model to use in OpenAI.

    • Default: gpt-3.5-turbo-instruct

  • --help

    • Description: Show the help message with details on options and commands.

After the pipeline is running, we need to send messages to the pipeline using the Kafka topic. In a separate terminal, run the following command:

kafka-console-producer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic input

This will open up a prompt allowing any JSON to be pasted into the terminal. The JSON should be formatted as follows:

{"question": "<Your question here>"}

For example:

{"question": "Who is Leo DiCaprio's girlfriend? What is her current age raised to the 0.43 power?"} {"question": "What is the height of the tallest mountain in feet divided by 2.23? Do not round your answer"} {"question": "Who is the current leader of Japan? What is the largest prime number that is smaller that their age? Just say the number."}
