# Step 1: Import required modules
import streamlit as st
from PyPDF2 import PdfReader
import docx2txt
import json
import pandas as pd
from langchain.text_splitter import RecursiveCharacterTextSplitter
import os
from langchain.vectorstores import FAISS
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
import whisper
import requests
from dotenv import load_dotenv
# ChatGroq is assumed to come from the langchain_groq package (pip install langchain-groq);
# it is used in Step 7 below. Swap in any other LangChain-compatible LLM if preferred.
from langchain_groq import ChatGroq

# Load environment variables from a .env file (e.g. GROQ_API_KEY=...) and read the Groq API key
load_dotenv()
api_key = os.getenv("GROQ_API_KEY")
if not api_key:
    raise ValueError("No API key found. Please set the GROQ_API_KEY environment variable.")

# Step 4: Function to read files and extract text
def extract_text(file):
    text = ""
    try:
        if file.name.endswith(".pdf"):
            pdf_reader = PdfReader(file)
            for page in pdf_reader.pages:
                # extract_text() can return None for image-only pages
                text += page.extract_text() or ""
        elif file.name.endswith(".docx"):
            text = docx2txt.process(file)
        elif file.name.endswith(".txt"):
            text = file.read().decode("utf-8")  # Assuming UTF-8 by default
        elif file.name.endswith(".csv"):
            df = pd.read_csv(file, encoding="utf-8")  # Assuming UTF-8 by default
            text = df.to_string()
        elif file.name.endswith(".xlsx"):
            df = pd.read_excel(file)
            text = df.to_string()
        elif file.name.endswith(".json"):
            data = json.load(file)
            text = json.dumps(data, indent=4)
    except UnicodeDecodeError:
        # Handle the error by retrying with a different encoding
        file.seek(0)  # Reset the file pointer
        if file.name.endswith(".txt"):
            text = file.read().decode("ISO-8859-1")  # Try Latin-1 encoding
        elif file.name.endswith(".csv"):
            df = pd.read_csv(file, encoding="ISO-8859-1")  # Try Latin-1 encoding
            text = df.to_string()
    return text

# Step 5: Function to convert text into chunks
def get_text_chunks(text):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
    chunks = text_splitter.split_text(text)
    return chunks

# Step 6: Function for converting chunks into embeddings and saving the FAISS index
def get_vector_store(text_chunks):
    # get_groq_embeddings is not defined in this listing (it presumably comes from the
    # steps omitted here). Note that FAISS.from_texts expects a LangChain Embeddings
    # object, not a list of raw vectors; see the adapter sketch after Step 8.
    embeddings = get_groq_embeddings(text_chunks)
    if embeddings:
        vector_store = FAISS.from_texts(text_chunks, embedding=embeddings)
        # Ensure the output directory exists before saving the index
        if not os.path.exists("faiss_index"):
            os.makedirs("faiss_index")
        vector_store.save_local("faiss_index")
        print("FAISS index saved successfully.")
    else:
        st.error("Failed to retrieve embeddings from Groq API.")

# Step 7: Function to implement the Groq Model
def get_conversational_chain():
    prompt_template = """
    Answer the question as detailed as possible from the provided context. If the answer is not in
    the provided context, just say, "The answer is not available in the context." Do not provide a
    wrong answer.\n\n
    Context:\n {context}\n
    Question: \n{question}\n

    Answer:
    """
    # The original placeholder stored only the model name string, but load_qa_chain expects
    # an LLM object. One option (assuming the langchain_groq package imported in Step 1 is
    # installed) is the ChatGroq wrapper; any other LangChain-compatible LLM also works.
    model = ChatGroq(groq_api_key=api_key, model_name="llama3-8b-8192")
    prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
    chain = load_qa_chain(model, chain_type="stuff", prompt=prompt)
    return chain

# Step 8: Function to take inputs from user and generate response
def user_input(user_question):
    # As in Step 6, get_groq_embeddings comes from a step omitted in this listing.
    # FAISS.load_local also expects an Embeddings object so it can embed the incoming
    # query (see the adapter sketch below).
    embeddings = get_groq_embeddings([user_question])
    if embeddings:
        new_db = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
        docs = new_db.similarity_search(user_question)
        chain = get_conversational_chain()
        response = chain(
            {"input_documents": docs, "question": user_question},
            return_only_outputs=True,
        )
        return response["output_text"]
    else:
        return "Failed to retrieve response from Groq API."
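# NOTE: FAISS.from_texts (Step 6) and FAISS.load_local (Step 8) expect a LangChain
# Embeddings object rather than precomputed vectors. If the get_groq_embeddings helper
# from the omitted steps returns raw vectors, a thin adapter like this hypothetical
# sketch can bridge the gap. The class and method wiring below are illustrative
# assumptions, not part of the original tutorial code.
from langchain.embeddings.base import Embeddings

class GroqEmbeddingsAdapter(Embeddings):
    """Wraps the get_groq_embeddings helper so FAISS can call it on demand."""

    def embed_documents(self, texts):
        # Embed a batch of document chunks.
        return get_groq_embeddings(texts)

    def embed_query(self, text):
        # Embed a single query string.
        return get_groq_embeddings([text])[0]

# Example wiring (assumption): FAISS.from_texts(text_chunks, embedding=GroqEmbeddingsAdapter())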
# Step 9: Streamlit App
def main():
    st.set_page_config(page_title="RAG Chatbot")
    st.header("Chat with Multiple Files using RAG and Groq 💁")

    user_question = st.text_input("Ask a Question")
    if user_question:
        with st.spinner("Processing your question..."):
            response = user_input(user_question)
            st.write("Reply: ", response)

    with st.sidebar:
        st.title("Upload Files:")
        uploaded_files = st.file_uploader(
            "Upload your files",
            accept_multiple_files=True,
            type=["pdf", "docx", "txt", "csv", "xlsx", "json"],
        )
        if st.button("Submit & Process"):
            if uploaded_files:
                with st.spinner("Processing files..."):
                    combined_text = ""
                    for file in uploaded_files:
                        combined_text += extract_text(file) + "\n"
                    text_chunks = get_text_chunks(combined_text)
                    get_vector_store(text_chunks)
                    st.success("Files processed and indexed successfully!")
            else:
                st.error("Please upload at least one file.")

if __name__ == "__main__":
    main()
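# To try the app locally (assuming this script is saved as app.py and GROQ_API_KEY
# is provided via a .env file or shell export):
#   streamlit run app.py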