import streamlit as st
from youtube_transcript_api import YouTubeTranscriptApi
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import torch
import nltk
from nltk.tokenize import sent_tokenize
from nltk.corpus import stopwords
from flashtext import KeywordProcessor
import requests
import string
import traceback
import pke

# Only these two NLTK resources are used below (sent_tokenize and stopwords).
nltk.download('punkt')
nltk.download('stopwords')

# Endpoint of the backend service that stores accepted QnA pairs.
link = "http://127.0.0.1:8000/question"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Plain T5, used with the "summarize:" task prefix.
summary_tokenizer = AutoTokenizer.from_pretrained("t5-base")
summary_model = AutoModelForSeq2SeqLM.from_pretrained("t5-base").to(device)

# T5 fine-tuned on SQuAD v1 for answer-aware question generation.
question_tokenizer = AutoTokenizer.from_pretrained("ramsrigouthamg/t5_squad_v1")
question_model = AutoModelForSeq2SeqLM.from_pretrained("ramsrigouthamg/t5_squad_v1").to(device)


def query(url, payload):
    """POST a JSON payload to the backend and return the response."""
    return requests.post(url, json=payload)


def fetch_transcript(url):
    """Fetch a YouTube transcript and join it into a single string."""
    # Take the video id from a ...watch?v=<id> URL, dropping any trailing
    # query parameters such as &t=.
    vid = url.split("v=")[1].split("&")[0]
    transcript = YouTubeTranscriptApi.get_transcript(vid)
    return " ".join(chunk["text"] for chunk in transcript)


def postprocesstext(content):
    """Capitalize the first character of every sentence in the summary."""
    sentences = []
    for sent in sent_tokenize(content):
        sentences.append(sent[0].upper() + sent[1:] if sent else sent)
    return " ".join(sentences)


def summarizer(text, model, tokenizer):
    """Summarize the transcript with T5 beam search."""
    text = "summarize: " + text.strip().replace("\n", " ")
    encoding = tokenizer.encode_plus(
        text, max_length=512, truncation=True, return_tensors="pt"
    ).to(device)
    outs = model.generate(
        input_ids=encoding["input_ids"],
        attention_mask=encoding["attention_mask"],
        early_stopping=True,
        num_beams=3,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        min_length=75,
        max_length=300,
    )
    summary = tokenizer.decode(outs[0], skip_special_tokens=True)
    return postprocesstext(summary).strip()


def get_nouns_multipartite(content):
    """Extract up to 15 keyphrases (nouns and proper nouns) with MultipartiteRank."""
    out = []
    try:
        extractor = pke.unsupervised.MultipartiteRank()
        # Exclude punctuation, bracket tokens, and English stopwords from candidates.
        stoplist = list(string.punctuation)
        stoplist += ['-lrb-', '-rrb-', '-lcb-', '-rcb-', '-lsb-', '-rsb-']
        stoplist += stopwords.words('english')
        extractor.load_document(input=content, stoplist=stoplist)
        extractor.candidate_selection(pos={'PROPN', 'NOUN'})
        extractor.candidate_weighting(alpha=1.1, threshold=0.75, method='average')
        for phrase, _score in extractor.get_n_best(n=15):
            out.append(phrase)
    except Exception:
        out = []
        traceback.print_exc()
    return out


def get_keywords(originaltext, summarytext, count):
    """Keep transcript keyphrases that also appear in the summary."""
    keywords = get_nouns_multipartite(originaltext)
    print("keywords unsummarized: ", keywords)
    keyword_processor = KeywordProcessor()
    for keyword in keywords:
        keyword_processor.add_keyword(keyword)
    keywords_found = list(set(keyword_processor.extract_keywords(summarytext)))
    print("keywords found in summarized: ", keywords_found)
    important_keywords = [kw for kw in keywords if kw in keywords_found]
    return important_keywords[:int(count)]


def get_question(context, answer, model, tokenizer):
    """Generate a question from `context` whose answer is `answer`."""
    text = "context: {} answer: {}".format(context, answer)
    encoding = tokenizer.encode_plus(
        text, max_length=384, truncation=True, return_tensors="pt"
    ).to(device)
    outs = model.generate(
        input_ids=encoding["input_ids"],
        attention_mask=encoding["attention_mask"],
        early_stopping=True,
        num_beams=5,
        num_return_sequences=1,
        no_repeat_ngram_size=2,
        max_length=72,
    )
    question = tokenizer.decode(outs[0], skip_special_tokens=True)
    return question.replace("question:", "").strip()


def generate_qna(url, count):
    """Full pipeline: transcript -> summary -> keywords -> one question per keyword."""
    transcript = fetch_transcript(url)
    summarized_text = summarizer(transcript, summary_model, summary_tokenizer)
    keywords = get_keywords(transcript, summarized_text, count)
    qna = []
    for answer in keywords:
        question = get_question(summarized_text, answer, question_model, question_tokenizer)
        qna.append(question + ' : ' + answer)
    return qna


def main():
    if 'submitted' not in st.session_state:
        st.session_state.submitted = False
    if 'opt' not in st.session_state:
        st.session_state.opt = []

    def callback():
        st.session_state.submitted = True

    st.title('QnA Pair Generator')
    url = st.text_input('Enter the video link')
    count = st.text_input('Enter the number of questions you want to generate')

    if st.button("Submit URL", on_click=callback) and url and count:
        st.write("Thanks for the submission!")
        st.session_state.opt = generate_qna(url, count)

    if st.session_state.submitted and st.session_state.opt:
        option = st.multiselect(
            'Select the questions you want to add to the database',
            st.session_state.opt,
        )
        if option and st.button("Add question"):
            for item in option:
                # Items have the form "<question> : <answer>"; split on the
                # first separator only so questions containing colons survive.
                question, answer = item.split(' : ', 1)
                payload = {"question": question.strip(), "answer": answer.strip()}
                response = query(link, payload)
                st.write(response.text)


main()
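# ---------------------------------------------------------------------------
# Hypothetical backend sketch. The app above POSTs {"question": ..., "answer":
# ...} to http://127.0.0.1:8000/question, but that service is not part of this
# file. Below is a minimal FastAPI sketch of such an endpoint, assuming an
# in-memory list as the store; the real backend, its route model, and its
# persistence layer may differ. Save it as a separate file (e.g. backend.py)
# and run it with: uvicorn backend:app --port 8000
# ---------------------------------------------------------------------------
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
stored_pairs = []  # placeholder in-memory store (assumption, not the real schema)


class QnAPair(BaseModel):
    question: str
    answer: str


@app.post("/question")
def add_question(pair: QnAPair):
    # Accept one QnA pair and return a small JSON acknowledgement, which the
    # Streamlit app above displays via response.text.
    stored_pairs.append(pair.dict())
    return {"status": "ok", "stored": len(stored_pairs)}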