Real Time Text Classification using Kafka and scikit-learn

Introduction:

Text classification is one of the essential tasks in supervised machine learning (ML). Assigning categories to text (tweets, Facebook posts, web pages, library books, media articles, etc.) has many applications, such as spam filtering and sentiment analysis. In this blog, we build a text classification engine to classify topics in an incoming Twitter stream using Apache Kafka and scikit-learn, a Python-based machine learning library.

Let's dive into the details. Here is a diagram to visually explain the components and data flow. The Kafka producer ingests data from Twitter and sends it to the Kafka broker. The Kafka consumer asks the Kafka broker for the tweets. We convert the binary tweet stream from Kafka into human-readable strings and perform predictions using the saved models. We train the models on the 20 Newsgroups dataset, a standard, prebuilt dataset from scikit-learn that is widely used for training classification algorithms.

In this blog, we will use a Multinomial Naive Bayes classifier as our machine learning model.

We have used the following libraries/tools:

  • tweepy - Twitter library for Python
  • Apache Kafka
  • scikit-learn
  • pickle - Python object serialization library

Let’s first understand the following key concepts:

  • Word to Vector Methodology (Word2Vec)
  • Bag-of-Words
  • tf-idf
  • Multinomial Naive Bayes classifier

Word2Vec methodology

One of the key ideas in Natural Language Processing (NLP) is how to efficiently convert words into numeric vectors, which can then be given as input to machine learning models to perform predictions.

Neural networks, like most other machine learning models, are just mathematical functions: they need numbers or vectors to churn out an output. (Tree-based methods are the exception; they can work on words directly.)

Word2Vec is one approach to this problem. Before looking at it, consider a very trivial solution: the "one-hot" method, which converts each word into a sparse vector with only one element set to 1 and the rest set to zero.

For example, “the apple a day the good” would have the following representation:

(Figure: one-hot representation of the example sentence, one row per word)

Here we have transformed the above sentence into a 6×5 matrix, with 6 rows for the 6 words in the sentence and 5 columns for the 5 unique words in the vocabulary, since "the" is repeated. But what are we supposed to do when we have a gigantic dictionary to learn from, say more than 100,000 words? This is where one-hot encoding fails: the vectors become enormous, and the relationships between words are lost, such as the fact that "Lanka" should come after "Sri".
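
To make the idea concrete, here is a minimal one-hot encoding sketch for the example sentence, using plain Python and NumPy. This snippet is only illustrative and is not part of the pipeline we build later.

import numpy as np

sentence = "the apple a day the good".split()
vocab = sorted(set(sentence))                 # 5 unique words ("the" repeats)
index = {word: i for i, word in enumerate(vocab)}

one_hot = np.zeros((len(sentence), len(vocab)), dtype=int)
for row, word in enumerate(sentence):
    one_hot[row, index[word]] = 1             # one row per word -> a 6x5 matrix

print(vocab)
print(one_hot)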

Here is where Word2Vec comes in. Our goal is to vectorize the words while maintaining the context. Word2vec can utilize either of two model architectures to produce a distributed representation of words: continuous bag-of-words (CBOW) or continuous skip-gram. In the continuous bag-of-words architecture, the model predicts the current word from a window of surrounding context words. The order of context words does not influence prediction (bag-of-words assumption). In the continuous skip-gram architecture, the model uses the current word to predict the surrounding window of context words. 
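
The pipeline in this blog does not use Word2Vec directly, but if you want to experiment with the two architectures, here is a minimal sketch using the gensim library (gensim is not in the tool list above, and the parameter names assume gensim 4.x):

from gensim.models import Word2Vec

sentences = [
    ["sri", "lanka", "is", "an", "island", "nation"],
    ["the", "capital", "of", "sri", "lanka", "is", "colombo"],
]

# sg=0 -> continuous bag-of-words (CBOW), sg=1 -> continuous skip-gram
cbow_model = Word2Vec(sentences, vector_size=50, window=2, min_count=1, sg=0)
skipgram_model = Word2Vec(sentences, vector_size=50, window=2, min_count=1, sg=1)

print(cbow_model.wv["lanka"][:5])          # first few dimensions of the learned vector
print(cbow_model.wv.most_similar("sri"))   # nearest words by cosine similarity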

Tf-idf (term frequency–inverse document frequency)

Tf-idf is a statistic that determines how important a word is to a document in a given corpus. Variations of tf-idf are used by search engines, for text summarization, and so on.
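
As a quick, illustrative sketch (not part of the main pipeline, which uses CountVectorizer plus TfidfTransformer later), scikit-learn's TfidfVectorizer computes tf-idf weights directly from raw documents. Note that get_feature_names_out assumes scikit-learn 1.0 or newer.

from sklearn.feature_extraction.text import TfidfVectorizer

docs = ["the bike needs new brakes",
        "apple released new mac hardware",
        "the doctor prescribed new medicine"]

vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(docs)        # sparse (3 docs x vocabulary) matrix

# words that appear in many documents (e.g. "new") receive a lower weight
for word, score in zip(vectorizer.get_feature_names_out(), tfidf_matrix.toarray()[0]):
    if score > 0:
        print(word, round(score, 3))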

Multinomial Naive Bayes classifier

The Naive Bayes classifier comes from a family of probabilistic classifiers based on Bayes' theorem. It is commonly used for tasks like classifying spam vs. not spam, or sports vs. politics. We are going to use it for classifying the incoming stream of tweets.
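
Here is a tiny, self-contained sketch of a Multinomial Naive Bayes classifier working on word counts; the documents and labels are made up purely for illustration.

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB

train_docs = ["win money now", "meeting tomorrow at noon"]
labels = ["spam", "not spam"]

vect = CountVectorizer()
X_train = vect.fit_transform(train_docs)              # word-count features
clf = MultinomialNB().fit(X_train, labels)

print(clf.predict(vect.transform(["free money"])))    # most likely ['spam']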

Let's see how they all fit together.

(Diagram: bag-of-words and tf-idf feature extraction feeding the Multinomial Naive Bayes classifier)

The data from the 20 Newsgroups dataset is entirely in text format. We cannot feed it directly to any model to do mathematical calculations. We first have to extract features from the dataset, converting the text into numbers that a model can ingest and turn into an output.
So, we use a bag-of-words representation and tf-idf to extract features from the dataset, and then feed them to a Multinomial Naive Bayes classifier to get predictions.
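
The code in the next section wires these steps up by hand, but the same flow can be expressed compactly with scikit-learn's Pipeline. Here is a minimal sketch on two of the newsgroup categories:

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

categories = ["rec.motorcycles", "sci.med"]
train = fetch_20newsgroups(subset='train', categories=categories)

text_clf = Pipeline([
    ('counts', CountVectorizer()),      # bag-of-words counts
    ('tfidf', TfidfTransformer()),      # re-weight counts by tf-idf
    ('clf', MultinomialNB()),           # Naive Bayes on the tf-idf features
])
text_clf.fit(train.data, train.target)

predicted = text_clf.predict(["My bike needs a new clutch"])
print(train.target_names[predicted[0]])  # most likely rec.motorcycles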

1. Train Your Model

We are going to use the 20 Newsgroups dataset. We create a new file and import the needed libraries: scikit-learn for the ML work and pickle to save the trained model. Then we define and train the model.

from __future__ import division,print_function, absolute_import
from sklearn.datasets import fetch_20newsgroups #built-in dataset 
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.naive_bayes import MultinomialNB
import pickle
from kafka import KafkaConsumer

#Defining model and training it
categories = ["talk.politics.misc","misc.forsale","rec.motorcycles",\
"comp.sys.mac.hardware","sci.med","talk.religion.misc"] #http://qwone.com/~jason/20Newsgroups/ for reference

def fetch_train_dataset(categories):
    twenty_train = fetch_20newsgroups(subset='train', categories=categories, shuffle=True, random_state=42)
    return twenty_train

def bag_of_words(categories):
    count_vect = CountVectorizer()
    X_train_counts = count_vect.fit_transform(fetch_train_dataset(categories).data)
    # save the fitted vocabulary so the prediction step can rebuild the same vectorizer
    pickle.dump(count_vect.vocabulary_, open("vocab.pickle", 'wb'))
    return X_train_counts

def tf_idf(categories):
    tf_transformer = TfidfTransformer()
    return (tf_transformer,tf_transformer.fit_transform(bag_of_words(categories)))

def model(categories):
    # train Multinomial Naive Bayes on the tf-idf features of the training set
    clf = MultinomialNB().fit(tf_idf(categories)[1], fetch_train_dataset(categories).target)
    return clf

trained_model = model(categories)   # avoid shadowing the model() function
pickle.dump(trained_model, open("model.pickle", 'wb'))
print("Training Finished!")
#Training Finished Here
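
As an optional sanity check (an addition on our side, not part of the original flow), you can get a rough accuracy number on the 20 Newsgroups test split before moving on to live tweets:

import numpy as np

twenty_test = fetch_20newsgroups(subset='test', categories=categories, shuffle=True, random_state=42)
test_vect = CountVectorizer(vocabulary=pickle.load(open("vocab.pickle", 'rb')))
tfidf_transformer = tf_idf(categories)[0]            # transformer fitted on the training counts
X_test = tfidf_transformer.transform(test_vect.transform(twenty_test.data))
predicted = trained_model.predict(X_test)
print("Test accuracy:", np.mean(predicted == twenty_test.target))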

2. The Kafka Tweet Producer

We have the trained model in place. Now let's get the real-time stream of tweets from Twitter via Kafka. We define the producer.

# import required libraries
from kafka import SimpleProducer, KafkaClient
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
from twitter_config import consumer_key, consumer_secret, access_token, access_token_secret
import json

Now we will define the Kafka settings and create the KafkaPusher class. This is necessary because we need to forward the data coming from the tweepy stream to the Kafka producer.

# Kafka settings
topic = b'twitter-stream'

# setting up Kafka producer
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)

class KafkaPusher(StreamListener):

    def on_data(self, data):
        all_data = json.loads(data)
        tweet = all_data["text"]
        producer.send_messages(topic, tweet.encode('utf-8'))
        return True

    def on_error(self, status):
        print(status)

WORDS_TO_TRACK = ["Politics","Apple","Google","Microsoft","Bikes","Harley Davidson","Medicine"]

if __name__ == '__main__':
    l = KafkaPusher()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, l)
    while True:
        try:
            stream.filter(languages=["en"], track=WORDS_TO_TRACK)
        except:
            pass

Note - You need to start the Kafka server before running this script.

3. Loading your model for predictions

Now we have the trained model from step 1 and a Twitter stream from step 2. Let's use the model to make actual predictions. The first step is to load the model and vocabulary:

#Loading model and vocab
print("Loading pre-trained model")
vocabulary_to_load = pickle.load(open("vocab.pickle", 'rb'))
count_vect = CountVectorizer(vocabulary=vocabulary_to_load)
load_model = pickle.load(open("model.pickle", 'rb'))
count_vect._validate_vocabulary()            # build feature indices from the loaded vocabulary
tfidf_transformer = tf_idf(categories)[0]    # reuse the transformer fitted on the training data

Then we start the Kafka consumer and begin predictions:

#predicting the streaming kafka messages
consumer = KafkaConsumer('twitter-stream', bootstrap_servers=['localhost:9092'])
print("Starting ML predictions.")
for message in consumer:
    tweet = message.value.decode('utf-8')    # Kafka delivers bytes; decode to a string
    X_new_counts = count_vect.transform([tweet])
    X_new_tfidf = tfidf_transformer.transform(X_new_counts)
    predicted = load_model.predict(X_new_tfidf)
    print(tweet + " => " + fetch_train_dataset(categories).target_names[predicted[0]])

Following are some of the classifications done by our model:

  • RT @amazingatheist: Making fun of kids who survived a school shooting just days after the event because you disagree with their politics is… => talk.politics.misc
  • https://twitter.com/i/web/status/966225658983587841 => sci.med
  • RT @DavidKlion: Apropos of that D'Souza tweet; I think in order to make sense of our politics, you need to understand that there are some t… => talk.politics.misc
  • RT @BeauWillimon: These students have already cemented a place in history with their activism, and they’re just getting started. No one wil… => talk.politics.misc
  • RT @byedavo: Cause we ain’t got no president https://t.co/yPVosCKPFm => talk.politics.misc
  • RT @appleinsider: .@Apple reportedly in talks to buy cobalt, key Li-ion battery ingredient, directly from miners https://t.co/onkaB3zRO0 ht… => comp.sys.mac.hardware

Here is the link to the complete git repository: https://github.com/velotio-tech/kafka-ml

Conclusion:

In this blog, we built a data pipeline that uses a Multinomial Naive Bayes model to classify streaming Twitter data. The same approach can be used to classify other sources of data, such as news articles and blog posts. Do let us know if you have any questions or additional thoughts in the comments section below.

Happy coding!


Vipul is an R&D Engineer at Velotio. He is interested in the areas of Deep Learning and Distributed Systems. He has worked on a variety of technologies including containers, virtualization and machine learning. His hobbies include motorcycles, photography and playing the violin. Also, he is an amateur boxer.