"""FastAPI application for the ConversAI backend.

Exposes authentication, chatbot management, data-source ingestion,
question answering and usage-analytics endpoints. Persistence goes through
the Supabase client re-exported by the project-local ``functions`` module.
"""
import io
import os
import posixpath
import string
import tempfile
from ipaddress import ip_address
from typing import Optional
import nltk
import jwt
import base64
import json
from click import option
from jwt import ExpiredSignatureError, InvalidTokenError
from starlette import status
# NOTE: the star import supplies the project helpers used below (createUser,
# listTables, createTable, extractTextFromPdf, addDocuments, ...) and,
# presumably, ``requests``. Import order is kept as in the original file
# because a star import can rebind names imported before it.
from functions import *
import pandas as pd
from fastapi import FastAPI, File, UploadFile, HTTPException, Request, Query
from pydantic import BaseModel
from fastapi.middleware.cors import CORSMiddleware
from src.api.speech_api import speech_translator_router
from functions import client as supabase
from urllib.parse import urlparse
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from dateutil.parser import isoparse

nltk.download("punkt_tab")

app = FastAPI(title="ConversAI", root_path="/api/v1")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(speech_translator_router, prefix="/speech")

# Storage bucket every data-source JSON document is uploaded to.
_STORAGE_BUCKET = "ConversAI"

# Source endpoints whose stored "output" is a mapping of base64-encoded text
# chunks. "/getYoutubeTranscript" is what /loadYoutubeTranscript actually
# records; "/loadYoutubeTranscript" is kept for backward compatibility.
# NOTE(review): assumes getTranscript() returns the same base64 mapping shape
# as the PDF/URL extractors (the original code grouped them) - confirm.
_BASE64_SOURCE_ENDPOINTS = frozenset(
    {
        "/loadPDF",
        "/loadImagePDF",
        "/loadWebURLs",
        "/loadYoutubeTranscript",
        "/getYoutubeTranscript",
    }
)


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------


def _split_vectorstore(vectorstore: str) -> tuple[str, str]:
    """Return ``(username, chatbotName)`` from a ``prefix$username$chatbot`` id.

    Raises:
        HTTPException: 400 when the identifier has fewer than three
            ``$``-separated parts.
    """
    parts = vectorstore.split("$")
    if len(parts) < 3:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="vectorstore must look like '<prefix>$<username>$<chatbotName>'",
        )
    return parts[1], parts[2]


def _encode_source_document(output, source: str) -> bytes:
    """Serialize an ``{"output", "source"}`` document to UTF-8 JSON bytes."""
    return json.dumps({"output": output, "source": source}, indent=1).encode("utf-8")


def _store_data_source(
    username: str,
    chatbot_name: str,
    file_name: str,
    payload: bytes,
    source_endpoint: str,
    num_tokens: Optional[int] = None,
) -> None:
    """Upload ``payload`` to storage and register it as a chatbot data source.

    Args:
        username: Owner stored in the ``username`` column.
        chatbot_name: Chatbot stored in the ``chatbotName`` column.
        file_name: Data source name as returned by ``createDataSourceName``.
        payload: JSON bytes uploaded as ``<file_name>_data.json``.
        source_endpoint: Endpoint label stored in ``sourceEndpoint``.
        num_tokens: Optional token count; the ``numTokens`` column is only
            written when this is given.
    """
    object_path = f"{file_name}_data.json"
    supabase.storage.from_(_STORAGE_BUCKET).upload(file=payload, path=object_path)

    row = {
        "username": username,
        "chatbotName": chatbot_name,
        "dataSourceName": file_name,
        "sourceEndpoint": source_endpoint,
        # posixpath keeps "/" separators on every platform; this is a URL,
        # not a filesystem path.
        "sourceContentURL": posixpath.join(
            os.environ["SUPABASE_PUBLIC_BASE_URL"], object_path
        ),
    }
    if num_tokens is not None:
        row["numTokens"] = num_tokens
    supabase.table("ConversAI_ChatbotDataSources").insert(row).execute()


def _get_character_count(username: str, chatbot_name: str):
    """Return the stored ``charactercount`` for one chatbot.

    Queries only the matching row instead of downloading the whole table.

    Raises:
        HTTPException: 404 when no matching chatbot row exists.
    """
    rows = (
        supabase.table("ConversAI_ChatbotInfo")
        .select("charactercount")
        .eq("user_id", username)
        .eq("chatbotname", chatbot_name)
        .execute()
        .data
    )
    if not rows:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Chatbot not found."
        )
    return rows[0]["charactercount"]


def _resolve_date_range(start_date: Optional[str], end_date: Optional[str]):
    """Return ``(start, end)`` dates for an analytics query.

    When either bound is missing, both are ignored and the range defaults to
    the seven days ending today (local time zone).

    Raises:
        HTTPException: 400 when a supplied date is not valid ISO format.
    """
    if not start_date or not end_date:
        end = datetime.now().astimezone().date()
        return end - timedelta(days=7), end
    try:
        return isoparse(start_date).date(), isoparse(end_date).date()
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Invalid ISO date: {exc}",
        ) from exc


def _fetch_source_document(url: str) -> dict:
    """Download a stored data-source document and parse it as JSON.

    SECURITY: the original implementation ran ``eval()`` on the downloaded
    body. The documents are written with ``json.dumps`` by the load endpoints
    in this module, so they are parsed as JSON here instead of being executed.
    """
    reply = requests.get(url, timeout=30)
    reply.raise_for_status()
    return json.loads(reply.content.decode("utf-8"))


def get_ip_info(ip: str):
    """Best-effort lookup of the city for ``ip`` via ipinfo.io.

    Returns ``"Unknown"`` when the request fails or no city is reported.
    """
    try:
        # A timeout keeps a slow lookup from stalling the calling endpoint.
        response = requests.get(f"https://ipinfo.io/{ip}/json", timeout=5)
        return response.json().get("city", "Unknown")
    except Exception:
        return "Unknown"


# ---------------------------------------------------------------------------
# Authentication and session endpoints
# ---------------------------------------------------------------------------


@app.post("/signup")
async def sign_up(email, username, password):
    """Register a Supabase auth user and create the matching profile row."""
    auth_response = supabase.auth.sign_up(
        {"email": email, "password": password, "role": "user"}
    )
    # Same attribute access as in sign_in; the original unpacked the response
    # object into (field, value) pairs and read the user from the first pair.
    user_id = auth_response.user.id
    r_ = createUser(user_id=user_id, username=username, email=email)
    code = r_.get('code')
    if code == 409:
        return r_
    if code == 200:
        return {
            "status": "success",
            "code": 200,
            "message": "Please check you email address for email verification",
        }
    return {
        "status": "failed",
        "code": 400,
        "message": "Failed to sign up please try again later",
    }


@app.post("/session-check")
async def check_session(user_id: str):
    """Return the current auth session.

    When there is no session, the user's ``Stores`` row is deleted, the
    client is signed out and a success payload is returned instead.
    """
    res = supabase.auth.get_session()
    if res is None:
        try:
            supabase.table("Stores").delete().eq("StoreID", user_id).execute()
            supabase.auth.sign_out()
            return {"message": "success", "code": 200, "Session": res}
        except Exception as e:
            raise HTTPException(status_code=400, detail=str(e)) from e
    return res


@app.post("/get-user")
async def get_user(access_token):
    """Return the auth user for the given access token."""
    return supabase.auth.get_user(jwt=access_token)


@app.post("/referesh-token")
async def refresh_token(refresh_token):
    """Exchange a refresh token through the Supabase auth client."""
    return supabase.auth.refresh_token(refresh_token)


@app.post("/login")
async def sign_in(email, password):
    """Sign in with email/password and record the session in ``Stores``.

    Raises:
        HTTPException: 400 when a ``Stores`` row already exists for the user,
            500 for any unexpected failure.
    """
    try:
        res = supabase.auth.sign_in_with_password(
            {"email": email, "password": password}
        )
        user_id = res.user.id
        access_token = res.session.access_token
        refresh_token = res.session.refresh_token

        store_session_check = (
            supabase.table("Stores").select("*").filter("StoreID", "eq", user_id).execute()
        )
        store_id = None
        if store_session_check and store_session_check.data:
            store_id = store_session_check.data[0].get("StoreID")

        userData = (
            supabase.table("ConversAI_UserInfo")
            .select("*")
            .filter("user_id", "eq", user_id)
            .execute()
            .data
        )
        username = userData[0]["username"]

        if not store_id:
            supabase.table("Stores").insert(
                {
                    "AccessToken": access_token,
                    "StoreID": user_id,
                    "RefreshToken": refresh_token,
                    "email": email,
                }
            ).execute()
            return {
                "message": "Success",
                "code": status.HTTP_200_OK,
                "username": username,
                "user_id": user_id,
                "access_token": access_token,
                "refresh_token": refresh_token,
            }
        if store_id == user_id:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="You are already signed in. Please sign out first to sign in again."
            )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Failed to sign in. Please check your credentials."
        )
    except HTTPException:
        # Deliberate HTTP errors must not be rewrapped as 500s below.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"An unexpected error occurred during sign-in: {str(e)}"
        ) from e


@app.post("/login_with_token")
async def login_with_token(access_token: str, refresh_token: str):
    """Describe an OAuth session from its tokens and look up the username.

    The username is looked up by the token's ``sub`` claim first, then by its
    ``email`` claim; it is ``''`` when no profile row is found or the lookup
    fails.

    Raises:
        HTTPException: 401 when the token cannot be decoded.
    """
    try:
        # NOTE(review): the signature is NOT verified here, so the claims are
        # only as trustworthy as the caller. Confirm this is acceptable.
        decoded_token = jwt.decode(access_token, options={"verify_signature": False})
    except (ExpiredSignatureError, InvalidTokenError) as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e)) from e

    user_id_oauth = decoded_token.get("sub")
    token_email = decoded_token.get("email")

    user_name = ''
    try:
        rows = (
            supabase.table("ConversAI_UserInfo")
            .select("*")
            .filter("user_id", "eq", user_id_oauth)
            .execute()
            .data
        )
        # The original overwrote this result with an email lookup that was
        # given the user id, so the username was effectively never found.
        if not rows and token_email:
            rows = (
                supabase.table("ConversAI_UserInfo")
                .select("*")
                .filter("email", "eq", token_email)
                .execute()
                .data
            )
        if rows:
            user_name = rows[0]["username"]
    except Exception:
        # Best effort: a failed lookup must not block the login response.
        user_name = ''

    return {
        "code": status.HTTP_200_OK,
        "user_id": user_id_oauth,
        "email": token_email,
        "access_token": access_token,
        "refresh_token": refresh_token,
        "issued_at": decoded_token.get("iat"),
        "expires_at": decoded_token.get("exp"),
        "username": user_name,
    }


@app.post("/user_name")
async def user_name_(username: str, user_id: str, email: str):
    """Create the profile row for a user and return ``createUser``'s result."""
    return createUser(user_id=user_id, username=username, email=email)


@app.post("/set-session-data")
async def set_session_data(access_token, refresh_token, user_id):
    """Install a session on the auth client and record it in ``Stores``."""
    session = supabase.auth.set_session(access_token, refresh_token)

    store_session_check = (
        supabase.table("Stores").select("*").filter("StoreID", "eq", user_id).execute()
    )
    store_id = None
    if store_session_check and store_session_check.data:
        store_id = store_session_check.data[0].get("StoreID")

    # Only insert when the user has no Stores row yet.
    if not store_id:
        supabase.table("Stores").insert(
            {
                "AccessToken": access_token,
                "StoreID": user_id,
                "RefreshToken": refresh_token,
            }
        ).execute()

    return {
        "message": "success",
        "code": 200,
        "session_data": session,
    }


@app.post("/logout")
async def sign_out(user_id):
    """Delete the user's ``Stores`` row and sign the auth client out."""
    try:
        supabase.table("Stores").delete().eq("StoreID", user_id).execute()
        supabase.auth.sign_out()
        return {"message": "success"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


@app.post("/oauth")
async def oauth():
    """Start a Google OAuth sign-in and return the client's response."""
    return supabase.auth.sign_in_with_oauth(
        {"provider": "google", "options": {"redirect_to": "https://convers-ai-test.vercel.app/home"}}
    )


# ---------------------------------------------------------------------------
# Chatbot management and data-source ingestion
# ---------------------------------------------------------------------------


@app.post("/newChatbot")
async def newChatbot(chatbotName: str, username: str):
    """Create a chatbot for ``username`` unless the chatbot limit is reached."""
    currentBotCount = len(listTables(username=username)["output"])
    limit = (
        supabase.table("ConversAI_UserConfig")
        .select("chatbotLimit")
        .eq("user_id", username)
        .execute()
        .data[0]["chatbotLimit"]
    )
    if currentBotCount >= int(limit):
        return {"output": "CHATBOT LIMIT EXCEEDED"}

    supabase.table("ConversAI_ChatbotInfo").insert(
        {"user_id": username, "chatbotname": chatbotName}
    ).execute()
    return createTable(tablename=f"convai${username}${chatbotName}")


@app.post("/loadPDF")
async def loadPDF(vectorstore: str, pdf: UploadFile = File(...)):
    """Extract text from an uploaded PDF and store it as a data source."""
    username, chatbotName = _split_vectorstore(vectorstore)
    source = pdf.filename
    pdf_bytes = await pdf.read()

    # extractTextFromPdf takes a path, so the upload is spooled to disk.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file:
        temp_file.write(pdf_bytes)
        temp_file_path = temp_file.name
    try:
        text = extractTextFromPdf(temp_file_path)
    finally:
        # Remove the temp file even when extraction fails.
        os.remove(temp_file_path)

    joined = " ".join(text[key] for key in text)
    numTokens = len(
        joined.translate(str.maketrans('', '', string.punctuation)).split(" ")
    )
    fileName = createDataSourceName(sourceName=source)
    _store_data_source(
        username,
        chatbotName,
        fileName,
        _encode_source_document(text, source),
        "/loadPDF",
        num_tokens=numTokens,
    )
    return {"output": "SUCCESS"}


@app.post("/loadImagePDF")
async def loadImagePDF(vectorstore: str, pdf: UploadFile = File(...)):
    """Extract text from an image-based PDF and store it as a data source."""
    username, chatbotName = _split_vectorstore(vectorstore)
    source = pdf.filename
    text = getTextFromImagePDF(pdfBytes=await pdf.read())
    fileName = createDataSourceName(sourceName=source)
    _store_data_source(
        username,
        chatbotName,
        fileName,
        _encode_source_document(text, source),
        "/loadImagePDF",
    )
    return {"output": "SUCCESS"}


class AddText(BaseModel):
    # Identifier of the form "<prefix>$<username>$<chatbotName>".
    vectorstore: str
    # Raw text to clean and store.
    text: str


@app.post("/loadText")
async def loadText(addTextConfig: AddText):
    """Clean free text and store it as a data source."""
    username, chatbotName = _split_vectorstore(addTextConfig.vectorstore)
    fileName = createDataSourceName(sourceName="Text")
    _store_data_source(
        username,
        chatbotName,
        fileName,
        _encode_source_document(cleanText(text=addTextConfig.text), "Text"),
        "/loadText",
    )
    return {"output": "SUCCESS"}


class AddQAPair(BaseModel):
    # Identifier of the form "<prefix>$<username>$<chatbotName>".
    vectorstore: str
    question: str
    answer: str


@app.post("/addQAPair")
async def addQAPairData(addQaPair: AddQAPair):
    """Add a question/answer pair if it fits within the user's token limit."""
    username, chatbotname = _split_vectorstore(addQaPair.vectorstore)
    currentCount = _get_character_count(username, chatbotname)
    qa = f"QUESTION: {addQaPair.question}\tANSWER: {addQaPair.answer}"
    # int() copes with the count coming back as text; a null count is 0.
    newCount = int(currentCount or 0) + len(qa)
    limit = (
        supabase.table("ConversAI_UserConfig")
        .select("tokenLimit")
        .eq("user_id", username)
        .execute()
        .data[0]["tokenLimit"]
    )
    if newCount >= int(limit):
        return {
            "output": "WEBSITE EXCEEDING LIMITS, PLEASE TRY WITH A SMALLER DOCUMENT."
        }

    supabase.table("ConversAI_ChatbotInfo").update(
        {"charactercount": str(newCount)}
    ).eq("user_id", username).eq("chatbotname", chatbotname).execute()
    # NOTE(review): trainChatbot calls addDocuments(texts=..., vectorstore=...)
    # while this call uses text=/source=; confirm both forms are supported.
    return addDocuments(text=qa, source="Q&A Pairs", vectorstore=addQaPair.vectorstore)


class LoadWebsite(BaseModel):
    # Identifier of the form "<prefix>$<username>$<chatbotName>".
    vectorstore: str
    # Pages to extract text from.
    urls: list[str]
    # Label stored as the document's source.
    source: str


@app.post("/loadWebURLs")
async def loadWebURLs(loadWebsite: LoadWebsite):
    """Extract text from a list of URLs and store it as a data source."""
    username, chatbotName = _split_vectorstore(loadWebsite.vectorstore)
    text = extractTextFromUrlList(urls=loadWebsite.urls)
    fileName = createDataSourceName(sourceName=loadWebsite.source)
    _store_data_source(
        username,
        chatbotName,
        fileName,
        _encode_source_document(text, loadWebsite.source),
        "/loadWebURLs",
    )
    return {"output": "SUCCESS"}


@app.post("/answerQuery")
async def answerQuestion(request: Request, query: str, vectorstore: str, llmModel: str = "llama3-70b-8192"):
    """Answer a query against a chatbot and log the exchange."""
    username, chatbotName = _split_vectorstore(vectorstore)
    output = answerQuery(query=query, vectorstore=vectorstore, llmModel=llmModel)

    # Named client_ip so the imported ipaddress.ip_address is not shadowed.
    client_ip = request.client.host
    # This is the character length of the answer, stored as the token count.
    response_token_count = len(output["output"])
    city = get_ip_info(client_ip)

    supabase.table("ConversAI_ChatHistory").insert(
        {
            "username": username,
            "chatbotName": chatbotName,
            "llmModel": llmModel,
            "question": query,
            "response": output["output"],
            "IpAddress": client_ip,
            "ResponseTokenCount": response_token_count,
            "vectorstore": vectorstore,
            "City": city,
        }
    ).execute()
    return output


@app.post("/deleteChatbot")
async def deleteChatbot(vectorstore: str):
    """Delete a chatbot's info row and its backing table."""
    username, chatbotName = _split_vectorstore(vectorstore)
    supabase.table('ConversAI_ChatbotInfo').delete().eq('user_id', username).eq(
        'chatbotname', chatbotName
    ).execute()
    return deleteTable(tableName=vectorstore)


@app.post("/listChatbots")
async def listChatbots(username: str):
    """Return ``listTables`` output for ``username``."""
    return listTables(username=username)


@app.post("/getLinks")
async def crawlUrl(baseUrl: str):
    """Return the links found from ``baseUrl`` together with its host."""
    return {
        "urls": getLinks(url=baseUrl, timeout=30),
        "source": urlparse(baseUrl).netloc,
    }


@app.post("/getCurrentCount")
async def getCount(vectorstore: str):
    """Return the stored character count for a chatbot."""
    username, chatbotName = _split_vectorstore(vectorstore)
    return {"currentCount": _get_character_count(username, chatbotName)}


class YtTranscript(BaseModel):
    # Identifier of the form "<prefix>$<username>$<chatbotName>".
    vectorstore: str
    # Video URLs to fetch transcripts for.
    urls: list[str]


@app.post("/loadYoutubeTranscript")
async def loadYoutubeTranscript(ytTranscript: YtTranscript):
    """Fetch YouTube transcripts and store them as a data source."""
    username, chatbotName = _split_vectorstore(ytTranscript.vectorstore)
    text = getTranscript(urls=ytTranscript.urls)
    fileName = createDataSourceName(sourceName="youtube")
    _store_data_source(
        username,
        chatbotName,
        fileName,
        _encode_source_document(text, "www.youtube.com"),
        # Stored label kept as-is for existing rows; trainChatbot accepts it.
        "/getYoutubeTranscript",
    )
    return {"output": "SUCCESS"}


@app.post("/analyzeData")
async def analyzeAndAnswer(query: str, file: UploadFile = File(...)):
    """Answer a query about an uploaded Excel or CSV file.

    Returns ``"INVALID FILE TYPE"`` for other extensions and
    ``"UNABLE TO ANSWER QUERY"`` when parsing or analysis fails.
    """
    extension = (file.filename or "").split(".")[-1].lower()
    try:
        if extension in ("xls", "xlsx", "xlsm", "xlsb"):
            df = pd.read_excel(io.BytesIO(await file.read()))
        elif extension == "csv":
            df = pd.read_csv(io.BytesIO(await file.read()))
        else:
            return {"output": "INVALID FILE TYPE"}
        return {"output": analyzeData(query=query, dataframe=df)}
    except Exception:
        # Deliberate best effort: any failure becomes a fixed message.
        return {"output": "UNABLE TO ANSWER QUERY"}


@app.post("/getChatHistory")
async def chatHistory(vectorstore: str):
    """Return timestamp, question and response rows for a chatbot."""
    username, chatbotName = _split_vectorstore(vectorstore)
    return (
        supabase.table("ConversAI_ChatHistory")
        .select("timestamp", "question", "response")
        .eq("username", username)
        .eq("chatbotName", chatbotName)
        .execute()
        .data
    )


@app.post("/listChatbotSources")
async def listChatbotSources(vectorstore: str):
    """Return every registered data source of a chatbot."""
    username, chatbotName = _split_vectorstore(vectorstore)
    return (
        supabase.table("ConversAI_ChatbotDataSources")
        .select("*")
        .eq("username", username)
        .eq("chatbotName", chatbotName)
        .execute()
        .data
    )


@app.post("/deleteChatbotSource")
async def deleteChatbotSource(dataSourceName: str):
    """Unregister a data source and remove its stored JSON document."""
    supabase.table("ConversAI_ChatbotDataSources").delete().eq(
        "dataSourceName", dataSourceName
    ).execute()
    # Documents are uploaded to the "ConversAI" bucket (the original removed
    # from a bucket named after the table) and remove() takes a list of paths.
    supabase.storage.from_(_STORAGE_BUCKET).remove([f"{dataSourceName}_data.json"])
    return {"output": "SUCCESS"}


class LoadEditedJson(BaseModel):
    # Identifier of the form "<prefix>$<username>$<chatbotName>".
    vectorstore: str
    # Name passed to createDataSourceName for the new document.
    dataSourceName: str
    # Endpoint label stored with the document.
    sourceEndpoint: str
    # Edited document, uploaded as-is.
    jsonData: dict[str, str]


@app.post("/loadEditedJson")
async def loadEditedJson(loadEditedJsonConfig: LoadEditedJson):
    """Store an edited JSON document as a new data source."""
    username, chatbotName = _split_vectorstore(loadEditedJsonConfig.vectorstore)
    jsonData = json.dumps(loadEditedJsonConfig.jsonData, indent=1).encode("utf-8")
    fileName = createDataSourceName(loadEditedJsonConfig.dataSourceName)
    _store_data_source(
        username,
        chatbotName,
        fileName,
        jsonData,
        loadEditedJsonConfig.sourceEndpoint,
    )
    return {"output": "SUCCESS"}


@app.post("/publicOrPrivate")
async def publicOrPrivate(vectorstore: str, mode: str = "public"):
    """Set the ``public/private`` mode of a chatbot."""
    username, chatbotName = _split_vectorstore(vectorstore)
    (
        supabase.table("ConversAI_ChatbotInfo")
        .update({"public/private": mode})
        .eq("user_id", username)
        .eq("chatbotname", chatbotName)
        .execute()
    )
    return {"output": "SUCCESS"}


class TrainChatbot(BaseModel):
    # Identifier of the form "<prefix>$<username>$<chatbotName>".
    vectorstore: str
    # sourceContentURL values of registered data sources to train on.
    urls: list[str]


@app.post("/trainChatbot")
async def trainChatbot(trainChatbotConfig: TrainChatbot):
    """Download the selected data sources and add them to the vector store.

    Sources with an unrecognised endpoint label are skipped.

    Raises:
        HTTPException: 404 when a URL is not a registered data source.
    """
    vectorstore, UrlSources = trainChatbotConfig.vectorstore, trainChatbotConfig.urls

    # Resolve every URL first so nothing is fetched if any is unregistered.
    fileTypes = []
    for url in UrlSources:
        rows = (
            supabase.table("ConversAI_ChatbotDataSources")
            .select("sourceEndpoint")
            .eq("sourceContentURL", url)
            .execute()
            .data
        )
        if not rows:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Unknown data source: {url}",
            )
        fileTypes.append(rows[0]["sourceEndpoint"])

    texts = []
    for source, fileType in zip(UrlSources, fileTypes):
        if fileType in _BASE64_SOURCE_ENDPOINTS:
            document = _fetch_source_document(source)
            content = document["output"]
            decoded = ".".join(
                base64.b64decode(content[key].encode("utf-8")).decode("utf-8")
                for key in content
            )
            texts.append((decoded.replace("\n", " "), document["source"]))
        elif fileType == "/loadText":
            document = _fetch_source_document(source)
            texts.append((document["output"].replace("\n", " "), document["source"]))

    return addDocuments(texts=texts, vectorstore=vectorstore)


# ---------------------------------------------------------------------------
# Analytics and feedback
# NOTE(review): the analytics endpoints read the whole table with an
# unfiltered select("*"); any server-side row limit would truncate results.
# ---------------------------------------------------------------------------


@app.post("/daily_chat_count")
async def daily_chat_count(
        start_date: Optional[str] = Query(None, description="Start date in ISO format (YYYY-MM-DD)"),
        end_date: Optional[str] = Query(None, description="End date in ISO format (YYYY-MM-DD)")
):
    """Count chat messages per day within the requested range."""
    start, end = _resolve_date_range(start_date, end_date)
    response = supabase.table("ConversAI_ChatHistory").select("*").execute().data

    days = (isoparse(row["timestamp"]).date() for row in response)
    date_count = Counter(day for day in days if start <= day <= end)

    data = [{"date": day.isoformat(), "count": count} for day, count in date_count.items()]
    return {"data": data}


@app.post("/daily_active_end_user")
async def daily_active_end_user(
        start_date: Optional[str] = Query(None, description="Start date in ISO format (YYYY-MM-DD)"),
        end_date: Optional[str] = Query(None, description="End date in ISO format (YYYY-MM-DD)")
):
    """Count distinct client IPs per day within the requested range."""
    start, end = _resolve_date_range(start_date, end_date)
    response = supabase.table("ConversAI_ChatHistory").select("*").execute().data

    ip_by_date = defaultdict(set)
    for row in response:
        day = isoparse(row["timestamp"]).date()
        if start <= day <= end:
            ip_by_date[day].add(row["IpAddress"])

    # NOTE(review): days with exactly one distinct IP are dropped by the
    # "> 1" filter. Kept as in the original; confirm this is intended.
    data = [
        {"date": day.isoformat(), "terminal": len(ips)}
        for day, ips in ip_by_date.items()
        if len(ips) > 1
    ]
    return {"data": data}


@app.post("/average_session_interaction")
async def average_session_interaction(
        start_date: Optional[str] = Query(None, description="Start date in ISO format (YYYY-MM-DD)"),
        end_date: Optional[str] = Query(None, description="End date in ISO format (YYYY-MM-DD)")
):
    """Return messages per distinct client IP for each day in the range."""
    start, end = _resolve_date_range(start_date, end_date)
    response = supabase.table("ConversAI_ChatHistory").select("*").execute().data

    total_messages_by_date = defaultdict(int)
    unique_ips_by_date = defaultdict(set)
    for row in response:
        day = isoparse(row["timestamp"]).date()
        if start <= day <= end:
            total_messages_by_date[day] += 1
            unique_ips_by_date[day].add(row["IpAddress"])

    data = []
    for day in sorted(total_messages_by_date):
        unique_ips = len(unique_ips_by_date[day])
        average_interactions = (
            total_messages_by_date[day] / unique_ips if unique_ips > 0 else 0
        )
        data.append({"date": day.isoformat(), "interactions": average_interactions})
    return {"data": data}


@app.post("/token_usages")
async def token_usages(
        start_date: Optional[str] = Query(None, description="Start date in ISO format (YYYY-MM-DD)"),
        end_date: Optional[str] = Query(None, description="End date in ISO format (YYYY-MM-DD)")
):
    """Sum ``ResponseTokenCount`` per day within the requested range."""
    start, end = _resolve_date_range(start_date, end_date)
    response = supabase.table("ConversAI_ChatHistory").select("*").execute().data

    token_usage_by_date = defaultdict(int)
    for row in response:
        day = isoparse(row["timestamp"]).date()
        if not (start <= day <= end):
            continue
        response_token_count = row.get("ResponseTokenCount")
        if response_token_count is not None:
            token_usage_by_date[day] += response_token_count

    data = [
        {"date": day.isoformat(), "total_tokens": total_tokens}
        for day, total_tokens in token_usage_by_date.items()
    ]
    return {"data": data}


@app.post("/add_feedback")
async def add_feedback(request: Request, feedback: str, user_id: str):
    """Store a feedback entry together with the caller's IP and city."""
    client_ip = request.client.host
    city = get_ip_info(client_ip)
    supabase.table("ConversAI_Feedback").insert(
        {"feedback": feedback, "user_id": user_id, "city": city, "ip": client_ip}
    ).execute()
    return {"message": "success"}


@app.post("/user_satisfaction_rate")
async def user_satisfaction_rate(
        start_date: Optional[str] = Query(None, description="Start date in ISO format (YYYY-MM-DD)"),
        end_date: Optional[str] = Query(None, description="End date in ISO format (YYYY-MM-DD)")
):
    """Return the daily percentage of "like" among like/dislike feedback."""
    start, end = _resolve_date_range(start_date, end_date)
    response = supabase.table("ConversAI_Feedback").select("*").execute().data

    feedback_counts = defaultdict(lambda: {"like": 0, "dislike": 0})
    for row in response:
        day = isoparse(row["timestamp"]).date()
        if not (start <= day <= end):
            continue
        # Touch the entry so in-range days with only other feedback values
        # still appear with a rate of 0, as in the original.
        counts = feedback_counts[day]
        feedback = row.get("feedback")
        if feedback in ("like", "dislike"):
            counts[feedback] += 1

    data = []
    for day in sorted(feedback_counts):
        like_count = feedback_counts[day]["like"]
        total_feedback = like_count + feedback_counts[day]["dislike"]
        satisfaction_rate = (like_count / total_feedback * 100) if total_feedback > 0 else 0
        data.append({"date": day.isoformat(), "rate": satisfaction_rate})
    return {"data": data}