#!/usr/bin/env python
# coding: utf-8

###### IMPORTING PACKAGES ######
import re
import string

import pandas as pd
import nltk
from nltk.corpus import stopwords, wordnet
from nltk.tokenize import word_tokenize
from nltk.stem import SnowballStemmer, WordNetLemmatizer

nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
nltk.download('stopwords')  # needed by the stopword() helper below
nltk.download('omw-1.4')    # required by WordNetLemmatizer on some NLTK versions

# for model-building
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import classification_report, roc_curve, roc_auc_score, confusion_matrix
from sklearn.pipeline import Pipeline

# bag of words
from sklearn.feature_extraction.text import TfidfVectorizer
from imblearn.over_sampling import SMOTE

import plotly.express as px
import plotly.graph_objects as go

#############################################################
# ## PRE-PROCESSING
# 1. Common text preprocessing
# text = " This is a message to be cleaned. It may involve some things like: <br>, ?, :, ''  adjacent spaces and tabs . "

# Convert to lowercase, remove markup/punctuation/digits, and collapse whitespace
def preprocess(text):
    text = str(text)
    text = text.lower()                       # lowercase text
    text = text.strip()                       # get rid of leading/trailing whitespace
    text = re.compile('<.*?>').sub('', text)  # remove HTML tags/markup
    text = re.compile('[%s]' % re.escape(string.punctuation)).sub(' ', text)  # replace punctuation with space; careful, punctuation can sometimes be useful
    text = re.sub(r'\[[0-9]*\]', ' ', text)   # remove bracketed reference numbers such as [12]
    text = re.sub(r'[^\w\s]', '', text)       # drop any remaining non-word, non-space characters
    text = re.sub(r'\d', ' ', text)           # \d matches any digit; \D matches non-digits
    text = re.sub(r'\s+', ' ', text)          # \s matches any whitespace; \s+ collapses runs of spaces and tabs
    return text

# 1. STOPWORD REMOVAL
# Build the stopword set once instead of on every call
stop_words = set(stopwords.words('english'))

def stopword(text):
    a = [i for i in text.split() if i not in stop_words]
    return ' '.join(a)

# 2. STEMMING
# Initialize the stemmer
snow = SnowballStemmer('english')

def stemming(text):
    a = [snow.stem(i) for i in word_tokenize(text)]
    return " ".join(a)

# 3. LEMMATIZATION
# Initialize the lemmatizer
wl = WordNetLemmatizer()

# Helper function to map NLTK part-of-speech tags to WordNet tags.
# Full list is available here: https://www.ling.upenn.edu/courses/Fall_2003/ling001/penn_treebank_pos.html
def get_wordnet_pos(tag):
    if tag.startswith('J'):
        return wordnet.ADJ
    elif tag.startswith('V'):
        return wordnet.VERB
    elif tag.startswith('N'):
        return wordnet.NOUN
    elif tag.startswith('R'):
        return wordnet.ADV
    else:
        return wordnet.NOUN

# Tokenize the sentence, get POS tags, then map each tag and lemmatize the word/token
def lemmatizer(text):
    word_pos_tags = nltk.pos_tag(word_tokenize(text))
    a = [wl.lemmatize(word, get_wordnet_pos(tag)) for word, tag in word_pos_tags]
    return " ".join(a)

# FINAL PREPROCESSING
def finalpreprocess(text):
    return lemmatizer(stopword(preprocess(text)))
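# A minimal illustrative check of the cleaning chain. The sample string below is
# made up, and the exact lemmatizer output may vary slightly with the NLTK
# tagger version; the print is left commented so importing this module has no
# side effects:
#
#   finalpreprocess(" This is a message to be cleaned. It may involve 123 things like: <br>, ?, : ")
#   # -> roughly 'message clean may involve thing like'
#
# preprocess() lowercases and strips markup/punctuation/digits, stopword()
# drops common English words, and lemmatizer() reduces each remaining token
# to its dictionary form.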
################ Data Cleaning and model building for NLP Text Classification ##############################
def model_train(dataset, input_feature, target_data, balance_data):
    try:
        lst = []
        data_dict = {}
        df_train = dataset  # e.g. pd.read_csv(filepath, encoding='ISO-8859-1')
        print(df_train.shape)
        text = input_feature   # e.g. 'Review'
        target = target_data   # e.g. 'Liked'

        print("Data Pre-Process Started")
        df_train['clean_text'] = df_train[text].apply(lambda x: finalpreprocess(x))
        print("Data Pre-Process Finished")

        # TF-IDF
        # Convert the training text to vectors, since the models can only run on
        # numbers, not words. Unlike word2vec, TF-IDF runs on non-tokenized sentences.
        tfidf_vectorizer = TfidfVectorizer(use_idf=True)
        X_train_vectors_tfidf = tfidf_vectorizer.fit_transform(df_train['clean_text'])

        train_data = X_train_vectors_tfidf
        target_data = df_train[target]

        if balance_data == "Auto":
            d = {}
            d["Before Handling Imbalanced Dataset"] = target_data.value_counts()
            oversample = SMOTE()
            train_data, target_data = oversample.fit_resample(train_data, target_data)
            d["After Handling Imbalanced Dataset"] = target_data.value_counts()
            data_dict["Handling Imbalanced Dataset"] = d
        elif balance_data == "False":
            data_dict["Cannot Handle Imbalanced Dataset, it is set to False"] = ""

        X_train, X_val, y_train, y_val = train_test_split(train_data, target_data,
                                                          test_size=0.2, shuffle=True)

        pipeline_lr = Pipeline([('lr_classifier', LogisticRegression(solver='liblinear', C=10, penalty='l2'))])
        pipeline_nb = Pipeline([('nb_classifier', MultinomialNB())])
        pipeline_knn = Pipeline([('knn_classifier', KNeighborsClassifier())])
        pipeline_dt = Pipeline([('dt_classifier', DecisionTreeClassifier())])
        # note: recent XGBoost versions expect numeric class labels
        pipeline_xg = Pipeline([('xg_classifier', XGBClassifier())])
        pipelines = [pipeline_lr, pipeline_nb, pipeline_knn, pipeline_dt, pipeline_xg]

        best_accuracy = 0.0
        best_classifier = 0
        best_pipeline = ""
        pipe_dict = {0: 'Logistic_Regression', 1: 'MultinomialNB', 2: 'KNeighborsClassifier',
                     3: 'DecisionTreeClassifier', 4: 'XGBoost_Classifier'}

        for pipe in pipelines:
            pipe.fit(X_train, y_train)

        models_info = {}
        for i, model in enumerate(pipelines):
            accuracy = model.score(X_val, y_val)
            val = "{} Test Accuracy: {}".format(pipe_dict[i], accuracy)
            lst.append(val)
            models_info[pipe_dict[i]] = accuracy
            print(val)
        df_models_info = pd.DataFrame(models_info.items(), columns=["Models", "Accuracy"])

        for i, model in enumerate(pipelines):
            if model.score(X_val, y_val) > best_accuracy:
                best_accuracy = model.score(X_val, y_val)
                best_pipeline = model
                best_classifier = i
        val1 = 'Classifier with best accuracy:{}'.format(pipe_dict[best_classifier])
        lst.append(val1)
        print(val1)

        y_predict = best_pipeline.predict(X_val)
        cn = confusion_matrix(y_val, y_predict)
        print(cn)
        report = classification_report(y_val, y_predict)
        print(report)

        data_dict['Model details'] = lst
        fig = px.histogram(df_models_info, x="Models", y="Accuracy", color="Models")
        fig.update_layout(yaxis_title="Accuracy")
        data_dict['model_comparison'] = fig
        data_dict['Best model'] = lst[-1].split(':')[1]
        data_dict['Best pipeline'] = best_pipeline
        data_dict['Confusion Matrix'] = cn
        data_dict['Classification Report'] = report
        data_dict['tfidf_vector'] = tfidf_vectorizer

        y_scores = best_pipeline.predict_proba(X_val)

        # One-hot encode the labels in order to plot a per-class ROC curve
        y_onehot = pd.get_dummies(y_val)

        # Create an empty figure, and iteratively add a new line
        # every time we compute a new class
        fig = go.Figure()
        fig.add_shape(type='line', line=dict(dash='dash'), x0=0, x1=1, y0=0, y1=1)

        for i in range(y_scores.shape[1]):
            y_true = y_onehot.iloc[:, i]
            y_score = y_scores[:, i]

            fpr, tpr, _ = roc_curve(y_true, y_score)
            auc_score = roc_auc_score(y_true, y_score)

            name = f"{y_onehot.columns[i]} (AUC={auc_score:.2f})"
            fig.add_trace(go.Scatter(x=fpr, y=tpr, name=name, mode='lines'))

        fig.update_layout(
            xaxis_title='False Positive Rate',
            yaxis_title='True Positive Rate',
            yaxis=dict(scaleanchor="x", scaleratio=1),
            xaxis=dict(constrain='domain'),
            width=700, height=500
        )
        data_dict['ROC Curve'] = fig
        return data_dict
    except Exception as e:
        # Surface the error instead of failing silently
        print("model_train failed: {}".format(e))
        return None

##########################################################################
######### TESTING THE MODEL ON TEXT #########
def predict_text(text, model, tfidf_vectorizer):
    df = pd.DataFrame({'text': [text]})
    df['clean_text'] = df['text'].apply(lambda x: finalpreprocess(x))  # preprocess the data
    X_test = df['clean_text']
    X_vector = tfidf_vectorizer.transform(X_test)  # converting X_test to a TF-IDF vector
    y_predict = model.predict(X_vector)
    return y_predict
#########################################
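# A minimal usage sketch for single-text prediction, assuming model_train()
# has already returned its data_dict (the 'Review'/'Liked' column names and
# the sample sentence are illustrative, not fixed by the pipeline):
#
#   result = model_train(df, 'Review', 'Liked', balance_data="Auto")
#   label = predict_text("The food was great!",
#                        result['Best pipeline'], result['tfidf_vector'])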
######### TESTING THE MODEL ON CSV PREDICTION #########
def predict_csv(df, model, tfidf_vectorizer, input_feature):
    df['clean_text'] = df[input_feature].apply(lambda x: finalpreprocess(x))  # preprocess the data
    X_test = df['clean_text']
    X_vector = tfidf_vectorizer.transform(X_test)  # converting X_test to TF-IDF vectors
    y_predict = model.predict(X_vector)
    return y_predict
###############################################
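######### EXAMPLE DRIVER #########
# An end-to-end sketch under illustrative assumptions: 'reviews.csv' and
# 'new_reviews.csv' are hypothetical files with a 'Review' text column and a
# 'Liked' label column; they are not shipped with this script.
if __name__ == "__main__":
    df = pd.read_csv('reviews.csv', encoding='ISO-8859-1')  # hypothetical path
    result = model_train(df, 'Review', 'Liked', balance_data="Auto")
    if result is not None:
        model = result['Best pipeline']
        vectorizer = result['tfidf_vector']
        print(result['Classification Report'])
        # score every row of a new CSV with the trained model
        new_df = pd.read_csv('new_reviews.csv', encoding='ISO-8859-1')  # hypothetical path
        new_df['prediction'] = predict_csv(new_df, model, vectorizer, 'Review')
        print(new_df[['Review', 'prediction']].head())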