Build a Complete OpenSource LLM RAG QA Chatbot — ChatEngine

Marco Bertelli
Published in Level Up Coding
10 min read · Dec 28, 2023

ChatEngine

In this episode, we build on the code from our previous articles. We add support for saving chats to MongoDB and construct our ChatEngine, which lets users hold a real conversation with our ChatBot, for instance by following up with ‘Are you sure about your last response?’

We begin with some great news: Perplexity now officially supports the new mixtral-8x7b-instruct model. According to Mistral AI's published benchmarks, it matches or outperforms GPT-3.5 Turbo on most standard benchmarks.

Mixtral is a notable step forward among open models, particularly for understanding and generating conversational content. Its official availability on Perplexity should translate into more accurate and contextually rich responses from our chatbot.

To kick things off, we integrate the new model into our bot. The process is straightforward: we only need to change a single line of the existing codebase.

llm = Perplexity(
    api_key=os.getenv("PERPLEXITY_API_KEY"),
    model="mixtral-8x7b-instruct",
    temperature=0.5,
)

Seems simple, right? This simplicity exemplifies the true strength of the LlamaIndex library.

Next, we add support for saving our entities to MongoDB. Before delving into the implementation details, let’s define what those entities are, with the help of a simple entity relationship diagram.

Entities, in this context, are the core objects of our system that we want to store in a MongoDB database.

Figure: entity model

Within our system, three primary entities come into focus:

  1. Users: This entity is optional and only needed for applications that already have user support. Its main purpose is preserving chat history for individual users. Users can be registered users or guests, where a guest is an anonymous visitor not registered on our platform.
  2. Chats: The foundational entity, which associates a user or a guest with a specific chat session.
  3. Chat Messages: The core of our application. This entity stores each message from the user and each response from our assistant, together with the user's feedback (positive or negative) on the assistant's responses. This data is invaluable for analyzing the quality of our bot’s responses.
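As a rough picture of these three entities, here is a minimal sketch using plain dataclasses. The field names mirror the ones used by the MongoDB code later in this article (userId, role, createdAt, chatId, message, feedback, isEnabled); the classes themselves are only an illustration and are not part of the project, which stores plain dictionaries.

```python
from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Optional


@dataclass
class User:
    id: str                 # stored as "_id" in the "users" collection
    role: str               # e.g. "admin" or a regular user role
    isEnabled: bool = True  # disabled users are rejected by the auth middleware


@dataclass
class Chat:
    userId: str             # a real user id, or a random id for guests
    role: str               # "guest" for guest sessions
    createdAt: datetime = field(default_factory=datetime.now)


@dataclass
class ChatMessage:
    chatId: str             # the chat this message belongs to
    message: str            # the text of the message
    role: str               # "user" or "assistant"
    createdAt: datetime = field(default_factory=datetime.now)
    feedback: Optional[str] = None  # "good" or "bad", None until rated


guest_chat = Chat(userId="658d930f4f69a7853e018fbd", role="guest")
reply = ChatMessage(chatId="chat-1", message="Hello!", role="assistant")
print(asdict(reply)["feedback"])  # None until the user rates the reply
```

A chat message points at its chat through chatId, and a chat points at its user (or guest) through userId, which is all the relationship diagram needs to express.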

We start with chat support by creating an endpoint that lets guest users begin a chat session. The new endpoint goes in the ‘apis/chats’ file.

@app.route("/chats/guest", methods=["GET"])
@limiter.limit("5 per minute")
def get_guest_chat():
    # A fresh ObjectId serves as the random guest id
    chat = create_chat(ObjectId(), "guest")

    return jsonify({'_id': str(chat['_id'])}), 200

This endpoint creates a chat session and returns its ID to the frontend. It relies on a helper that we add to our ‘mongodb.index’ module. All subsequent database methods live in this module as well.

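def create_chat(userId, role):
    result = db.chats.insert_one({
        "userId": ObjectId(userId),
        "role": role,
        "createdAt": datetime.datetime.now()
    })

    # Look up by the inserted _id so we always return the chat just created,
    # even if this user already has older chats
    return db.chats.find_one({"_id": result.inserted_id})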

When creating the chat entity for guests, we store the userId (a randomly generated guest ID), the role (always designated as ‘guest’), and the creation date.

Now, for applications that already support user registration, we aim to replicate this process. However, before proceeding with the endpoint creation, an additional step is required: implementing JWT authentication. This step ensures the validation of existing users. To accomplish this, we will develop a middleware responsible for authenticating a JWT and retrieving the complete user information from the database.

JSON Web Token (JWT) authentication is a widely used method for securing web applications by validating and ensuring the integrity of transmitted information between parties. In this scenario, implementing JWT authentication involves creating middleware that intercepts and verifies the authenticity of JWTs sent with requests. Upon successful validation, this middleware retrieves user information from the database, enabling seamless identification and authentication of registered users within the application.
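To make the "integrity" part concrete, here is a minimal sketch of what signing and verifying an HS256 token involves, written with the standard library only. The helper names (sign_hs256, verify_hs256) are made up for this illustration, and the sketch skips expiry and algorithm-header checks. It is meant to show the idea, not to replace a JWT library.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"),
                    hashlib.sha256).digest()


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature for an HS256 token."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{b64url(_signature(signing_input, secret))}"


def verify_hs256(token: str, secret: str) -> dict:
    """Return the payload if the signature matches, else raise ValueError."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    expected = _signature(f"{header_b64}.{payload_b64}", secret)
    # compare_digest avoids leaking information through timing differences
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    return json.loads(b64url_decode(payload_b64))


token = sign_hs256({"id": "658d930f4f69a7853e018fbd"}, "my-secret")
print(verify_hs256(token, "my-secret"))
```

Anyone can read the payload, but only a holder of the secret can produce a signature that verifies, which is why the server can trust the id it finds inside. The article's actual middleware, which relies on PyJWT's jwt.decode, follows.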

from functools import wraps
from flask import current_app
from flask import request

import jwt

from mongodb.index import getUserById


def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        try:
            return validate_user_token(f, False, *args, **kwargs)
        except Exception as e:
            print(e)
            return {
                "message": "Something went wrong",
                "data": None,
                "error": str(e)
            }, 500

    return decorated


def user_token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        try:
            return validate_user_token(f, True, *args, **kwargs)
        except Exception as e:
            print(e)
            return {
                "message": "Something went wrong",
                "data": None,
                "error": str(e)
            }, 500

    return decorated


def admin_token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        try:
            return validate_admin_token(f, False, *args, **kwargs)
        except Exception as e:
            print(e)
            return {
                "message": "Something went wrong",
                "data": None,
                "error": str(e)
            }, 500

    return decorated


def validate_admin_token(f, return_user, *args, **kwargs):
    token = None

    if "Authorization" in request.headers:
        token = request.headers["Authorization"].split(" ")[1]

    if not token:
        return {
            "message": "Authentication Token is missing!",
            "data": None,
            "error": "Unauthorized"
        }, 401

    data = jwt.decode(
        token, current_app.config["SECRET_KEY"], algorithms=["HS256"])

    current_user = getUserById(data["id"])

    if current_user is None:
        return {
            "message": "Invalid Authentication token!",
            "data": None,
            "error": "Unauthorized"
        }, 401

    if not current_user["isEnabled"]:
        return {
            "message": "Invalid Authentication user!",
            "data": None,
            "error": "Forbidden"
        }, 403

    if current_user["role"] != "admin":
        return {
            "message": "Invalid Authentication user!",
            "data": None,
            "error": "Forbidden"
        }, 403

    if return_user:
        return f(current_user, *args, **kwargs)

    return f(*args, **kwargs)


def validate_user_token(f, return_user, *args, **kwargs):
    token = None

    if "Authorization" in request.headers:
        token = request.headers["Authorization"].split(" ")[1]

    if not token:
        return {
            "message": "Authentication Token is missing!",
            "data": None,
            "error": "Unauthorized"
        }, 401

    data = jwt.decode(
        token, current_app.config["SECRET_KEY"], algorithms=["HS256"])

    current_user = getUserById(data["id"])

    if current_user is None:
        return {
            "message": "Invalid Authentication token!",
            "data": None,
            "error": "Unauthorized"
        }, 401

    if not current_user["isEnabled"]:
        return {
            "message": "Invalid Authentication user!",
            "data": None,
            "error": "Forbidden"
        }, 403

    if return_user:
        return f(current_user, *args, **kwargs)

    return f(*args, **kwargs)

This code defines three decorators (token_required, user_token_required, and admin_token_required) backed by two validation functions, one for regular users and one for admins. Additionally, here’s the getUserById method.

def getUserById(id):
    """
    Get an existing user by id
    """

    return db['users'].find_one({'_id': ObjectId(id)})
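One detail in the validators above is worth a closer look: they read the token with split(" ")[1], so a header without a space raises an IndexError, which the decorator turns into a 500 rather than a 401. A small helper along the following lines could make that case explicit. The name extract_bearer_token is hypothetical and does not exist in the project.

```python
from typing import Optional


def extract_bearer_token(header_value: Optional[str]) -> Optional[str]:
    """Return the token from 'Bearer <token>', or None if absent or malformed."""
    if not header_value:
        return None
    parts = header_value.split()
    # Expect exactly two parts: the scheme and the token itself
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return None
    return parts[1]


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token("abc.def.ghi"))         # None
```

With a helper like this, the validators could call extract_bearer_token(request.headers.get("Authorization")) and fall through to their existing "token is missing" branch when it returns None.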

Now, as before, we start with the endpoint that creates a chat, this time for a logged-in user:

@app.route("/chats/me", methods=["GET"])
@limiter.limit("5 per minute")
@user_token_required
def get_my_chat(current_user):
    chat = get_user_chat(current_user["_id"])

    # Create the chat on first access
    if chat is None:
        chat = create_chat(current_user["_id"], current_user["role"])

    return jsonify(parse_mongo_item_to_json(chat)), 200

It is very similar to the previous one, except for the call to get_user_chat, which checks whether the user already has a chat or needs a new one.

def get_user_chat(userId):
    return db.chats.find_one({"userId": ObjectId(userId)})

As always, that is the database code behind the get_user_chat method.

We also create an endpoint that returns the user's previous chat messages:

@app.route("/chats/me/history", methods=["GET"])
@limiter.limit("15 per minute")
@user_token_required
def get_my_chat_history(current_user):
    chat = get_user_chat(current_user["_id"])

    # A user who has never opened a chat has no history yet
    if chat is None:
        return jsonify([]), 200

    chat_history = retrieve_user_chat_history(str(chat['_id']))

    return jsonify(chat_history), 200

This is the database code:

def retrieve_user_chat_history(chatId):
    """
    Retrieve chat history from MongoDB
    """
    messages = []

    # Oldest message first
    cursor = db.chatmessages.find(
        {"chatId": ObjectId(chatId)}).sort('createdAt', 1)

    for item in cursor:
        messages.append(parse_mongo_item_to_json(item))

    return messages

After reviewing multiple code snippets, let’s step back for a global overview. We’ve created all the endpoints that let both guest and registered users start a chat. One piece of code remains unexplored: the parse_mongo_item_to_json method. It converts a MongoDB item into plain JSON, turning MongoDB ObjectId values into strings and formatting dates as ISO strings. Here's the code responsible for this transformation.

import json


def parse_mongo_item_to_json(item):
    # Round-trip through JSON so the result contains only JSON-safe types
    return json.loads(json.dumps(item, cls=MongoJSONEncoder))

This parser uses a custom encoder class to serialize the item. The class is the following:

from datetime import datetime, date

from bson import ObjectId
from json import JSONEncoder


class MongoJSONEncoder(JSONEncoder):
    def default(self, o):
        # datetime must be checked first: it is a subclass of date
        if isinstance(o, datetime):
            return o.isoformat(timespec='milliseconds')
        # date.isoformat() accepts no timespec argument
        if isinstance(o, date):
            return o.isoformat()
        if isinstance(o, ObjectId):
            return str(o)
        return super().default(o)

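Seems simple, right? This class is fundamental: it converts ObjectId values to strings, formats dates as ISO 8601 strings, and hands any other type to the base encoder, which raises a TypeError if the value is not JSON-serializable.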
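Here is a small usage sketch of that encoder. It repeats the class so that it runs on its own, falls back to a stand-in ObjectId when pymongo's bson package is not installed (the stand-in is purely an assumption for the demo), and formats plain date values separately because date.isoformat() takes no timespec argument.

```python
import json
from datetime import datetime, date
from json import JSONEncoder

try:
    from bson import ObjectId  # shipped with pymongo
except ImportError:
    class ObjectId:  # stand-in so the sketch runs without pymongo
        def __init__(self, oid):
            self._oid = oid

        def __str__(self):
            return self._oid


class MongoJSONEncoder(JSONEncoder):
    def default(self, o):
        if isinstance(o, datetime):  # checked first: datetime subclasses date
            return o.isoformat(timespec="milliseconds")
        if isinstance(o, date):
            return o.isoformat()
        if isinstance(o, ObjectId):
            return str(o)
        return super().default(o)


doc = {
    "_id": ObjectId("658d930f4f69a7853e018fbd"),
    "createdAt": datetime(2023, 12, 28, 10, 30, 0, 123456),
}
encoded = json.loads(json.dumps(doc, cls=MongoJSONEncoder))
print(encoded)
# {'_id': '658d930f4f69a7853e018fbd', 'createdAt': '2023-12-28T10:30:00.123'}
```

Any type the encoder does not recognize, such as a set, still raises TypeError, so unexpected values fail loudly instead of being silently dropped.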

Now, let’s dive into the focal point of this article — the ChatEngine logic. In our previous article, we exclusively defined the query engine without addressing anything related to chat logic. Let’s start with an overview of the various types of chats that LlamaIndex offers and how each type operates:

  1. Agent Chats: Often considered the most capable option, available in two types: OpenAI or ReAct. An agent evaluates intermediate results and may make several LLM calls before settling on a final response. It delivers excellent responses, but the added latency makes it unsuitable for our use case.
  2. Condense Question Chats: This type rewrites the user's message, using the chat history, into a standalone question before sending it to the query engine. It is great for enhancing response quality, particularly regarding pricing information, but like Agent Chats it suffers from latency, since the rewrite costs an extra LLM call.
  3. Context Mode: In this mode, we retrieve relevant text from the index (vector DB) and pass it to the LLM alongside the user’s message. This results in one or multiple calls to the LLM, which returns a response aligned with the stored vector data. Though not as precise as the previous modes, it excels in low latency.
  4. Context + Condense: A blend of the two preceding methods.

This overview is simplified. For a detailed analysis, see the LlamaIndex documentation: https://docs.llamaindex.ai/en/stable/module_guides/deploying/chat_engines/modules.html
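To make the Context Mode idea more tangible, here is a deliberately simplified sketch in plain Python. It is not how LlamaIndex implements it: the keyword-overlap retriever and the function names are invented for this illustration, and a real setup would use vector similarity instead. It only shows the shape of a single LLM request in this mode: retrieved context in the system prompt, then the chat history, then the new user message.

```python
def retrieve(question, documents, top_k=2):
    """Toy retriever: rank documents by the words they share with the question."""
    q_words = set(question.lower().split())
    ranked = sorted(
        documents,
        key=lambda d: len(q_words & set(d.lower().split())),
        reverse=True,
    )
    return ranked[:top_k]


def build_context_messages(question, history, documents, system_prompt):
    """Assemble the message list a context-style chat sends in one LLM call."""
    context = "\n".join(retrieve(question, documents))
    system = f"{system_prompt}\n\nContext:\n{context}"
    return (
        [{"role": "system", "content": system}]
        + list(history)
        + [{"role": "user", "content": question}]
    )


docs = [
    "The game is won by collecting ten stars.",
    "Refunds take five days.",
]
messages = build_context_messages(
    "how do I win the game", [], docs, "Answer only from the Context."
)
for m in messages:
    print(m["role"], "->", m["content"])
```

Because retrieval is a lookup rather than an LLM call, the model is invoked only once per user message, which is where the latency advantage over the agent and condense modes comes from.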

BONUS: While not a technical recommendation, I discovered that using a non-English language (in my case, Italian) significantly increases latency in the first two methods. Consider this if you encounter issues with languages and latency.

For our business case, we’ve chosen the Context Mode and, as a bonus, we implement it using the low-level APIs:


@app.route("/chats/<chatId>/answer", methods=["GET"])
@limiter.limit("10 per minute")
def query_index(chatId):
    answer = request.args.get('answer')

    if answer is None:
        return "No text found, please include a ?answer=example parameter in the URL", 400

    # Recent messages give the model its conversational context
    history = retrieve_chat_history(chatId)

    memory = ChatMemoryBuffer.from_defaults(
        chat_history=history
    )

    chat_engine = ContextChatEngine.from_defaults(
        retriever=query_engine,
        memory=memory,
        service_context=service_context,
        system_prompt=(
            """\
You are a chatbot. You MUST NOT provide any information unless it is in the Context or previous messages. If the user ask something you don't know, say that you cannot answer. \
you MUST keep the answers short and simple. \
"""
        ),
        verbose=True
    )

    response = chat_engine.stream_chat(message=answer)

    return Response(send_and_save_response(response, chatId, answer), mimetype='application/json')


def send_and_save_response(response, chatId, query_text):
    # Forward each token to the client as soon as it is generated
    for token in response.response_gen:
        yield token

    # Once the stream is complete, persist both sides of the exchange
    user_message = insert_message_in_chat(chatId, query_text, 'user')
    bot_response = insert_message_in_chat(chatId, str(response), 'assistant')

    json_body = {
        "user_message": str(user_message.inserted_id),
        "bot_response": str(bot_response.inserted_id)
    }

    # json.dumps (needs `import json` at the top of the module) emits valid
    # JSON, unlike the str() of a dict, so the frontend can parse the ids
    yield f" {json.dumps(json_body)}"

This endpoint has evolved significantly since the last iteration, and it operates in several steps:

  1. Fetching Chat History: To provide the LLM model with context, we retrieve the chat history. Specifically, we gather the last four messages from the chat, a strategic choice aimed at optimizing latency.
  2. ChatEngine Initialization: Here, we instantiate the ChatEngine, incorporating the chat history as a memory object, along with the query engine and service context. Additionally, we provide a prompt to guide response generation. Experimentation with various prompts, especially for multilanguage support, is encouraged.
  3. Stream Chat Interaction: Using a stream_chat call, we start generating the response. This call returns the response in chunks, similar to the typing effect on the ChatGPT website, where the text arrives incrementally.
  4. Chunked Response Handling: As each response chunk arrives, we forward it to the client and continue until all chunks are received. We then save both messages and send their IDs back to the frontend as a final chunk. It’s crucial to retain the response ID, because the frontend needs it to submit feedback.
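Steps 3 and 4 boil down to a generator that forwards tokens first and persists afterwards. The sketch below isolates that pattern with stand-ins: stream_then_persist and fake_save are hypothetical names, and the token list replaces the LLM stream, so it runs without LlamaIndex or MongoDB.

```python
import json


def stream_then_persist(tokens, save_message, chat_id, question):
    """Yield tokens as they arrive, then save both messages and yield their ids."""
    parts = []
    for token in tokens:
        parts.append(token)
        yield token

    # Only reached once the token stream is exhausted
    user_id = save_message(chat_id, question, "user")
    bot_id = save_message(chat_id, "".join(parts), "assistant")
    yield " " + json.dumps({"user_message": user_id, "bot_response": bot_id})


saved = []


def fake_save(chat_id, text, role):
    """Stand-in for the database insert; returns a fake id."""
    saved.append((chat_id, text, role))
    return f"id-{len(saved)}"


chunks = list(stream_then_persist(iter(["Hel", "lo"]), fake_save, "c1", "hi"))
print(chunks)
```

Nothing is written to storage until the whole answer has been streamed, so a client that disconnects midway leaves no half-finished message behind. The trade-off is that the frontend has to separate the trailing JSON chunk from the answer text.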

These are the database methods used in the previous steps:

def retrieve_chat_history(chatId):
    """
    Retrieve chat history from MongoDB
    """

    chat_history = get_chat_history(chatId)

    return convertMongoChat(chat_history)


def insert_message_in_chat(chatId, message, role):
    return db.chatmessages.insert_one({
        "chatId": ObjectId(chatId),
        "message": message,
        "role": role,
        "createdAt": datetime.datetime.now()
    })
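The get_chat_history and convertMongoChat helpers are not shown in this article. As a guess at their general shape, the sketch below keeps the four most recent messages and maps each stored document to a role/content pair. The function names and the in-memory list are assumptions made for illustration. In the real application the selection would more likely be done by the MongoDB query, and the pairs would be wrapped in LlamaIndex ChatMessage objects before going into the memory buffer.

```python
from datetime import datetime


def last_messages(docs, limit=4):
    """Keep the `limit` most recent messages, ordered oldest first."""
    ordered = sorted(docs, key=lambda d: d["createdAt"])
    return ordered[-limit:]


def to_chat_messages(docs):
    """Map stored documents to role/content pairs for the chat memory."""
    return [{"role": d["role"], "content": d["message"]} for d in docs]


stored = [
    {
        "role": "user" if i % 2 == 0 else "assistant",
        "message": f"m{i}",
        "createdAt": datetime(2023, 12, 28, 10, i),
    }
    for i in range(6)
]

# Input order does not matter: messages are sorted by createdAt first
recent = to_chat_messages(last_messages(reversed(stored)))
print([m["content"] for m in recent])  # ['m2', 'm3', 'm4', 'm5']
```

Capping the history keeps the prompt short, which is the latency optimization mentioned in step 1 above.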

As the last step, we create the endpoint for leaving feedback on a message (one of the bot's responses):

@app.route("/chats/message/<messageId>/feedback", methods=["PUT"])
def set_message_feedback(messageId):
    body = request.get_json()

    if 'feedback' not in body:
        return "Invalid body set feedback field with value good | bad", 400

    feedback = body["feedback"]

    # Rejects None as well as any other unexpected value
    if feedback not in ['good', 'bad']:
        return "Invalid feedback (use good or bad)", 400

    updated_message = update_message_feedback(messageId, feedback)

    # find_one returns None when no message has this id
    if updated_message is None:
        return "Message not found", 404

    del updated_message["_id"]
    del updated_message["chatId"]

    return updated_message, 200


def update_message_feedback(messageId, feedback):
    """
    Update a message feedback in MongoDB
    """

    db.chatmessages.update_one({"_id": ObjectId(messageId)}, {
        "$set": {"feedback": feedback}})

    return db.chatmessages.find_one({"_id": ObjectId(messageId)})

We’ve made a straightforward update, modifying the ‘feedback’ database field to categorize responses as either ‘good’ or ‘bad,’ including necessary checks.

Now, it’s time to put all the code to the test:

  1. Creating a Guest Chat: We initiate a GET call on ‘/chats/guest’ to create a guest chat session and retrieve our unique ID.
  2. Answering a Query: Using the ‘/chats/658d930f4f69a7853e018fbd/answer?answer=how can i win in this game?’ endpoint, we interact with the chat engine and the new model. Impressively, the response is not only swift but also remarkably accurate! We encourage you to share your experience in the comments, detailing your specific performance analysis. Let’s analyze our results collectively.
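Since the question travels in the query string, spaces and the trailing question mark need to be URL-encoded when the request is built from code. A small sketch with the standard library, assuming the backend runs locally on Flask's default port 5000 (an assumption, adjust the base URL to your setup):

```python
from urllib.parse import quote, urlencode


def build_answer_url(base_url, chat_id, text):
    """Build the /answer URL with a properly encoded `answer` parameter."""
    query = urlencode({"answer": text})
    return f"{base_url}/chats/{quote(chat_id, safe='')}/answer?{query}"


url = build_answer_url(
    "http://localhost:5000",
    "658d930f4f69a7853e018fbd",
    "how can i win in this game?",
)
print(url)
# http://localhost:5000/chats/658d930f4f69a7853e018fbd/answer?answer=how+can+i+win+in+this+game%3F
```

Browsers and most HTTP clients apply this encoding automatically when given query parameters, so it mainly matters when URLs are assembled by hand.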

BONUS: Re-captcha Authentication: Alongside user authentication, let’s address protection against bots. Automated scripts that call our endpoints excessively can drive up LLM costs, so a re-captcha check is a sensible safeguard: it helps ensure that the users or guests interacting with the system are legitimate. The implementation is a decorator applied to the endpoint, similar to the user token middleware. As written, it reads the captcha token from the Authorization header, so it suits guest endpoints that do not also carry a JWT in that header.

from functools import wraps
from flask import request

import requests
import os


def validate_re_captcha_token(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        try:
            return validate_captcha_token(f, *args, **kwargs)
        except Exception as e:
            print(e)
            return {
                "message": "Something went wrong",
                "data": None,
                "error": str(e)
            }, 500

    return decorated


def validate_captcha_token(f, *args, **kwargs):
    token = None

    if "Authorization" in request.headers:
        token = request.headers["Authorization"].split(" ")[1]

    if not token:
        return {
            "message": "Captcha Token is missing!",
            "data": None,
            "error": "Unauthorized"
        }, 401

    # Send the secret in the POST body rather than the query string,
    # and bound the wait so a slow verification cannot hang the request
    response = requests.post(
        url="https://www.google.com/recaptcha/api/siteverify",
        data={
            'secret': os.getenv("RECAPTCHA_SECRET_KEY"),
            'response': token
        },
        timeout=5
    )

    json_result = response.json()

    if not json_result.get("success"):
        return {
            "message": "Captcha Token is invalid!",
            "data": None,
            "error": "Unauthorized"
        }, 401

    return f(*args, **kwargs)

Give it a try on your own to unleash the full potential of your production bot!

In the upcoming episodes, we’ll delve into:

  1. Dynamic Source Addition: Exploring how to seamlessly add sources into the index and vector DB to ensure our RAG (Retrieval-Augmented Generation) model consistently utilizes updated data.
  2. Frontend Development: Creating the frontend interface for our bot to enhance user interaction and experience.
  3. Backend Deployment via Heroku: Preparing our backend for production traffic by deploying it on Heroku.

If you’ve enjoyed the article and this series, we encourage you to engage further. Comment on the topics you’re eager to explore in-depth, such as a comprehensive exploration of chat engines, performance optimization, or any other areas you wish to delve deeper into. Show your support by giving a clap and sharing the article on your preferred social platforms!

As always, the complete code covered in this article is available in our open repository here: GitHub Repository Link. Feel free to contribute by making pull requests.

This roadmap sets the stage for a diverse range of topics, from fine-tuning the RAG model to deploying the system for real-world traffic, offering an engaging journey for enthusiasts and learners alike.
