import re
import string

import gradio as gr
import numpy as np
import tensorflow as tf
from tokenizers import Tokenizer

# Load the trained subword tokenizers for both languages.
hind_tokenizer = Tokenizer.from_file("hind_tokenizer.json")
eng_tokenizer = Tokenizer.from_file("eng_tokenizer.json")


def clean_english_text(text):
    # Remove special characters and digits
    text = re.sub(r"[^a-zA-Z\s]", "", text)
    # Convert to lowercase
    text = text.lower()
    # Remove punctuation
    text = text.translate(str.maketrans("", "", string.punctuation))
    # Collapse extra whitespace and strip
    text = re.sub(r"\s+", " ", text).strip()
    return text


max_sequence_length = 50


# Encode an English sentence into token IDs and pad the sequence.
def encode_and_pad(sentence):
    encoding = eng_tokenizer.encode(sentence)
    encoded_ids = encoding.ids[:max_sequence_length]
    padding_length = max_sequence_length - len(encoded_ids)
    attention_mask = [1] * len(encoded_ids) + [0] * padding_length
    padded_ids = encoded_ids + [0] * padding_length
    return padded_ids, attention_mask


def positional_encoding(length, depth):
    depth = depth / 2

    positions = np.arange(length)[:, np.newaxis]      # (seq, 1)
    depths = np.arange(depth)[np.newaxis, :] / depth  # (1, depth)

    angle_rates = 1 / (10000**depths)     # (1, depth)
    angle_rads = positions * angle_rates  # (pos, depth)

    pos_encoding = np.concatenate(
        [np.sin(angle_rads), np.cos(angle_rads)],
        axis=-1)

    return tf.cast(pos_encoding, dtype=tf.float32)


class PositionalEmbedding(tf.keras.layers.Layer):
    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.d_model = d_model
        self.embedding = tf.keras.layers.Embedding(vocab_size, d_model, mask_zero=True)
        self.pos_encoding = positional_encoding(length=2048, depth=d_model)

    def compute_mask(self, *args, **kwargs):
        return self.embedding.compute_mask(*args, **kwargs)

    def call(self, x):
        length = tf.shape(x)[1]
        x = self.embedding(x)
        # This factor sets the relative scale of the embedding and positional_encoding.
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x = x + self.pos_encoding[tf.newaxis, :length, :]
        return x


class BaseAttention(tf.keras.layers.Layer):
    def __init__(self, **kwargs):
        super().__init__()
        self.mha = tf.keras.layers.MultiHeadAttention(**kwargs)
        self.layernorm = tf.keras.layers.LayerNormalization()
        self.add = tf.keras.layers.Add()


class CrossAttention(BaseAttention):
    def call(self, x, context):
        attn_output, attn_scores = self.mha(
            query=x,
            key=context,
            value=context,
            return_attention_scores=True)

        # Cache the attention scores for plotting later.
        self.last_attn_scores = attn_scores

        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x
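# Illustrative shape check for the attention block above (commented out so the
# script's behavior is unchanged; the dummy dimensions are assumptions used only
# for this example):
#
#   ca = CrossAttention(num_heads=2, key_dim=64)
#   x = tf.random.normal((1, 7, 64))         # decoder-side sequence
#   context = tf.random.normal((1, 10, 64))  # encoder output
#   ca(x, context).shape                     # TensorShape([1, 7, 64])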
class GlobalSelfAttention(BaseAttention):
    def call(self, x):
        attn_output = self.mha(
            query=x,
            value=x,
            key=x)
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x


class CausalSelfAttention(BaseAttention):
    def call(self, x):
        attn_output = self.mha(
            query=x,
            value=x,
            key=x,
            use_causal_mask=True)
        x = self.add([x, attn_output])
        x = self.layernorm(x)
        return x


class FeedForward(tf.keras.layers.Layer):
    def __init__(self, d_model, dff, dropout_rate=0.1):
        super().__init__()
        self.seq = tf.keras.Sequential([
            tf.keras.layers.Dense(dff, activation='relu'),
            tf.keras.layers.Dense(d_model),
            tf.keras.layers.Dropout(dropout_rate)
        ])
        self.add = tf.keras.layers.Add()
        self.layer_norm = tf.keras.layers.LayerNormalization()

    def call(self, x):
        x = self.add([x, self.seq(x)])
        x = self.layer_norm(x)
        return x


class EncoderLayer(tf.keras.layers.Layer):
    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1):
        super().__init__()

        self.self_attention = GlobalSelfAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)

        self.ffn = FeedForward(d_model, dff)

    def call(self, x):
        x = self.self_attention(x)
        x = self.ffn(x)
        return x


class Encoder(tf.keras.layers.Layer):
    def __init__(self, *, num_layers, d_model, num_heads,
                 dff, vocab_size, dropout_rate=0.1):
        super().__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.pos_embedding = PositionalEmbedding(
            vocab_size=vocab_size, d_model=d_model)

        self.enc_layers = [
            EncoderLayer(d_model=d_model,
                         num_heads=num_heads,
                         dff=dff,
                         dropout_rate=dropout_rate)
            for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x):
        # `x` is token-IDs shape: (batch, seq_len)
        x = self.pos_embedding(x)  # Shape `(batch_size, seq_len, d_model)`.

        # Add dropout.
        x = self.dropout(x)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x)

        return x  # Shape `(batch_size, seq_len, d_model)`.


class DecoderLayer(tf.keras.layers.Layer):
    def __init__(self, *, d_model, num_heads, dff, dropout_rate=0.1):
        super(DecoderLayer, self).__init__()

        self.causal_self_attention = CausalSelfAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)

        self.cross_attention = CrossAttention(
            num_heads=num_heads,
            key_dim=d_model,
            dropout=dropout_rate)

        self.ffn = FeedForward(d_model, dff)

    def call(self, x, context):
        x = self.causal_self_attention(x=x)
        x = self.cross_attention(x=x, context=context)

        # Cache the last attention scores for plotting later.
        self.last_attn_scores = self.cross_attention.last_attn_scores

        x = self.ffn(x)  # Shape `(batch_size, seq_len, d_model)`.
        return x


class Decoder(tf.keras.layers.Layer):
    def __init__(self, *, num_layers, d_model, num_heads, dff, vocab_size,
                 dropout_rate=0.1):
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.pos_embedding = PositionalEmbedding(vocab_size=vocab_size,
                                                 d_model=d_model)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)
        self.dec_layers = [
            DecoderLayer(d_model=d_model, num_heads=num_heads,
                         dff=dff, dropout_rate=dropout_rate)
            for _ in range(num_layers)]

        self.last_attn_scores = None

    def call(self, x, context):
        # `x` is token-IDs shape (batch, target_seq_len)
        x = self.pos_embedding(x)  # (batch_size, target_seq_len, d_model)

        x = self.dropout(x)

        for i in range(self.num_layers):
            x = self.dec_layers[i](x, context)

        self.last_attn_scores = self.dec_layers[-1].last_attn_scores

        # The shape of x is (batch_size, target_seq_len, d_model).
        return x
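# Illustrative encoder/decoder shape walk-through (commented out; the dummy
# hyperparameters and vocab size are assumptions for this example only, not the
# values used by the model below):
#
#   enc = Encoder(num_layers=2, d_model=64, num_heads=4, dff=128, vocab_size=1000)
#   dec = Decoder(num_layers=2, d_model=64, num_heads=4, dff=128, vocab_size=1000)
#   src = tf.random.uniform((2, 10), maxval=1000, dtype=tf.int64)  # (batch, src_len)
#   tgt = tf.random.uniform((2, 7), maxval=1000, dtype=tf.int64)   # (batch, tgt_len)
#   ctx = enc(src)       # (2, 10, 64)
#   out = dec(tgt, ctx)  # (2, 7, 64)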
class Transformer(tf.keras.Model):
    def __init__(self, *, num_layers, d_model, num_heads, dff,
                 input_vocab_size, target_vocab_size, dropout_rate=0.1):
        super().__init__()
        self.encoder = Encoder(num_layers=num_layers, d_model=d_model,
                               num_heads=num_heads, dff=dff,
                               vocab_size=input_vocab_size,
                               dropout_rate=dropout_rate)

        self.decoder = Decoder(num_layers=num_layers, d_model=d_model,
                               num_heads=num_heads, dff=dff,
                               vocab_size=target_vocab_size,
                               dropout_rate=dropout_rate)

        self.final_layer = tf.keras.layers.Dense(target_vocab_size)

    def call(self, inputs):
        # To use a Keras model with `.fit` you must pass all your inputs in the
        # first argument.
        context, x = inputs

        context = self.encoder(context)  # (batch_size, context_len, d_model)

        x = self.decoder(x, context)  # (batch_size, target_len, d_model)

        # Final linear layer output.
        logits = self.final_layer(x)  # (batch_size, target_len, target_vocab_size)

        try:
            # Drop the keras mask, so it doesn't scale the losses/metrics.
            # b/250038731
            del logits._keras_mask
        except AttributeError:
            pass

        # Return the final output.
        return logits


class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, d_model, warmup_steps=4000):
        super().__init__()

        self.d_model = tf.cast(d_model, tf.float32)
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        step = tf.cast(step, dtype=tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)

        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)


num_layers = 6
d_model = 512
dff = 512
num_heads = 12
dropout_rate = 0.1


def masked_loss(label, pred):
    mask = label != 0
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction='none')
    loss = loss_object(label, pred)

    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask

    loss = tf.reduce_sum(loss) / tf.reduce_sum(mask)
    return loss


def masked_accuracy(label, pred):
    pred = tf.argmax(pred, axis=2)
    label = tf.cast(label, pred.dtype)
    match = label == pred

    mask = label != 0

    match = match & mask

    match = tf.cast(match, dtype=tf.float32)
    mask = tf.cast(mask, dtype=tf.float32)
    return tf.reduce_sum(match) / tf.reduce_sum(mask)


transformer = Transformer(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    dff=dff,
    input_vocab_size=eng_tokenizer.get_vocab_size(),
    target_vocab_size=hind_tokenizer.get_vocab_size(),
    dropout_rate=dropout_rate)

learning_rate = CustomSchedule(d_model)

optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98,
                                     epsilon=1e-9)

transformer.compile(
    loss=masked_loss,
    optimizer=optimizer,
    metrics=[masked_accuracy])

transformer.load_weights("best_weights_6_512_512")


class Translator(tf.Module):
    def __init__(self, eng_tokenizer, hind_tokenizer, transformer):
        self.eng_tokenizer = eng_tokenizer
        self.hind_tokenizer = hind_tokenizer
        self.transformer = transformer

    def __call__(self, sentence, max_length=50):
        # sentence = clean_english_text(sentence)

        # Encode the English sentence, truncate to the fixed encoder length,
        # and pad with zeros.
        ids = self.eng_tokenizer.encode(sentence).ids[:max_length]
        ids = ids + [0] * (max_length - len(ids))
        encoder_input = tf.reshape(tf.convert_to_tensor(ids), (1, max_length))

        # As the output language is Hindi, initialize the output with the
        # Hindi `[START]` token. This assumes the tokenizer's post-processor
        # wraps every sequence in start/end specials, so encoding an empty
        # string yields exactly those two IDs.
        special_ids = self.hind_tokenizer.encode("").ids
        start = special_ids[0]
        end = special_ids[-1]

        output_array = [[start]]
        for i in tf.range(max_length):
            predictions = self.transformer(
                [encoder_input, tf.convert_to_tensor(output_array)],
                training=False)

            # Select the last token from the `seq_len` dimension.
            predictions = predictions[:, -1:, :]  # Shape `(batch_size, 1, vocab_size)`.

            predicted_id = tf.argmax(predictions, axis=-1)

            # Concatenate the `predicted_id` to the output which is given to the
            # decoder as its input.
            output_array[0].append(predicted_id[0].numpy()[0])

            if predicted_id == end:
                break

        return self.hind_tokenizer.decode(output_array[0])
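# Illustrative direct use of the class above (commented out; the example
# sentence is a placeholder and the output depends entirely on the trained
# weights and tokenizer files):
#
#   demo = Translator(eng_tokenizer, hind_tokenizer, transformer)
#   print(demo(clean_english_text("How are you?")))  # decoded Hindi string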
translator = Translator(eng_tokenizer, hind_tokenizer, transformer)


# Function to perform the model's inference.
def text_transform(input_text):
    # Clean the input, translate it, and drop the first and last decoded
    # words (the start/end markers).
    return ' '.join(translator(clean_english_text(input_text)).split()[1:-1])


# Create a Gradio interface.
iface = gr.Interface(
    fn=text_transform,  # Function to perform the inference
    inputs="text",      # Specify input type as text
    outputs="text"      # Specify output type as text
)

# Start the Gradio interface.
iface.launch()
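# Note: `iface.launch()` serves on localhost by default; Gradio also accepts
# `iface.launch(share=True)` to create a temporary public link, which is useful
# when the app runs on a remote machine.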