# RAG_System / app.py
# Engr-Saeed's picture
# Update app.py
# dc28374 verified
# raw
# history blame
# No virus
# 4.86 kB
# Step 1: Import required modules
import streamlit as st
from PyPDF2 import PdfReader
import docx2txt
import json
import pandas as pd
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os
from langchain.vectorstores import FAISS
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
import whisper
from groq import GroqAPI
from dotenv import load_dotenv
# Step 2: Load environment variables
# load_dotenv() runs at import time, before GROQ_API_KEY is read below.
load_dotenv()
# os.getenv returns None when GROQ_API_KEY is not set; the value is passed on unchecked.
groq_api_key = os.getenv("GROQ_API_KEY")
# Step 3: Initialize the Groq client shared by the functions below
# NOTE(review): confirm that the installed `groq` package really exports `GroqAPI`
# and that it offers the `get_embeddings` / `get_chat_model` methods used later in
# this file -- this cannot be verified from this file alone.
groq = GroqAPI(api_key=groq_api_key)
# Step 4: Function to read files and extract text
def extract_text(file):
    """Return the textual content of an uploaded file.

    The parser is chosen from the extension of ``file.name``, matched
    case-insensitively: ``.pdf``, ``.docx``, ``.txt``, ``.csv``, ``.xlsx`` and
    ``.json`` are supported.

    Args:
        file: Binary file-like object exposing ``name``, ``read()`` and
            ``seek()`` (presumably a Streamlit uploaded file -- confirm).

    Returns:
        str: The extracted text. An empty string is returned for an
        unsupported extension, or when decoding fails for a format that has
        no fallback encoding.
    """
    # Compare on a lower-cased name so "REPORT.PDF" is handled like "report.pdf".
    name = file.name.lower()
    try:
        if name.endswith(".pdf"):
            # A page without extractable text may yield None/"" depending on the
            # PyPDF2 version; `or ""` keeps the join from failing on such pages.
            return "".join(
                page.extract_text() or "" for page in PdfReader(file).pages
            )
        if name.endswith(".docx"):
            return docx2txt.process(file)
        if name.endswith(".txt"):
            return file.read().decode("utf-8")  # Assuming UTF-8 by default
        if name.endswith(".csv"):
            return pd.read_csv(file, encoding="utf-8").to_string()
        if name.endswith(".xlsx"):
            return pd.read_excel(file).to_string()
        if name.endswith(".json"):
            return json.dumps(json.load(file), indent=4)
    except UnicodeDecodeError:
        # The first attempt consumed (part of) the stream; rewind before retrying
        # with Latin-1, which can decode any byte sequence.
        file.seek(0)
        if name.endswith(".txt"):
            return file.read().decode("ISO-8859-1")
        if name.endswith(".csv"):
            return pd.read_csv(file, encoding="ISO-8859-1").to_string()
    return ""
# Step 5: Function to convert text into chunks
def get_text_chunks(text, chunk_size=10000, chunk_overlap=1000):
    """Split *text* into overlapping chunks for indexing.

    Args:
        text: The full text to split.
        chunk_size: Maximum chunk length handed to the splitter. Defaults to
            the previously hard-coded 10000.
        chunk_overlap: Overlap between consecutive chunks. Defaults to the
            previously hard-coded 1000.

    Returns:
        The chunks produced by ``RecursiveCharacterTextSplitter.split_text``.
    """
    # NOTE(review): with the default size, several retrieved chunks may not fit
    # the "8192" context suggested by the model id used elsewhere in this file --
    # confirm and pass a smaller chunk_size if needed.
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap
    )
    return splitter.split_text(text)
# Step 6: Function for converting chunks into embeddings and saving the FAISS index
def get_vector_store(text_chunks):
    """Build a FAISS index from *text_chunks* and save it under ``faiss_index``.

    Args:
        text_chunks: Text chunks to embed and index.

    Returns:
        None. The index is written to the ``faiss_index`` directory, relative
        to the current working directory, and a confirmation is printed.
    """
    # NOTE(review): LangChain documents the `embedding` argument of
    # FAISS.from_texts as an embeddings *object*; here it receives whatever
    # groq.get_embeddings() returns -- confirm the two are compatible.
    embeddings = groq.get_embeddings(text_chunks)
    vector_store = FAISS.from_texts(text_chunks, embedding=embeddings)
    # exist_ok=True replaces the exists()-then-makedirs() pair, which could fail
    # if the directory appeared between the check and the creation.
    os.makedirs("faiss_index", exist_ok=True)
    vector_store.save_local("faiss_index")
    print("FAISS index saved successfully.")
# Step 7: Function to implement the Groq Model
def get_conversational_chain():
    """Create the question-answering chain used to reply to the user.

    The chain combines the module-level Groq chat model with a prompt that
    tells the model to answer only from the supplied context.

    Returns:
        The "stuff" chain returned by ``load_qa_chain``; it takes the
        ``context`` and ``question`` prompt variables.
    """
    # The template lines stay flush-left on purpose: any indentation here would
    # become part of the prompt text.
    qa_template = """
Answer the question as detailed as possible from the provided context. If the answer is not in
the provided context, just say, "The answer is not available in the context." Do not provide a wrong answer.\n\n
Context:\n {context}\n
Question: \n{question}\n
Answer:
"""
    llm = groq.get_chat_model("llama3-8b-8192")  # Replace with your Groq model ID
    return load_qa_chain(
        llm,
        chain_type="stuff",
        prompt=PromptTemplate(
            template=qa_template,
            input_variables=["context", "question"],
        ),
    )
# Step 8: Function to take inputs from user and generate response
def user_input(user_question):
    """Answer *user_question* from the FAISS index saved under ``faiss_index``.

    Args:
        user_question: The question typed by the user.

    Returns:
        str: The chain's ``output_text``, or an explanatory message when no
        index has been built yet.
    """
    # Without this guard a question asked before any upload fails inside
    # FAISS.load_local with a low-level error instead of a usable reply.
    if not os.path.isdir("faiss_index"):
        return "No indexed documents found. Please upload and process files first."

    embeddings = groq.get_embeddings([user_question])
    # SECURITY: allow_dangerous_deserialization=True opts in to loading serialized
    # (pickled) index data. Acceptable only while "faiss_index" is written solely by
    # this application; never point it at an index from an untrusted source.
    new_db = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
    docs = new_db.similarity_search(user_question)
    chain = get_conversational_chain()
    response = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True)
    return response["output_text"]
# Step 9: Streamlit App
def main():
    """Render the Streamlit app: file upload and indexing in the sidebar, Q&A in the page body.

    Uploaded files are converted to text, chunked and indexed when the user
    presses "Submit & Process"; a non-empty question is answered through
    ``user_input`` and written to the page.
    """
    st.set_page_config(page_title="RAG Chatbot")
    # The trailing emoji was mojibake ("πŸ’") in the source; restored as U+1F481,
    # inferred from the surviving bytes -- confirm against the intended header.
    st.header("Chat with Multiple Files using RAG and Groq 💁")

    # Build the sidebar before answering the question so that an error raised
    # while answering cannot prevent the uploader from being rendered.
    with st.sidebar:
        st.title("Upload Files:")
        uploaded_files = st.file_uploader(
            "Upload your files",
            accept_multiple_files=True,
            type=["pdf", "docx", "txt", "csv", "xlsx", "json"],
        )
        if st.button("Submit & Process"):
            if uploaded_files:
                with st.spinner("Processing files..."):
                    combined_text = "".join(
                        extract_text(file) + "\n" for file in uploaded_files
                    )
                    text_chunks = get_text_chunks(combined_text)
                    # Nothing extractable (e.g. image-only PDFs): do not try to
                    # build an index from an empty chunk list.
                    if text_chunks:
                        get_vector_store(text_chunks)
                if text_chunks:
                    st.success("Files processed and indexed successfully!")
                else:
                    st.error("No text could be extracted from the uploaded files.")
            else:
                st.error("Please upload at least one file.")

    user_question = st.text_input("Ask a Question")
    if user_question:
        with st.spinner("Processing your question..."):
            response = user_input(user_question)
        st.write("Reply: ", response)


if __name__ == "__main__":
    main()