"""CNN model for Sentiment Classification.
In this script, there is an implementation of a Convolutional Neural
Network for Sentiment Classification. The sentiments are binary. To
classify the data the model uses an Embedding Layer to convert words
to an arithmetic sequence.
Convolutions are sliding window functions applied to a matrix that
achieve specific results. The sliding window is called a kernel, filter,
or feature detector. By representing each word with a vector of numbers
of a specific length and stacking a bunch of words on top of each other,
we get an image.
See Also
--------
`<https://torchtext.readthedocs.io/en/latest/index.html>`_
References
----------
The Deep Learning Framework used for the development of the current module is Pytorch [1]_.
.. [1] PyTorch: An Imperative Style, High-Performance Deep Learning Library by Paszke, Adam and Gross,
Sam and Massa, Francisco and Lerer, Adam and Bradbury, James and Chanan, Gregory and Killeen, Trevor and Lin,
Zeming and Gimelshein, Natalia and Antiga, Luca and Desmaison, Alban and Kopf, Andreas and Yang, Edward and DeVito,
Zachary and Raison, Martin and Tejani, Alykhan and Chilamkurthy, Sasank and Steiner, Benoit and Fang, Lu and Bai,
Junjie and Chintala, Soumith, published in "Advances in Neural Information Processing Systems 32",
"Curran Associates, Inc.", "H. Wallach and H. Larochelle and A. Beygelzimer and F. Buc and E. Fox and R. Garnett",
pp. 8024-8035, 2019.
"""
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchtext import data
from nltk import word_tokenize
from collections import Counter
import re
import os
import sys
import time
import tqdm
import math
import nltk
import spacy
import numpy
import pandas
import random
import warnings
import matplotlib.pyplot as plt
[docs]class CNN(nn.Module):
"""
Convolutional Neural Network model with Pretrained Embeddings.
Attributes
----------
vocab_size : int
Size of the dictionary of embeddings.
embedding_dim: int
The size of each embedding vector.
n_filters: int
Number of channels produced by the convolution.
filter_sizes: list
A list that contains integers that correspond to the amount of channels produced by the convolution.
output_dim: int
The size of the output fully connected layer.
dropout: float
The probability of an element to be zeroed.
pad_idx: int
The numerical identifier mapped to the string token used as padding.
Methods
-------
conv_and_pool(x, conv)
Applies 1d convolution.
forward(x)
Defines the computation performed by the CNN model at every call.
"""
def __init__(self, vocab_size, embedding_dim, n_filters, filter_sizes, output_dim, dropout, pad_idx):
"""
Parameters
----------
vocab_size : int
Size of the dictionary of embeddings.
embedding_dim: int
The size of each embedding vector.
n_filters: int
Number of channels produced by the convolution.
filter_sizes: list
A list that contains integers that correspond to the amount of channels produced by the convolution.
output_dim: int
The size of the output fully connected layer.
dropout: float
The probability of an element to be zeroed.
pad_idx: int
The numerical identifier mapped to the string token used as padding.
"""
# extends the functionality of this method
super(CNN, self).__init__()
# defines an embedding layer
self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
# freezes the embedding layer
self.embedding.requires_grad = False
# applies convolution over the input signal
self.convs_1d = nn.ModuleList([nn.Conv2d(1, n_filters, (k, embedding_dim), padding=(k - 2, 0))
for k in filter_sizes])
# applies linear transformation to the convolved data
self.fc = nn.Linear(len(filter_sizes) * n_filters, output_dim)
# regularizes and prevents the co-adaptation of neurons
self.dropout = nn.Dropout(dropout)
[docs] @staticmethod
def conv_and_pool(x, conv):
"""Applies 1d convolution.
The method applies a 2D convolution over the input. It then filters the convolved output
using Rectified Linear Unit. The result is a tensor of size [32 x 64 x Y x 100] where:
* 32 is the batch size
* 64 is the number of filters
* Y is the sequence length which is equal to the sentence length
* 100 is size of the second dimension of the kernel of a convolutional layer
This temporary result is then squeezed to yields a tensor of size [32 x 64 x Y].
In the last step, the method applies a 1D max pooling over the squeezed tensor with
a sliding window of size equal to Y. The result is then squeezed again to produce
a tensor of size [32 x 64].
Parameters
----------
x: torch.tensor
This is a tensor of type float that operates as input for each convolution layer.
conv: torch.nn.Module
Applies a 2D convolution over a given input.
Returns
-------
torch.tensor
The 2D tensor to be used in the linear layer.
"""
x = F.relu(conv(x)).squeeze(3)
x_max = F.max_pool1d(x, x.size(2)).squeeze(2)
return x_max
[docs] def forward(self, x):
"""Defines the computation performed by the CNN model at every call.
This method *forwards* the given input to every single model layer.
Parameters
----------
x: torch.tensor
This is a tensor of type int that operates as input for the defined model.
Returns
-------
torch.tensor
The tensor containing the predictions made by the model.
"""
# embedded vectors of: (batch_size, seq_length, embedding_dim)
embeds = self.embedding(x)
# creates a fourth dimension for the convolutional module list
embeds = embeds.unsqueeze(1)
# gets output of each convolutional layer
conv_results = [self.conv_and_pool(embeds, conv) for conv in self.convs_1d]
# concatenates results
x = torch.cat(conv_results, 1)
# add dropout
x = self.dropout(x)
# fully connected layer that yields a float tensor of size equal to the batch size
logit = self.fc(x)
return logit
[docs]def nlp_preprocessor(text):
"""Defines an NLP preprocessor.
This method takes some text and filters it. It deletes any
non - alphanumeric character found. This is a standard
preprocessing routine in machine learning models for NLP.
It increases model's performance.
Parameters
----------
text: str
This is the string to preprocess.
Returns
-------
str
The preprocessed - filtered string.
"""
text = re.sub('<[^>]*>', '', text)
emoticons = re.findall('(?::|;|=)(?:-)?[)(DP]', text)
text = re.sub('[\W]+', ' ', text.lower()) + ' '.join(emoticons).replace('-', '')
return text
[docs]def dataset_preprocessor(df, column, filepath):
"""Preprocess text in given dataset.
This method calls the predefined NLP preprocessor to filter out
any non-alphanumeric character found in the given dataset. The
method saves afterward the result using the given filepath. The
filepath can also be relative. An example filepath is provided:
`./filtered_dataset.csv`
Parameters
----------
df: pandas.DataFrame
This is the given dataset.
column: str
This is the column with the user reviews
filepath: str
This is the filepath to save the preprocessed dataset
"""
# apply the preprocessor to the dataframe
df[column] = df[column].apply(nlp_preprocessor)
# save data
df.to_csv(filepath, index=False)
[docs]def train_validate_test_split(df, seed, train_percent=.7, validate_percent=.1):
"""Splits the given dataset.
This method splits a dataframe into:
* A dataframe used to train the model
* A dataframe used to validate the model
* A dataframe used to test the model
The indexes of the given datasets are shuffled.
Parameters
----------
df: pandas.DataFrame
This is the given dataset.
seed: int
This is the seed used for the NumPy shuffler.
train_percent: float (optional)
This is the dataset split ratio to get the sample data to fit the model.
validate_percent: float (optional)
This is the dataset split ratio to get the sample data to validate the model.
"""
# shuffle the given dataframe indexes
shuffled = numpy.random.RandomState(seed).permutation(df.index)
# get the number of rows inside the dataframe
data_length = len(df.index)
# compute the number of rows for the training dataset
train_end = int(train_percent * data_length)
# make the training dataset size divide perfectly the batch size
train_end = int(train_end/BATCH_SIZE) * BATCH_SIZE + BATCH_SIZE
# compute the number of rows for the validation dataset
validate_end = int(validate_percent * data_length) + train_end
# make the validation dataset size divide perfectly the batch size
validate_end = int(validate_end / BATCH_SIZE) * BATCH_SIZE + BATCH_SIZE
# make the test dataset size divide perfectly the batch size
test_end = int(data_length / BATCH_SIZE) * BATCH_SIZE
# set the training dataset
train_df = df.iloc[shuffled[:train_end]]
# set the validation dataset
valid_df = df.iloc[shuffled[train_end:validate_end]]
# set the test dataset
test_df = df.iloc[shuffled[validate_end:test_end]]
# save the training dataset
train_df.to_csv('train_df.csv', index=False)
# save the validation dataset
valid_df.to_csv('valid_df.csv', index=False)
# save the test dataset
test_df.to_csv('test_df.csv', index=False)
def inspect_vocab(df):
"""Estimates the Vocabulary size after subsampling.
This method performs a virtual subsampling of the given dataset.
This is done to increase the context window size of the embedding
layer. If the computed probability is less than 50%, then the word
is virtually discarded. The probability is given by the formula:
.. math::
p = 1 - \\sqrt{\\frac{t}{f}},
Where:
* :math:`p` is the probability of the token to be virtually discarded
* :math:`t` is a chosen threshold typically around :math:`10^5`
* :math:`f` the token frequency
Parameters
----------
df: pandas.DataFrame
This is the given dataset.
Returns
-------
int
The vocabulary size after the virtual subsampling.
int
The vocabulary size without virtual subsampling.
See Also
--------
`<https://www.mitpressjournals.org/doi/pdf/10.1162/tacl_a_00134>`_
`<https://arxiv.org/abs/1301.3781>`_
"""
# initialize vocabulary size register
unique_count = 0
# get the text column loaded in a pandas Series
texts = df.Summary.str.lower()
# get a dictionary with the count of each token in that pandas Series
word_counts = Counter(word_tokenize('\n'.join(texts)))
# get the total token sum
total_token_count = sum(word_counts.values())
# get the unique token sum
final_count = len(word_counts)
# initialize threshold constant
threshold = 1e-5
# use the subsampling formula to estimate the vocabulary size
for token_freq in word_counts.values():
if 1 - math.sqrt(threshold / (token_freq / total_token_count)) > 0.5:
unique_count += 1
return unique_count, final_count
[docs]def compute_vocab_size():
"""Returns vocabulary size
This method computes vocabulary size. It uses the subsampling
flag defined in the main thread. If subsampling is activated,
then the method sets the vocabulary size equal to the subsampled
estimation of the vocabulary size. Otherwise, it sets the
vocabulary size equal to the total unique token count.
"""
if subsampling:
return vocab_subsampled
else:
return token_count
[docs]def count_parameters():
"""Counts model trainable parameters.
Returns
-------
int
The number of trainable parameters.
"""
return sum(p.numel() for p in model.parameters() if p.requires_grad)
[docs]def binary_accuracy(preds, y):
"""Computes prediction accuracy.
This method is used to estimate the models accuracy over binary
targets. The prediction is rounded to compare it to the true label.
Parameters
----------
preds: torch.tensor
These are the predictions returned by the model for an input batch.
y: torch.tensor
This is the ground truth tensor for the same input batch.
Returns
-------
torch.tensor
The model accuracy ratio for the given predictions. The tensor is a single float element container.
"""
# use the sigmoid to round the predictions
rounded_preds = torch.round(torch.sigmoid(preds))
# count the correct predictions by comparing them to the ground truth tensor
correct = (rounded_preds == y).float()
# compute the accuracy of the model
acc = correct.sum() / len(correct)
# return the accuracy
return acc
[docs]def get_max_length(df):
"""Computes maximum number of tokens given a dataframe.
This method is used to compute the maximum length found at the text
column of the given dataframe. The column of the dataframe with the
text is *Summary*. This function is useful if one decides to use
padding for tokenization.
Parameters
----------
df: pandas.DataFrame
This is the dataset of the model.
Returns
-------
int
The maximum number of tokens found in a dataframe column.
"""
# initializes maximum sentence length register
max_len = 0
# iterates the Summary column of the given dataframe
for text in df.Summary:
# checks length of the "running" sentence
if len(text.split()) > max_len:
# update maximum sentence length register
max_len = len(text.split())
return max_len
[docs]def train(iterator):
"""Fits a model.
This method is used to fit the defined CNN model. The method provides
progress context for the user using progressbar.
Parameters
----------
iterator: torchtext.data.Iterator
An iterator to load batches of training data from the given dataset.
Returns
-------
float
The epoch training loss
float
The epoch training accuracy
"""
# initializes epoch loss accumulator
epoch_loss = 0
# initializes epoch accuracy accumulator
epoch_acc = 0
# sets the module in training mode
model.train()
for batch in tqdm.tqdm(iterator):
# set the gradients to zero
optimizer.zero_grad()
# make predictions
predictions = model(batch.Summary).squeeze(1)
# compute loss
loss = criterion(predictions, batch.Sentiment.squeeze(0))
# compute accuracy
acc = binary_accuracy(predictions, batch.Sentiment.squeeze(0))
# store the gradients
loss.backward()
# parameter update based on the current gradients
optimizer.step()
# update epoch loss accumulator
epoch_loss += loss.item()
# update epoch accuracy accumulator
epoch_acc += acc.item()
return epoch_loss / len(iterator), epoch_acc / len(iterator)
[docs]def evaluate(iterator):
"""Evaluates a model.
This method is called either to validate the defined CNN model
or to test it, by disabling gradient calculation. The method
provides progress context for the user using progressbar.
Parameters
----------
iterator: torchtext.data.Iterator
An iterator to load batches of evaluation data from the given dataset.
Returns
-------
float
The epoch evaluation loss
float
The epoch evaluation accuracy
"""
# initializes epoch loss accumulator
epoch_loss = 0
# initializes epoch accuracy accumulator
epoch_acc = 0
# sets the module in evaluation mode
model.eval()
# disables gradient calculation
with torch.no_grad():
for batch in tqdm.tqdm(iterator):
# make predictions
predictions = model(batch.Summary).squeeze(1)
# compute loss
loss = criterion(predictions, batch.Sentiment.squeeze(0))
# compute accuracy
acc = binary_accuracy(predictions, batch.Sentiment.squeeze(0))
# update epoch loss accumulator
epoch_loss += loss.item()
# update epoch accuracy accumulator
epoch_acc += acc.item()
return epoch_loss / len(iterator), epoch_acc / len(iterator)
[docs]def epoch_time():
"""Computes epoch duration.
This method is called upon the launch of each epoch,
and upon the termination of each epoch. It then uses
the checkpoints created to compute the epoch's duration.
Returns
-------
int
Number of minutes rounded down that represent the running epoch's duration
int
The remaining of seconds that represent the running epoch's duration
"""
elapsed_time = end_time - start_time
elapsed_mins = int(elapsed_time / 60)
elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
return elapsed_mins, elapsed_secs
[docs]def plot_loss_and_accuracy():
"""Plots model's fitting results.
This method takes the lists containing the training and the
validation losses and plots them together. This method is
useful when detecting an over-fitted model (or an under-fitted).
The method saves the plot at the project directory.
"""
plt.plot(train_losses, label="Training loss")
plt.plot(val_losses, label="Validation loss")
plt.legend()
plt.title("Losses")
plt.savefig("model-train_valid_losses.png", dpi=300, bbox_inches='tight', pad_inches=0.1)
[docs]def predict_sentiment(sentence, min_len=5):
"""Classifies a custom critic.
This method converts a sentence into arithmetic tokens.
The tokens are then given to a trained model. The model
predicts the sentiment of that *critic*.
Parameters
----------
sentence: str
The custom critic to be classified.
min_len: int (optional)
The minimum length of tokens of the given sentence.
Returns
-------
float
The probability of the critic being negative.
"""
# load natural language processor
nlp = spacy.load('en_core_web_sm')
# set the module in evaluation mode
model.eval()
# tokenize given text using the defined processor
tokenized = [tok.text for tok in nlp.tokenizer(sentence)]
# pad the sentence if it has less tokens than required
if len(tokenized) < min_len:
tokenized += ['<pad>'] * (min_len - len(tokenized))
# convert tokens to embeddings using the fit torchtext data field
indexed = [TEXT.vocab.stoi[t] for t in tokenized]
# convert embedding list to torch tensor and load it to the available device
tensor = torch.LongTensor(indexed).to(device)
# unsqueeze tensor to make it 2D
tensor = tensor.unsqueeze(0)
# filter prediction using sigmoid
prediction = F.sigmoid(model(tensor))
return prediction.item()
[docs]def filter_prediction(prediction, critic):
"""Provides feedback over a custom prediction.
This method takes the prediction of the model on a custom critic and
defines if it was positive or negative. Finally, it prints the proper
message.
Parameters
----------
prediction: float
The probability of a critic being negative
critic: str
The word sequence used as a critic
"""
message = "negative"
if prediction < 0.5:
message = "positive"
prediction = 1 - prediction
print('Label for critic {:25s}: {:7s}\t-\tPrediction validity probability: {:10f}'.format(
'\"'+critic+'\"', message, prediction))
[docs]def manual_testing():
"""Calls model upon custom critics.
In this method there are some movie critics
defined to test the model with custom data.
"""
x_critic = "This film is terrible"
y_pred = predict_sentiment(x_critic)
filter_prediction(y_pred, x_critic)
x_critic = "This film is great"
y_pred = predict_sentiment(x_critic)
filter_prediction(y_pred, x_critic)
x_critic = "I loved this film"
y_pred = predict_sentiment(x_critic)
filter_prediction(y_pred, x_critic)
if __name__ == "__main__":
# install nltk punkt
nltk.download('punkt')
# define a seed for the randomizers
SEED = 42
# install English Spacy package
spacy.cli.download("en")
# load English package of spacy package
spacy.load('en_core_web_sm')
# disable warnings
warnings.filterwarnings("ignore")
# seed random package
random.seed(SEED)
# seed numpy
numpy.random.seed(SEED)
# seed pytorch
torch.manual_seed(SEED)
# check for any CUDA device available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# make program controllability easier
torch.backends.cudnn.deterministic = True
# define batch size
BATCH_SIZE = 32
# define filepath to dave model
model_filepath = os.getcwd() + os.sep + 'cnn-model.pt'
# define the input's filepath
dataset_filepath = '..' + os.sep + 'dataset' + os.sep + 'MoviesDataset.csv'
# load the dataset
dataset = pandas.read_csv(dataset_filepath)
# dataset preprocessed
dataset_preprocessor(dataset, 'Summary',
'..' + os.sep + 'dataset' + os.sep + 'MoviesDatasetPreprocessed.csv')
# reload dataset after preprocessing
dataset = pandas.read_csv('..' + os.sep + 'dataset' + os.sep + 'MoviesDatasetPreprocessed.csv')
# inspect vocabulary
vocab_subsampled, token_count = inspect_vocab(dataset)
# set subsampling flag
subsampling = True
# set vocabulary size
vocab_size = compute_vocab_size()
# split the dataset
train_validate_test_split(dataset, SEED)
# define torchtext data text field
TEXT = data.Field(tokenize='spacy', batch_first=True)
# define torchtext data label field
LABEL = data.Field(dtype=torch.float, unk_token=None, pad_token=None)
# associate defined fields with DataFrame columns
fields = [('Summary', TEXT), ('Sentiment', LABEL)]
# define a dataset of columns stored in CSV
train_data, valid_data, test_data = data.TabularDataset.splits(
path='./',
train='train_df.csv',
validation='valid_df.csv',
test='test_df.csv',
format='csv',
fields=fields,
skip_header=True
)
# construct the Vocab object for the TEXT field
TEXT.build_vocab(train_data, valid_data, test_data,
max_size=vocab_size,
vectors="glove.6B.100d",
unk_init=torch.Tensor.normal_)
# construct the Vocab object for the LABEL field
LABEL.build_vocab(train_data)
# define an iterator that batches the training dataset object
train_iterator = data.BucketIterator(
train_data,
batch_size=BATCH_SIZE,
device=device,
)
# define an iterator that batches the validation dataset object
valid_iterator = data.BucketIterator(
valid_data,
batch_size=BATCH_SIZE,
device=device,
)
# define an iterator that batches the test dataset object
test_iterator = data.BucketIterator(
test_data,
batch_size=BATCH_SIZE,
device=device,
)
# define the size of the dictionary of embeddings
INPUT_DIM = len(TEXT.vocab)
# define the size of each embedding vector
EMBEDDING_DIM = 100
# define the number of channels produced by each convolution
N_FILTERS = 64
# define the size of the first dimension of the kernel of each convolutional layer
FILTER_SIZES = [2, 3, 4, 5]
# define the number of neurons in the output layer of the model
OUTPUT_DIM = 1
# define the probability of an element to be zeroed
DROPOUT = 0.3
# return the index of the string token used as padding
PAD_IDX = TEXT.vocab.stoi[TEXT.pad_token]
# return the index of the string token used to represent Out-Of-Vocabulary words
UNK_IDX = TEXT.vocab.stoi[TEXT.unk_token]
# define a CNN model
model = CNN(INPUT_DIM, EMBEDDING_DIM, N_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT, PAD_IDX)
# print model summary
print(model)
# print model trainable parameters
print(f'The model has {count_parameters():,} trainable parameters')
# extract pretrained vectors
pretrained_embeddings = TEXT.vocab.vectors
# copy pretrained vectors to the embedding layer of the defined model
model.embedding.weight.data.copy_(pretrained_embeddings)
# set the weight of the <pad> token
model.embedding.weight.data[PAD_IDX] = torch.zeros(EMBEDDING_DIM)
# set the weight of the <unk> token
model.embedding.weight.data[UNK_IDX] = torch.zeros(EMBEDDING_DIM)
# define optimizer
optimizer = optim.Adam(model.parameters(), lr=1e-4)
# define cost function
criterion = nn.BCEWithLogitsLoss(pos_weight=torch.ones([BATCH_SIZE]))
# load model to the available device
model = model.to(device)
# load cost function to the available device
criterion = criterion.to(device)
# define number of epochs for the model's training
N_EPOCHS = 20
# initialize a register that holds the best validation cost returned during an epoch
best_valid_loss = float('inf')
# declare the train and validation loss lists
train_losses, val_losses = [], []
# fit the model
for epoch in range(N_EPOCHS):
# initialize an epoch starting time-point
start_time = time.time()
# train the model
train_loss, train_acc = train(train_iterator)
# update the train loss list
train_losses.append(train_acc)
# validate the model
valid_loss, valid_acc = evaluate(valid_iterator)
# update the validation loss list
val_losses.append(valid_acc)
# initialize an epoch ending time-point
end_time = time.time()
# compute epoch duration in minutes and seconds
epoch_mins, epoch_secs = epoch_time()
# save the model if validation loss was better than past validation losses
if valid_loss < best_valid_loss:
best_valid_loss = valid_loss
torch.save(model.state_dict(), model_filepath)
# print epoch's progress results
print(f'\nEpoch: {epoch + 1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc * 100:.2f}%')
print(f'\t Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc * 100:.2f}%\n')
# plot model's fitting data
plot_loss_and_accuracy()
# load the best evaluated model
model.load_state_dict(torch.load(model_filepath, map_location=device))
# test the model
test_loss, test_acc = evaluate(test_iterator)
# print test results
print(f'\nTest Loss: {test_loss:.3f} | Test Acc: {test_acc * 100:.2f}%')
# test the model over custom critics
manual_testing()