import gradio as gr
import tensorflow as tf
import re
import string
from tokenizers import Tokenizer
import numpy as np

hind_tokenizer = Tokenizer.from_file("hind_tokenizer.json")
eng_tokenizer = Tokenizer.from_file("eng_tokenizer.json")
def clean_english_text(text):
    # Remove special characters and digits
    text = re.sub(r"[^a-zA-Z\s]", "", text)
    # Convert to lowercase
    text = text.lower()
    # Remove punctuation
    text = text.translate(str.maketrans("", "", string.punctuation))
    # Remove extra whitespace and strip
    text = re.sub(r"\s+", " ", text).strip()
    return text
max_sequence_length = 50

# Encode an English sentence into token IDs and pad the sequence to max_sequence_length
def encode_and_pad(sentence):
    encoding = eng_tokenizer.encode(sentence)
    encoded_ids = encoding.ids[:max_sequence_length]
    padding_length = max_sequence_length - len(encoded_ids)
    attention_mask = [1] * len(encoded_ids) + [0] * padding_length
    padded_ids = encoded_ids + [0] * padding_length
    return padded_ids, attention_mask
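# Illustrative usage (the token IDs below are made up; real values depend on the
# trained tokenizer): encode_and_pad("how are you") would return something like
# ([17, 92, 305, 0, 0, ...], [1, 1, 1, 0, 0, ...]) -- the IDs padded with 0 up to
# length 50, plus a matching attention mask.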
def positional_encoding(length, depth):
    depth = depth / 2
    positions = np.arange(length)[:, np.newaxis]      # (seq, 1)
    depths = np.arange(depth)[np.newaxis, :] / depth  # (1, depth)
    angle_rates = 1 / (10000**depths)                 # (1, depth)
    angle_rads = positions * angle_rates              # (pos, depth)
    pos_encoding = np.concatenate(
        [np.sin(angle_rads), np.cos(angle_rads)],
        axis=-1)
    return tf.cast(pos_encoding, dtype=tf.float32)
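# The function above implements the sinusoidal positional encoding from
# "Attention Is All You Need". With depth = d_model, for i in [0, d_model/2):
#   PE[pos, i]             = sin(pos / 10000^(2i / d_model))
#   PE[pos, i + d_model/2] = cos(pos / 10000^(2i / d_model))
# Here the sine and cosine halves are concatenated rather than interleaved, as in the
# TensorFlow Transformer tutorial. For example, positional_encoding(2048, 512) returns
# a float32 tensor of shape (2048, 512).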
class PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.d_model = d_model
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
        self.pos_encoding = positional_encoding(length=2048, depth=d_model)

    def compute_mask(self, *args, **kwargs):
        return self.embedding.compute_mask(*args, **kwargs)

    def call(self, x):
        length = tf.shape(x)[1]
        x = self.embedding(x)
        # This factor sets the relative scale of the embedding and positional_encoding.
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = x + self.pos_encoding[tf.newaxis, :length, :]
        return x
class BaseAttention(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super().__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
        self.layernorm = tf.keras.layers.LayerNormalization()
        self.add = tf.keras.layers.Add()

class CrossAttention(BaseAttention):
    def call(self, x, context):
        attn_output, attn_scores = self.mha(
            query=x,
            key=context,
            value=context,
            return_attention_scores=True)
        # Cache the attention scores for plotting later.
        self.last_attn_scores = attn_scores
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x

class GlobalSelfAttention(BaseAttention):
    def call(self, x):
        attn_output = self.mha(
            query=x,
            value=x,
            key=x)
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x

class CausalSelfAttention(BaseAttention):
    def call(self, x):
        attn_output = self.mha(
            query=x,
            value=x,
            key=x,
            use_causal_mask=True)
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x
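# The three subclasses above reuse BaseAttention's MultiHeadAttention + residual add +
# layer normalization, and differ only in how the attention is wired:
#   - GlobalSelfAttention: encoder self-attention; every position attends to every other.
#   - CausalSelfAttention: decoder self-attention with use_causal_mask=True, so each
#     position can only attend to earlier positions.
#   - CrossAttention: decoder queries attend over the encoder output (`context`).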
class FeedForward(tf.keras.layers.Layer):
    def __init__(self, d_model, dff, dropout_rate=0.1):
        super().__init__()
        self.seq = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model),
            tf.keras.layers.Dropout(dropout_rate)
        ])
        self.add = tf.keras.layers.Add()
        self.layer_norm = tf.keras.layers.LayerNormalization()

    def call(self, x):
        x = self.add([x, self.seq(x)])
        x = self.layer_norm(x)
        return x
class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()
        self.self_attention = GlobalSelfAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)
        self.ffn = FeedForward(d_model, dff)

    def call(self, x):
        x = self.self_attention(x)
        x = self.ffn(x)
        return x
class Encoder(tf.keras.layers.Layer):
    def __init__(self, *, num_layers, d_model, num_heads,
                 dff, vocab_size, dropout_rate=0.1):
        super().__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.pos_embedding = PositionalEmbedding(
            vocab_size=vocab_size, d_model=d_model)
        self.enc_layers = [
            EncoderLayer(d_model=d_model,
                         num_heads=num_heads,
                         dff=dff,
                         dropout_rate=dropout_rate)
            for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x):
        # `x` is token-IDs shape: (batch, seq_len)
        x = self.pos_embedding(x)  # Shape `(batch_size, seq_len, d_model)`.
        # Add dropout.
        x = self.dropout(x)
        for i in range(self.num_layers):
            x = self.enc_layers[i](x)
        return x  # Shape `(batch_size, seq_len, d_model)`.
class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self,
                 *,
                 d_model,
                 num_heads,
                 dff,
                 dropout_rate=0.1):
        super(DecoderLayer, self).__init__()
        self.causal_self_attention = CausalSelfAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)
        self.cross_attention = CrossAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)
        self.ffn = FeedForward(d_model, dff)

    def call(self, x, context):
        x = self.causal_self_attention(x=x)
        x = self.cross_attention(x=x, context=context)
        # Cache the last attention scores for plotting later.
        self.last_attn_scores = self.cross_attention.last_attn_scores
        x = self.ffn(x)  # Shape `(batch_size, seq_len, d_model)`.
        return x
class Decoder(tf.keras.layers.Layer):
    def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size,
                 dropout_rate=0.1):
        super(Decoder, self).__init__()
        self.d_model = d_model
        self.num_layers = num_layers
        self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                                 d_model=d_model)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.dec_layers = [
            DecoderLayer(d_model=d_model, num_heads=num_heads,
                         dff=dff, dropout_rate=dropout_rate)
            for _ in range(num_layers)]
        self.last_attn_scores = None

    def call(self, x, context):
        # `x` is token-IDs shape (batch, target_seq_len)
        x = self.pos_embedding(x)  # (batch_size, target_seq_len, d_model)
        x = self.dropout(x)
        for i in range(self.num_layers):
            x = self.dec_layers[i](x, context)
        self.last_attn_scores = self.dec_layers[-1].last_attn_scores
        # The shape of x is (batch_size, target_seq_len, d_model).
        return x
class Transformer(tf.keras.Model):
    def __init__(self, *, num_layers, d_model, num_heads, dff,
                 input_vocab_size, target_vocab_size, dropout_rate=0.1):
        super().__init__()
        self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
                               num_heads=num_heads, dff=dff,
                               vocab_size=input_vocab_size,
                               dropout_rate=dropout_rate)
        self.decoder = Decoder(num_layers=num_layers, d_model=d_model,
                               num_heads=num_heads, dff=dff,
                               vocab_size=target_vocab_size,
                               dropout_rate=dropout_rate)
        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inputs):
        # To use a Keras model with `.fit` you must pass all your inputs in the
        # first argument.
        context, x = inputs
        context = self.encoder(context)  # (batch_size, context_len, d_model)
        x = self.decoder(x, context)  # (batch_size, target_len, d_model)
        # Final linear layer output.
        logits = self.final_layer(x)  # (batch_size, target_len, target_vocab_size)
        try:
            # Drop the keras mask, so it doesn't scale the losses/metrics.
            # b/250038731
            del logits._keras_mask
        except AttributeError:
            pass
        # Return the final logits (attention weights are cached on the decoder layers).
        return logits
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super().__init__()
        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        step = tf.cast(step, dtype=tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)
        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
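# The schedule above is the warmup rule from "Attention Is All You Need":
#   lrate = d_model^(-0.5) * min(step^(-0.5), step * warmup_steps^(-1.5))
# i.e. the learning rate grows linearly for the first warmup_steps steps and then
# decays proportionally to 1/sqrt(step).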
# Model hyperparameters (they need to match the saved checkpoint loaded below).
num_layers = 6
d_model = 512
dff = 512
num_heads = 12
dropout_rate = 0.1
def masked_loss(label, pred):
    mask = label != 0
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')
    loss = loss_object(label, pred)
    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask
    loss = tf.reduce_sum(loss) / tf.reduce_sum(mask)
    return loss
def masked_accuracy(label, pred):
    pred = tf.argmax(pred, axis=2)
    label = tf.cast(label, pred.dtype)
    match = label == pred
    mask = label != 0
    match = match & mask
    match = tf.cast(match, dtype=tf.float32)
    mask = tf.cast(mask, dtype=tf.float32)
    return tf.reduce_sum(match) / tf.reduce_sum(mask)
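# Both functions above ignore padding: positions where the label is the padding ID 0
# are masked out, so the loss and accuracy are averaged only over real target tokens.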
transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=eng_tokenizer.get_vocab_size(),
    target_vocab_size=hind_tokenizer.get_vocab_size(),
    dropout_rate=dropout_rate)

learning_rate = CustomSchedule(d_model)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                     epsilon=1e-9)
transformer.compile(
    loss=masked_loss,
    optimizer=optimizer,
    metrics=[masked_accuracy])
transformer.load_weights("best_weights_6_512_512")
class Translator(tf.Module):
    def __init__(self, eng_tokenizer, hind_tokenizer, transformer):
        self.eng_tokenizer = eng_tokenizer
        self.hind_tokenizer = hind_tokenizer
        self.transformer = transformer

    def __call__(self, sentence, max_length=50):
        # sentence = clean_english_text(sentence)
        # Tokenize the English sentence, truncate/pad it to max_length with 0s,
        # and reshape it into a (1, max_length) batch for the encoder.
        input_ids = self.eng_tokenizer.encode(sentence).ids[:max_length]
        input_ids = input_ids + [0] * (max_length - len(input_ids))
        encoder_input = tf.reshape(tf.convert_to_tensor(input_ids), (1, max_length))
        # As the output language is Hindi, initialize the output with the
        # Hindi `<START>` token.
        start = self.hind_tokenizer.encode("<START>").ids[0]
        end = self.hind_tokenizer.encode("<END>").ids[0]
        output_array = [[start]]
        for i in tf.range(max_length):
            predictions = self.transformer(
                [encoder_input, tf.convert_to_tensor(output_array)], training=False)
            # Select the last token from the `seq_len` dimension.
            predictions = predictions[:, -1:, :]  # Shape `(batch_size, 1, vocab_size)`.
            predicted_id = tf.argmax(predictions, axis=-1)
            # Concatenate the `predicted_id` to the output which is given to the
            # decoder as its input.
            output_array[0].append(predicted_id[0].numpy()[0])
            if predicted_id == end:
                break
        return self.hind_tokenizer.decode(output_array[0])
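# Illustrative usage (the exact Hindi output depends on the trained weights and
# tokenizer): `translator("how are you")` returns the decoded Hindi string. The
# decoded text is assumed to still include the <START>/<END> marker tokens, which is
# why text_transform below drops the first and last whitespace-separated tokens.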
translator = Translator(eng_tokenizer, hind_tokenizer, transformer)

# Run the model's inference for the Gradio interface: clean the English input,
# translate it, and strip the <START>/<END> tokens from the decoded Hindi output.
def text_transform(input_text):
    return ' '.join(translator(clean_english_text(input_text)).split()[1:-1])
# Create a Gradio interface
iface = gr.Interface(
    fn=text_transform,  # Function to perform the inference
    inputs="text",      # Specify input type as text
    outputs="text"      # Specify output type as text
)
# Start the Gradio interface
iface.launch()