import streamlit as st
import sentence_transformers
from youtube_transcript_api import YouTubeTranscriptApi
import os
import ast
import pandas as pd
from segmentation import SemanticTextSegmentation
import re
from symspellpy import SymSpell
import pkg_resources
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
from torch import cuda
import nltk
nltk.download('stopwords')
from PIL import Image, ImageDraw, ImageFont

if not os.path.exists('./transcripts'):
    os.mkdir('./transcripts')

device = 'cuda' if cuda.is_available() else 'cpu'


def clean_text(link, start, end):
    tokenizer = AutoTokenizer.from_pretrained("t5-base")
    sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
    dictionary_path = pkg_resources.resource_filename(
        "symspellpy", "frequency_dictionary_en_82_765.txt"
    )
    sym_spell.load_dictionary(dictionary_path, term_index=0, count_index=1)

    def id_ts_grabber(link):
        # extract the video id from a standard watch URL (everything after '=')
        youtube_video = link.split("=")
        video_id = youtube_video[1]
        return video_id

    def seg_getter(data, ts, es):
        # map the requested start/end seconds onto the nearest caption timestamps
        starts = []
        for line in data:
            ccs = ast.literal_eval(line)
            starts.append(float(ccs['start']))
        if not es:
            e_val = starts[-1]
        else:
            e_val = starts[min(range(len(starts)), key=lambda i: abs(starts[i] - float(es)))]
        t_val = starts[min(range(len(starts)), key=lambda i: abs(starts[i] - float(ts)))]
        tid = starts.index(t_val)
        eid = starts.index(e_val)
        ts_list_len = len(starts[tid:eid])
        return tid, ts_list_len

    def get_cc(video_id):
        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
            try:
                # prefer manually created transcripts
                transcript = transcript_list.find_manually_created_transcript(
                    ['en', 'en-US', 'en-GB', 'en-IN'])
            except Exception:
                transcript = None
            manual = True
            if not transcript:
                try:
                    # fall back to automatically generated ones
                    transcript = transcript_list.find_generated_transcript(['en'])
                    manual = False
                except Exception:
                    transcript = None
            if transcript:
                suffix = "_cc_manual" if manual else "_cc_auto"
                file_name = os.path.join('transcripts', str(video_id) + suffix + ".txt")
                with open(file_name, 'w') as file:
                    for line in transcript.fetch():
                        file.write(str(line).replace(r'\xa0', ' ').replace(r'\n', '') + '\n')
                return file_name
            else:
                return None
        except Exception:
            return None
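    # Each line of the transcript file written by get_cc() is the repr of one
    # caption dict as returned by the youtube_transcript_api version this script
    # was written against, e.g. (values here are illustrative):
    #   {'text': 'hello world', 'start': 1.23, 'duration': 2.5}
    # transcript_creator() below parses these lines back with ast.literal_eval().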
    def transcript_creator(filename, timestamp, end_pt):
        with open(filename, 'r') as f:
            data = f.readlines()
        transcripts = []
        if not timestamp and not end_pt:
            # no range given: keep the full transcript
            for line in data:
                ccs = ast.literal_eval(line)
                transcripts.append(ccs['text'])
            return transcripts
        elif not timestamp and end_pt:
            # only an end point given: start from the beginning
            timestamp = 0
            start, lenlist = seg_getter(data, timestamp, end_pt)
            for t in range(lenlist):
                ccs = ast.literal_eval(data[start + t])
                transcripts.append(ccs['text'])
            return transcripts
        else:
            start, lenlist = seg_getter(data, timestamp, end_pt)
            for t in range(lenlist):
                ccs = ast.literal_eval(data[start + t])
                transcripts.append(ccs['text'])
            return transcripts

    def transcript_collector(link, ts, es):
        vid = id_ts_grabber(link)
        print("Fetching the transcript")
        filename = get_cc(vid)
        return transcript_creator(filename, ts, es), vid

    transcript = pd.DataFrame(columns=['text', 'video_id'])
    transcript.loc[0, 'text'], transcript.loc[0, 'video_id'] = transcript_collector(link, start, end)

    def segment(corpus):
        # drop bracketed annotations such as [Music], then empty lines
        text_data = [re.sub(r'\[.*?\]', '', x).strip() for x in corpus]
        text_data = [x for x in text_data if x != '']
        df = pd.DataFrame(text_data, columns=["utterance"])
        # remove newline, carriage return, tab
        df["utterance"] = df["utterance"].apply(
            lambda x: x.replace("\n", " ").replace("\r", " ").replace("\t", " "))
        # remove NaN
        df.dropna(inplace=True)
        sts = SemanticTextSegmentation(df)
        texts = sts.get_segments()
        return texts

    sf = pd.DataFrame(columns=['Segmented_Text', 'video_id'])
    text = segment(transcript.at[0, 'text'])
    for i in range(len(text)):
        sf.loc[i, 'Segmented_Text'] = text[i]
        sf.loc[i, 'video_id'] = transcript.at[0, 'video_id']

    def word_seg(text):
        # use SymSpell to re-segment run-together words in the captions
        text = text.replace("\n", " ").replace("\r", " ").replace("\t", " ").replace("\xa0", " ")
        results = sym_spell.word_segmentation(text, max_edit_distance=0)
        return results.segmented_string

    for i in range(len(sf)):
        st.write(sf.at[i, 'Segmented_Text'])
        sf.loc[i, 'Segmented_Text'] = word_seg(sf.at[i, 'Segmented_Text'])
        sf.loc[i, 'Lengths'] = len(tokenizer(sf.at[i, 'Segmented_Text'])['input_ids'])

    texts = pd.DataFrame(columns=['texts'])

    def segment_loader(dataframe):
        # greedily merge consecutive segments of the same video until the
        # combined token count would exceed the model's 512-token input limit
        flag = 0
        for i in range(len(dataframe)):
            if flag > 0:
                flag -= 1
                continue
            m = 512
            iter = 0
            texts.loc[i, 'texts'] = dataframe.at[i + iter, 'Segmented_Text']
            length = dataframe.at[i + iter, 'Lengths']
            texts.loc[i, 'video_id'] = dataframe.at[i, 'video_id']
            while (i + iter < len(dataframe) - 1
                   and dataframe.at[i, 'video_id'] == dataframe.at[i + iter + 1, 'video_id']):
                if length + dataframe.at[i + iter + 1, 'Lengths'] <= m:
                    texts.loc[i, 'texts'] += " " + dataframe.at[i + iter + 1, 'Segmented_Text']
                    length += dataframe.at[i + iter + 1, 'Lengths']
                    iter += 1
                else:
                    break
            flag = iter
        return texts

    cleaned_text = segment_loader(sf)
    cleaned_text.reset_index(drop=True, inplace=True)
    return cleaned_text


def t5_summarizer(link, start, end):
    input_text = clean_text(link, start, end)
    lst_outputs = []
    tokenizer1 = AutoTokenizer.from_pretrained("CareerNinja/t5-large_3e-4")
    model1 = AutoModelForSeq2SeqLM.from_pretrained("CareerNinja/t5-large_3e-4")
    summarizer1 = pipeline("summarization", model=model1, tokenizer=tokenizer1)
    print("Entered summarizer!")
    st.write('Below is the summary of the given URL: ')
    for i in range(len(input_text)):
        summary = summarizer1(input_text.at[i, 'texts'], min_length=64, max_length=128)
        sumry = list(summary[0].values())
        input_text.loc[i, 'Generated Summary'] = sumry[0]
        lst_outputs.append(sumry[0])
        st.write(input_text.at[i, 'Generated Summary'])
        if i != len(input_text) - 1:
            st.write('=' * 85)
    return lst_outputs
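# Note: t5_summarizer() rebuilds the tokenizer, model, and pipeline on every
# Streamlit rerun. A minimal sketch of one way to avoid that (not part of the
# original app) is to cache the pipeline with st.cache_resource, e.g.:
#
#   @st.cache_resource
#   def load_summarizer():
#       tok = AutoTokenizer.from_pretrained("CareerNinja/t5-large_3e-4")
#       mdl = AutoModelForSeq2SeqLM.from_pretrained("CareerNinja/t5-large_3e-4")
#       return pipeline("summarization", model=mdl, tokenizer=tok)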
def card_creator(path, text, y_value):
    img = Image.open(path)

    def text_wrap(text, font, max_width):
        """Wrap text based on the specified width.

        This enables text wider than the image to be displayed nicely.

        @params:
            text: str, text to wrap
            font: obj, font of the text
            max_width: int, width to split the text at
        @return:
            lines: list[str], list of sub-strings
        """
        lines = []
        # If the text is narrower than the image, there is no need to split it;
        # just add it to the line list and return.
        if font.getsize(text)[0] <= max_width:
            lines.append(text)
        else:
            # split the line by spaces to get words
            words = text.split(' ')
            i = 0
            # append words to a line while its width is shorter than the image width
            while i < len(words):
                line = ''
                while i < len(words) and font.getsize(line + words[i])[0] <= max_width:
                    line = line + words[i] + " "
                    i += 1
                if not line:
                    line = words[i]
                    i += 1
                lines.append(line)
        return lines

    # font.getsize() was removed in Pillow 10; this code assumes an older Pillow
    font_path = 'Montserrat-Regular.ttf'
    font = ImageFont.truetype(font=font_path, size=22)
    lines = text_wrap(text, font, img.size[0] - 44)
    line_height = font.getsize('hg')[1]
    draw = ImageDraw.Draw(img)
    # draw the wrapped text onto the image
    color = 'rgb(255,255,255)'  # white
    x = 22
    y = y_value
    for line in lines:
        draw.text((x, y), line, fill=color, font=font)
        y = y + line_height  # move down for the next line
    img.save("card.png")
    st.image(img, caption="Summary Card")


def main():
    if 'submitted' not in st.session_state:
        st.session_state.submitted = False
    if 'opt' not in st.session_state:
        st.session_state.opt = []

    def callback():
        st.session_state.submitted = True

    st.title('Video Summarizer')
    url = st.text_input('Enter the Video Link')
    start_pt = st.text_input('Enter the start point in secs')
    end_pt = st.text_input('Enter the end point in secs')
    if st.button("Submit URL", on_click=callback) and url:
        opt = t5_summarizer(url, start_pt, end_pt)
        st.session_state.opt = opt
    if st.session_state.submitted and st.session_state.opt:
        text = st.selectbox('Select the summary you want to create a card of', st.session_state.opt)
        option = st.selectbox('Which color template would you like to use?',
                              ('Elf Green', 'Dark Pastel Green'))
        if st.button("Generate Summary Card") and text and option:
            if option == 'Elf Green':
                if len(text) > 380:
                    st.error('Summary is too long!')
                else:
                    card_creator('iteration5_empty.png', text, 335)
            else:
                if len(text) > 430:
                    st.error('Summary is too long!')
                else:
                    card_creator('X-93.png', text, 285)
            # only offer a download if a card was actually rendered
            if os.path.exists("card.png"):
                with open("card.png", "rb") as file:
                    btn = st.download_button(
                        label="Download card",
                        data=file,
                        file_name="card.png",
                        mime="image/png",
                    )


main()
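# Usage (assuming this file is saved as app.py):
#   streamlit run app.py
# The card templates 'iteration5_empty.png' and 'X-93.png' and the font file
# 'Montserrat-Regular.ttf' are expected in the working directory.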