# app/helpers.py - helper functions for the AssemblyAI Gradio demo app (commit 867cb9d)
import re
import requests
import time
from scipy.io.wavfile import write
import io
import plotly.express as px
# AssemblyAI API endpoints
upload_endpoint = "https://api.assemblyai.com/v2/upload"
transcript_endpoint = "https://api.assemblyai.com/v2/transcript"

# Colors for sentiment analysis highlighting (a 2-digit alpha hex is appended at use sites)
green = "background-color: #159609"
red = "background-color: #cc0c0c"

# Converts Gradio checkboxes to AssemblyAI request arguments
transcription_options_headers = {
    'Automatic Language Detection': 'language_detection',
    'Speaker Labels': 'speaker_labels',
    'Filter Profanity': 'filter_profanity',
}

# Converts Gradio checkboxes to AssemblyAI request arguments
audio_intelligence_headers = {
    'Summarization': 'auto_chapters',
    'Auto Highlights': 'auto_highlights',
    'Topic Detection': 'iab_categories',
    'Entity Detection': 'entity_detection',
    'Sentiment Analysis': 'sentiment_analysis',
    'PII Redaction': 'redact_pii',
    'Content Moderation': 'content_safety',
}

# Converts selected language in Gradio to language code for AssemblyAI request argument
language_headers = {
    'Global English': 'en',
    'US English': 'en_us',
    'British English': 'en_uk',
    'Australian English': 'en_au',
    'Spanish': 'es',
    'French': 'fr',
    'German': 'de',
    'Italian': 'it',
    'Portuguese': 'pt',
    'Dutch': 'nl',
    'Hindi': 'hi',
    'Japanese': 'ja',  # ISO 639-1 code is 'ja' (not 'jp'); AssemblyAI expects 'ja'
}
def make_header(api_key):
    """Build the HTTP headers for AssemblyAI requests from an API key."""
    headers = dict()
    headers['authorization'] = api_key
    headers['content-type'] = 'application/json'
    return headers
def _read_file(filename, chunk_size=5242880):
"""Helper for `upload_file()`"""
with open(filename, "rb") as f:
while True:
data = f.read(chunk_size)
if not data:
break
yield data
def _read_array(audio, chunk_size=5242880):
"""Like _read_file but for array - creates temporary unsaved "file" from sample rate and audio np.array"""
sr, aud = audio
# Create temporary "file" and write data to it
bytes_wav = bytes()
temp_file = io.BytesIO(bytes_wav)
write(temp_file, sr, aud)
while True:
data = temp_file.read(chunk_size)
if not data:
break
yield data
def upload_file(audio_file, header, is_file=True):
    """Uploads a file to AssemblyAI for analysis.

    :param audio_file: Path to an audio file (is_file=True) or a (sample_rate, array) tuple
    :param header: Headers from `make_header()`
    :param is_file: Whether `audio_file` is a filepath or an in-memory audio array
    :return: JSON response of the form {'upload_url': <URL>}
    """
    reader = _read_file(audio_file) if is_file else _read_array(audio_file)
    response = requests.post(upload_endpoint, headers=header, data=reader)
    # Surface any HTTP error to the caller
    if response.status_code != 200:
        response.raise_for_status()
    return response.json()
def request_transcript(upload_url, header, **kwargs):
    """Request a transcript/audio analysis from AssemblyAI.

    :param upload_url: Upload URL string, or the dict returned by `upload_file()`
    :param header: Headers from `make_header()`
    :param kwargs: Additional transcript request keys (e.g. output of `make_final_json()`)
    :return: JSON response from the AssemblyAI transcript endpoint
    """
    # Accept the raw dict returned by `upload_file` as well as a plain URL string
    if isinstance(upload_url, dict):
        upload_url = upload_url['upload_url']
    # Build the POST body: the audio URL plus any requested analysis options
    transcript_request = {
        'audio_url': upload_url,
        **kwargs
    }
    transcript_response = requests.post(
        transcript_endpoint,
        json=transcript_request,
        headers=header
    )
    return transcript_response.json()
def make_polling_endpoint(transcript_id):
    """Create a polling endpoint URL used to check the status of a transcript.

    :param transcript_id: Transcript ID string, or the JSON dict returned by `request_transcript()`
    :return: Full polling endpoint URL for the transcript
    """
    # Accept the response dict from `request_transcript` as well as a raw ID string
    if isinstance(transcript_id, dict):
        transcript_id = transcript_id['id']
    return f"https://api.assemblyai.com/v2/transcript/{transcript_id}"
def wait_for_completion(polling_endpoint, header):
    """Given a polling endpoint, waits for the transcription/audio analysis to complete.

    Polls every 5 seconds until AssemblyAI reports a terminal status.

    :param polling_endpoint: URL from `make_polling_endpoint()`
    :param header: Headers from `make_header()`
    :return: The final JSON response (so callers don't need an extra GET); previously
        returned None, so existing callers that ignore the return value are unaffected
    :raises Exception: If AssemblyAI reports status 'error'
    """
    while True:
        polling_response = requests.get(polling_endpoint, headers=header).json()
        status = polling_response['status']
        if status == 'completed':
            return polling_response
        if status == 'error':
            raise Exception(f"Error: {polling_response['error']}")
        time.sleep(5)
def make_true_dict(transcription_options, audio_intelligence_selector):
    """Given transcription / audio intelligence Gradio options, create a dictionary to be used
    in the AssemblyAI request."""
    # Map each checked Gradio box to its AssemblyAI request key
    selected_keys = [transcription_options_headers[option] for option in transcription_options]
    selected_keys += [audio_intelligence_headers[option] for option in audio_intelligence_selector]
    # Every selected feature is flagged as 'true' in the request JSON
    return {key: 'true' for key in selected_keys}
def make_final_json(true_dict, language):
    """Takes in output of `make_true_dict()` and adds all required other key-value pairs."""
    # A language code is required whenever automatic language detection is off;
    # fall back to US English when no language was chosen
    if 'language_detection' not in true_dict:
        if language is None:
            language = "US English"
        true_dict = dict(true_dict, language_code=language_headers[language])
    # Attach default redaction policies when PII Redaction is enabled
    if 'redact_pii' in true_dict:
        true_dict = dict(true_dict, redact_pii_policies=['drug', 'injury', 'person_name', 'money_amount'])
    return true_dict, language
def _split_on_capital(string):
"""Adds spaces between capitalized words of a string via regex. 'HereAreSomeWords' -> 'Here Are Some Words'"""
return ' '.join(re.findall("[A-Z][^A-Z]*", string))
def _make_tree(c, ukey=''):
    '''
    Given a list whose elements are nested topic lists, generates a JSON-esque dictionary tree of topics and
    subtopics
    E.g. the input
    [
        ['Education', 'CollegeEducation', 'PostgraduateEducation'],
        ['Education', 'CollegeEducation', 'UndergraduateEducation']
    ]
    Would output a dictionary corresponding to a tree with two leaves, 'UndergraduateEducation' and
    'PostgraduateEducation', which fall under a node 'CollegeEducation' which in turn falls under the node 'Education'
    :param c: List of topics
    :param ukey: "Upper key". For recursion - name of upper level key whose value (list) is being recursed on
    :return: Dictionary that defines a tree structure
    '''
    # Create empty dict for current sublist
    d = dict()
    # If leaf, return None
    # NOTE: this branch only fires on recursive calls where d[None] = None was stored above,
    # since ukey defaults to '' (not None) on the top-level call
    if c is None and ukey is None:
        return None
    elif c is None:
        # A topic that is itself included (not just a prefix of deeper topics) is marked {None: None}
        return {None: None}
    else:
        # For each elt of the input (itself a list),
        for n, i in enumerate(c):
            # For topics with sublist e.g. if ['NewsAndPolitics' 'Politics'] and
            # ['NewsAndPolitics' 'Politics', 'Elections'] are both in list - need way to signify politics itself
            # included
            if i is None:
                d[None] = None
            # If next subtopic not in dict, add it. If the remaining list empty, make value None
            elif i[0] not in d.keys():
                # NOTE: pop(0) mutates the caller's sublist in place - callers must not reuse `c`
                topic = i.pop(0)
                d[topic] = None if i == [] else [i]
            # If subtopic already in dict
            else:
                # If the value for this subtopic is only None (i.e. subject itself is a leaf), then append sublist
                if d[i[0]] is None:
                    d[i[0]] = [None, i[1:]]
                # If value for this subtopic is a list itself, then append the remaining list
                else:
                    d[i[0]].append(i[1:])
        # Recurse on remaining leaves: each value is either None (leaf) or a list of sub-paths
        for key in d:
            d[key] = _make_tree(d[key], key)
    return d
def _make_html_tree(dic, level=0, HTML=''):
    """Generates an HTML tree from an output of _make_tree.

    :param dic: Tree dictionary produced by `_make_tree()` (values are dicts or leaves)
    :param level: Current nesting depth - used in the CSS class name `topic-L{level}`
    :param HTML: Accumulated HTML string - used for recursion
    :return: HTML string of nested <ul>/<li> elements
    """
    HTML += "<ul>"
    for key in dic:
        # Add the topic to HTML, specifying the current level and whether it is a topic
        if type(dic[key]) == dict:
            HTML += "<li>"
            # A None key inside the subtree means this topic itself was detected (not just
            # its descendants) - mark it with the `istopic` class.
            # NOTE: the `del` mutates the caller's dict in place.
            if None in dic[key].keys():
                del dic[key][None]
                HTML += f'<p class="topic-L{level} istopic">{_split_on_capital(key)}</p>'
            else:
                HTML += f'<p class="topic-L{level}">{_split_on_capital(key)}</p>'
            HTML += "</li>"
            # Recurse into the subtree one level deeper
            HTML = _make_html_tree(dic[key], level=level + 1, HTML=HTML)
        else:
            # Leaf node - always a detected topic
            HTML += "<li>"
            HTML += f'<p class="topic-L{level} istopic">{_split_on_capital(key)}</p>'
            HTML += "</li>"
    HTML += "</ul>"
    return HTML
def _make_html_body(dic):
    """Makes an HTML body from an output of _make_tree"""
    return f"<body>{_make_html_tree(dic)}</body>"
def _make_html(dic):
    """Makes a full HTML document from an output of _make_tree using styles.css styling"""
    head = (
        '<!DOCTYPE html>'
        '<html>'
        '<head>'
        '<title>Another simple example</title>'
        '<link rel="stylesheet" type="text/css" href="styles.css"/>'
        '</head>'
    )
    return head + _make_html_body(dic) + "</html>"
def make_html_from_topics(dic, threshold=0.0):
    """Given a topics dictionary from AAI Topic Detection API, generates appropriate corresponding structured HTML.
    Input is `response.json()['iab_categories_result']['summary']` from GET request on AssemblyAI `v2/transcript`
    endpoint."""
    # Keep topics at or above the confidence threshold, sorted alphabetically
    selected = sorted(topic for topic, score in dic.items() if float(score) >= threshold)
    # Each topic string is a ">"-delimited path, e.g. "Education>CollegeEducation"
    paths = [topic.split(">") for topic in selected]
    # Build the topic tree and render it as a full HTML document
    return _make_html(_make_tree(paths))
def make_paras_string(transc_id, header):
    """Makes a string by concatenating paragraphs with newlines in between. Fetches paragraph data
    from the AssemblyAI paragraphs endpoint for the given transcript ID."""
    url = f"{transcript_endpoint}/{transc_id}/paragraphs"
    response = requests.get(url, headers=header).json()
    return '\n\n'.join(paragraph['text'] for paragraph in response['paragraphs'])
def create_highlighted_list(paragraphs_string, highlights_result, rank=0):
    """Outputs auto highlights information in appropriate format for `gr.HighlightedText()`.

    :param paragraphs_string: Full transcript text in which to locate the highlights
    :param highlights_result: `response.json()['auto_highlights_result']['results']` from a GET
        request on the AssemblyAI v2/transcript endpoint
    :param rank: Minimum rank a highlight must have to be included
    :return: Dict of the form {"text": ..., "entities": [...]} for `gr.HighlightedText()`
    """
    # Max and min opacities to highlight to
    MAX_HIGHLIGHT = 1  # Max allowed = 1
    MIN_HIGHLIGHT = 0.25  # Min allowed = 0

    # Filter list for everything above the input rank
    highlights_result = [i for i in highlights_result if i['rank'] >= rank]

    # Linearly map ranks onto [MIN_HIGHLIGHT, MAX_HIGHLIGHT]
    max_rank = max(i['rank'] for i in highlights_result)
    min_rank = min(i['rank'] for i in highlights_result)
    if max_rank == min_rank:
        # All highlights share one rank (e.g. a single result) - avoid division by zero
        # and show them all at full opacity
        scale, shift = 0, MAX_HIGHLIGHT
    else:
        scale = (MAX_HIGHLIGHT - MIN_HIGHLIGHT) / (max_rank - min_rank)
        shift = MAX_HIGHLIGHT - max_rank * scale

    entities = []
    for result in highlights_result:
        text, text_rank = result['text'], result['rank']
        # re.escape so highlight text containing regex metacharacters (e.g. '+', '?')
        # is matched literally instead of breaking or mis-matching
        starts = [m.start() for m in re.finditer(re.escape(text), paragraphs_string)]
        # One entity per occurrence, with the opacity ("entity" value) scaled by rank
        entities += [{"entity": text_rank * scale + shift,
                      "start": start,
                      "end": start + len(text)}
                     for start in starts]

    # Sort entities by start char. A bug in Gradio requires this
    entities.sort(key=lambda e: e['start'])
    return {"text": paragraphs_string, "entities": entities}
def make_summary(chapters):
    """Makes HTML for "Summary" `gr.Tab()` tab. Input is `response.json()['chapters']` where response is from GET
    request to AssemblyAI's v2/transcript endpoint"""
    # One collapsible <details> section per chapter: headline as the summary, text as the body
    sections = [
        f"<details><summary><b>{chapter['headline']}</b></summary>{chapter['summary']}</details>"
        for chapter in chapters
    ]
    return "<div>" + "".join(sections) + "</div>"
def to_hex(num, max_opacity=128):
    """Converts a confidence value in the range [0, 1] to a two-digit hex opacity string.

    :param num: Confidence value in [0, 1]
    :param max_opacity: Opacity (0-255) that a confidence of 1.0 maps to
    :return: Two-character lowercase hex string (zero-padded so it is always a valid
        two-digit alpha channel when appended to a 6-digit hex color)
    """
    # format(..., '02x') zero-pads; hex()[2:] produced single digits for small values,
    # yielding invalid 7-character colors like '#1596096'
    return format(int(max_opacity * num), '02x')
def make_sentiment_output(sentiment_analysis_results):
    """Makes HTML output of sentiment analysis info for display with `gr.HTML()`. Input is
    `response.json()['sentiment_analysis_results']` from GET request on AssemblyAI v2/transcript."""
    html = '<p>'
    for result in sentiment_analysis_results:
        label = result['sentiment']
        # Highlight positive/negative sentences; opacity scales with confidence
        if label == 'POSITIVE':
            style = green + to_hex(result["confidence"])
        elif label == "NEGATIVE":
            style = red + to_hex(result["confidence"])
        else:
            style = None
        if style is None:
            # Neutral sentences are left unhighlighted
            html += result['text'] + ' '
        else:
            html += f'<mark style="{style}">' + result['text'] + '</mark> '
    html += "</p>"
    return html
def make_entity_dict(entities, t, offset=40):
    """Creates dictionary that will be used to generate HTML for Entity Detection `gr.Tab()` tab.
    Inputs are response.json()['entities'] and response.json()['text'] for response of GET request
    on AssemblyAI v2/transcript endpoint"""
    total_len = len(t)
    grouped = {}
    for ent in entities:
        ent_text = ent['text']
        # Locate the first occurrence of the entity in the transcript
        idx = t.find(ent_text)
        if idx == -1:
            # Not found (e.g. redacted) - no context available
            context = None
        else:
            # Grab up to `offset` characters of surrounding context on each side
            window_end = min(idx + len(ent_text) + offset, total_len)
            window = t[max(0, idx - offset):window_end]
            # Trim the (possibly partial) first and last words and add ellipses
            context = '... ' + ' '.join(window.split(' ')[1:-1]) + ' ...'
        # Group entries under a human-readable label, e.g. 'person_name' -> 'Person Name'
        label = ' '.join(ent['entity_type'].split('_')).title()
        grouped.setdefault(label, []).append([context, ent_text])
    return grouped
def make_entity_html(d, highlight_color="#FFFF0080"):
    """Input is output of `make_entity_dict`. Creates HTML for Entity Detection info"""
    parts = ["<ul>"]
    for label, occurrences in d.items():
        # Entity-type header, with its occurrences nested beneath
        parts.append(f"""<li style="color: #6b2bd6; font-size: 20px;">{label}""")
        parts.append("<ul>")
        for context, ent in occurrences:
            if context is None:
                # Entity text was not found in the transcript (e.g. PII-redacted)
                parts.append("""<li style="color: black; font-size: 16px;">[REDACTED]</li>""")
            else:
                # Highlight the entity text within its surrounding context
                marked = context.replace(ent, f'<mark style="background-color: {highlight_color}">{ent}</mark>')
                parts.append(f"""<li style="color: black; font-size: 16px;">{marked}</li>""")
        parts.append('</ul>')
        parts.append('</li>')
    parts.append("</ul>")
    return "".join(parts)
def make_content_safety_fig(cont_safety_summary):
    """Creates content safety figure from response.json()['content_safety_labels']['summary'] from GET request on
    AssemblyAI v2/transcript endpoint"""
    purple = 'rgba(107, 43, 214, 1)'
    # Build the column-oriented dict plotly expects: one row per sensitive topic,
    # with human-readable labels ('hate_speech' -> 'Hate Speech')
    plot_data = {
        'label': [' '.join(topic.split('_')).title() for topic in cont_safety_summary],
        'severity': [cont_safety_summary[topic] for topic in cont_safety_summary],
        'color': [purple for _ in cont_safety_summary],
    }
    # Create the figure (n.b. repetitive color info but was running into plotly bugs)
    content_fig = px.bar(
        plot_data, x='severity', y='label', color='color',
        color_discrete_map={
            'Crime Violence': 'rgba(107, 43, 214, 1)',
            'Alcohol': 'rgba(107, 43, 214, 0.1)',
            'Accidents': 'rgba(107, 43, 214, 0.1)'})
    # Light purple plot background
    content_fig.update_layout({'plot_bgcolor': 'rgba(107, 43, 214, 0.1)'})
    # Severity scores live in [0, 1]
    content_fig.update_xaxes(range=[0, 1])
    return content_fig