
Real Time Analytics for IoT Data using Mosquitto, AWS Kinesis and InfluxDB

The Internet of Things (IoT) is maturing rapidly and finding applications across various industries. Everyday devices are turning into smart, connected devices, and these devices capture various parameters in and around their environment, generating a huge amount of data. This data needs to be collected, processed, stored, and analyzed in order to extract actionable insights. To do so, we need to build a data pipeline. In this blog we will build such a pipeline using Mosquitto, Kinesis, InfluxDB, and Grafana, discussing each component and the steps to put them together.

Why the Analysis of IoT Data Is Different

In an IoT setup, data is generated by sensors distributed across various locations. To use this data, we first need to bring it to a common location from which the applications that want to process it can read it.

Network Protocol

IoT devices have limited computational and network resources. Moreover, these devices write data at very short intervals, so high throughput is expected on the network. For transferring IoT data it is therefore desirable to use lightweight network protocols. A protocol like HTTP uses a comparatively heavyweight structure for communication, consuming more resources and making it unsuitable for IoT data transfer. One lightweight protocol well suited to IoT data is MQTT, which we use in our pipeline. MQTT is designed for machine-to-machine (M2M) connectivity. It uses a publish/subscribe communication model and lets clients distribute telemetry data with very low network overhead. Beyond IoT, MQTT has proven useful in other fields as well.

MQTT Publish/Subscribe Model

Other similar protocols include the Constrained Application Protocol (CoAP) and the Advanced Message Queuing Protocol (AMQP).

Datastore

IoT devices generally collect telemetry about their environment, usually through sensors. In most IoT scenarios, we want to analyze how things change over a period of time. Storing such data in a time series database makes this analysis simpler and more effective. InfluxDB is a popular time series database, and it is the one we use in our pipeline. More about time series databases can be read here.

Pipeline Overview

MQTT Pipeline Overview

The first thing we need for a data pipeline is data. As shown in the image above, the data generated by the various sensors is written to a topic on the MQTT message broker. To mimic sensors, we will use a program that uses an MQTT client to write data to the MQTT broker.

The next component is Amazon Kinesis, which is used for streaming data analysis. It closely resembles Apache Kafka, an open-source tool used for similar purposes. Kinesis brings the data generated by any number of clients to a single location from which different consumers can pull it for processing. We are using Kinesis so that multiple consumers can read data from a single location; this approach scales well even if we have multiple message brokers.

Once the data is written to the MQTT broker, a Kinesis producer subscribed to the topic pulls the data from the broker and writes it to the Kinesis stream. From the Kinesis stream, the data is pulled by Kinesis consumers, which process it and write it to InfluxDB, a time series database.

Finally, we use Grafana, a well-known tool for analytics and monitoring that can connect to many popular databases. Another popular tool in this space is Kibana (the K of the ELK stack).

Setting up an MQTT Message Broker Server

For the MQTT message broker we will use Mosquitto, a popular open-source message broker that implements MQTT. The details of downloading and installing Mosquitto for various platforms are available here.

For Ubuntu, it can be installed using the following commands:

CODE: https://gist.github.com/velotiotech/513f9cde580cfd7dbfbf79adbfdc76f8.js
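
For reference, the installation boils down to a standard package install; the package names below are for recent Ubuntu releases:

    sudo apt-get update
    # mosquitto is the broker; mosquitto-clients provides the pub/sub CLI tools
    sudo apt-get install mosquitto mosquitto-clients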

Setting up InfluxDB and Grafana

The simplest way to set up both of these components is to run their Docker images directly:

CODE: https://gist.github.com/velotiotech/1b05736eae577b3def2b28ae27e61ed4.js

For InfluxDB we map two ports: 8086 is the HTTP API endpoint, while 8083 is the administration web server's port. We then need to create a database to which we will write our data.
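
For reference, the container setup amounts to something like the following; the image tags and container names are illustrative, and note that the admin UI on 8083 only ships with older InfluxDB 1.x releases:

    # InfluxDB: 8086 serves the HTTP API, 8083 the admin web console
    docker run -d --name influxdb -p 8086:8086 -p 8083:8083 influxdb:1.2
    # Grafana: web console on port 3000
    docker run -d --name grafana -p 3000:3000 grafana/grafana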

To create a database, we can go directly to the console at <influxdb-ip>:8083 and run the command:

CODE: https://gist.github.com/velotiotech/6df4c2cea1d36199ab93173497984ca5.js

Alternatively, we can do it via an HTTP request:

CODE: https://gist.github.com/velotiotech/1b4d1d85bb8db67da93d9d777c2a136c.js
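
Either way, the operation reduces to the InfluxQL statement CREATE DATABASE. Over the HTTP API it looks roughly like this, where the database name sensordata is an assumption:

    curl -XPOST "http://<influxdb-ip>:8086/query" --data-urlencode "q=CREATE DATABASE sensordata"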

Creating a Kinesis stream

In Kinesis, we create streams to which the Kinesis producers write the data coming from various sources and from which the Kinesis consumers read it. Within a stream, the data is stored in shards. For our purposes, one shard is enough.
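
The stream can be created from the AWS console as shown below, or programmatically. Here is a minimal sketch using the AWS SDK for Go, where the stream name iot-data-stream and the region are assumptions:

    package main

    import (
        "log"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/kinesis"
    )

    func main() {
        sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))
        kc := kinesis.New(sess)

        // One shard is enough for our low-volume sensor data.
        _, err := kc.CreateStream(&kinesis.CreateStreamInput{
            StreamName: aws.String("iot-data-stream"),
            ShardCount: aws.Int64(1),
        })
        if err != nil {
            log.Fatal(err)
        }
    }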

Creating Kinesis Stream

Creating the MQTT client

We will use the Golang client available in this repository to connect to our message broker server and write data to a specific topic. First, we create a new MQTT client. Here we can see the list of options available for configuring an MQTT client.

Once we create the options object, we pass it to the NewClient() method, which returns the MQTT client. Now we can write data to the MQTT server. We have defined the structure of the data in the SensorData struct. To mimic two sensors writing telemetry data to the MQTT broker, we run two goroutines that push data to the MQTT server every five seconds.
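
Here is a condensed sketch of that setup, assuming the Eclipse Paho Go client; the broker address, topic name, and sample readings are illustrative (the full version is in the gist below):

    package main

    import (
        "encoding/json"
        "time"

        mqtt "github.com/eclipse/paho.mqtt.golang"
    )

    // SensorData mirrors the sensor payload described above.
    type SensorData struct {
        SensorId    int     `json:"sensorId"`
        Temperature float64 `json:"temperature"`
        Humidity    float64 `json:"humidity"`
        City        string  `json:"city"`
        Timestamp   int64   `json:"timestamp"`
    }

    // mimicSensor publishes one reading to the topic every five seconds.
    func mimicSensor(client mqtt.Client, sensorId int, city string) {
        for {
            payload, _ := json.Marshal(SensorData{sensorId, 25.0, 60.0, city, time.Now().Unix()})
            client.Publish("sensors/data", 0, false, payload) // QoS 0, not retained
            time.Sleep(5 * time.Second)
        }
    }

    func main() {
        opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("sensor-sim")
        client := mqtt.NewClient(opts)
        if token := client.Connect(); token.Wait() && token.Error() != nil {
            panic(token.Error())
        }
        go mimicSensor(client, 1, "Pune")
        go mimicSensor(client, 2, "Mumbai")
        select {} // keep the two publisher goroutines running
    }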

CODE: https://gist.github.com/velotiotech/5bb7428bf0e8f298b1ecc9a30d0fc774.js

Create a Kinesis Producer

Now we will create a Kinesis producer that subscribes to the topic to which our MQTT client writes data, pulls the data from the broker, and pushes it to the Kinesis stream. Just as in the previous section, we first create an MQTT client that connects to the message broker and subscribes to the topic to which our clients/publishers write data. In the client options, we can define a function to be called whenever data is written to this topic. We have created a function postDataTokinesisStream() which connects to Kinesis using the Kinesis client and writes data to the Kinesis stream every time a message is pushed to the topic.
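
A minimal sketch of this producer, assuming the Paho MQTT client and the AWS SDK for Go; the stream name, topic, and partition key below are assumptions:

    package main

    import (
        "log"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/kinesis"
        mqtt "github.com/eclipse/paho.mqtt.golang"
    )

    var kc = kinesis.New(session.Must(session.NewSession(
        &aws.Config{Region: aws.String("us-east-1")})))

    // Called for every message on the topic; forwards the raw payload to Kinesis.
    func postDataTokinesisStream(client mqtt.Client, msg mqtt.Message) {
        _, err := kc.PutRecord(&kinesis.PutRecordInput{
            StreamName:   aws.String("iot-data-stream"),
            PartitionKey: aws.String("sensor"),
            Data:         msg.Payload(),
        })
        if err != nil {
            log.Println("failed to write to Kinesis:", err)
        }
    }

    func main() {
        opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883").SetClientID("kinesis-producer")
        client := mqtt.NewClient(opts)
        if token := client.Connect(); token.Wait() && token.Error() != nil {
            log.Fatal(token.Error())
        }
        // The handler fires every time data is written to the topic.
        client.Subscribe("sensors/data", 0, postDataTokinesisStream)
        select {} // keep the producer running
    }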

CODE: https://gist.github.com/velotiotech/ea4f249fe7d48becca0a60ad3bffbc9b.js

Create a Kinesis Consumer

Now that the data is available in our Kinesis stream, we can pull it for processing. In the Kinesis consumer, we create a Kinesis client just as we did in the previous section and then pull data from the stream. We first call the DescribeStream method, which returns the shardId; we then use this shardId to get a ShardIterator, and finally we fetch the records by passing the ShardIterator to the GetRecords() method. GetRecords() also returns a NextShardIterator, which we use to continuously poll for records in the shard until NextShardIterator becomes null.
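
A condensed sketch of that consumer loop, again assuming the AWS SDK for Go; the stream name is an assumption, and production code would handle resharding and read limits more carefully:

    package main

    import (
        "log"
        "time"

        "github.com/aws/aws-sdk-go/aws"
        "github.com/aws/aws-sdk-go/aws/session"
        "github.com/aws/aws-sdk-go/service/kinesis"
    )

    func main() {
        kc := kinesis.New(session.Must(session.NewSession(
            &aws.Config{Region: aws.String("us-east-1")})))
        streamName := aws.String("iot-data-stream")

        // DescribeStream returns the shardId of our single shard.
        desc, err := kc.DescribeStream(&kinesis.DescribeStreamInput{StreamName: streamName})
        if err != nil {
            log.Fatal(err)
        }
        shardId := desc.StreamDescription.Shards[0].ShardId

        // Position an iterator at the oldest record in the shard.
        itOut, err := kc.GetShardIterator(&kinesis.GetShardIteratorInput{
            StreamName:        streamName,
            ShardId:           shardId,
            ShardIteratorType: aws.String("TRIM_HORIZON"),
        })
        if err != nil {
            log.Fatal(err)
        }

        // Poll GetRecords, following NextShardIterator until it becomes nil.
        iterator := itOut.ShardIterator
        for iterator != nil {
            out, err := kc.GetRecords(&kinesis.GetRecordsInput{ShardIterator: iterator})
            if err != nil {
                log.Fatal(err)
            }
            for _, r := range out.Records {
                log.Printf("got record: %s", r.Data) // filter and forward to InfluxDB here
            }
            iterator = out.NextShardIterator
            time.Sleep(time.Second) // stay within the shard's read throughput limits
        }
    }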

CODE: https://gist.github.com/velotiotech/53fc75de2acf37bc749afca87c350448.js

Processing the data and writing it to InfluxDB

Now we do some simple processing to filter the data. The data we get from the sensors has the fields sensorId, temperature, humidity, city, and timestamp, but we are interested only in the values of temperature and humidity for a city, so we have created a new structure, SensorDataFiltered, which contains only the fields we need.

For every record the Kinesis consumer receives, it creates an instance of the SensorDataFiltered type and calls the PostDataToInfluxDB() method, where the record received from the Kinesis stream is unmarshaled into the SensorDataFiltered type and sent to InfluxDB. Here we need to provide the name of the database we created earlier in the variable dbName, and the InfluxDB host and port values in dbHost and dbPort respectively.

In the InfluxDB request body, the first value we provide is used as the measurement, an InfluxDB structure for storing similar data together. Next come the tags; we use `city` as our tag so that we can filter the data by city. Finally come the actual field values. For more details on the InfluxDB write format, please refer here.
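
For illustration, a single write in the line protocol looks like this, where the measurement name sensor_data is an assumption:

    sensor_data,city=Pune temperature=29.5,humidity=71 1545213343000000000

The measurement comes first, followed by the comma-separated tags, then a space, the field values, and an optional nanosecond timestamp.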

CODE: https://gist.github.com/velotiotech/9bb20ce2e4ca1a47099f02086dc07984.js

Once the data is written to InfluxDB, we can see it in the web console by querying the measurement created in our database.

InfluxDB - Write Data

Putting everything together in our main function

Now we simply call the functions discussed above and run our main program. Note that we have used `go` before the first two function calls, which makes them goroutines that execute concurrently.
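
Schematically, the main function looks something like this; the function names are placeholders for the stages discussed above:

    func main() {
        go writeSensorDataToMQTT() // mimic the sensors (goroutine)
        go produceToKinesis()      // MQTT -> Kinesis (goroutine)
        consumeFromKinesis()       // Kinesis -> InfluxDB; blocks and keeps polling
    }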

On running the code, you will see logs for all the stages of our pipeline written to stdout. This closely resembles real-life scenarios in which data written by IoT devices is processed in near real time.

CODE: https://gist.github.com/velotiotech/f87ada32a8cc3213f09f0c062f9ff607.js

Visualization through Grafana

We can access the Grafana web console on port 3000 of the machine on which it is running. First, we need to add our InfluxDB instance as a data source under the data sources option.

Grafana Visualization

To create a dashboard, go to the dashboards option and choose New. Once the dashboard is created, we can start adding panels.

InfluxDB Dashboard


We need to select the InfluxDB data source that we added earlier as the panel's data source and write queries as shown in the image below.
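
Under the hood, the panel runs a plain InfluxQL query against the data source. It will look something along these lines, where the measurement and field names are assumptions and $timeFilter/$__interval are Grafana template variables:

    SELECT mean("temperature") FROM "sensor_data" WHERE "city" = 'Pune' AND $timeFilter GROUP BY time($__interval)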

InfluxDB Data Source

We can repeat the same process to add another panel to the dashboard, this time choosing a different city in our query.

InfluxDB Panel

Conclusion

IoT data analytics is a fast-evolving and interesting space. The number of IoT devices is growing rapidly, and there is a great opportunity to extract valuable insights from the huge amount of data these devices generate. In this blog, I tried to help you grab that opportunity by building a near real-time data pipeline for IoT data. If you liked it, please share it and subscribe to our blog.


Did you like the blog? If yes, we're sure you'll also like to work with the people who write them - our best-in-class engineering team.

We're looking for talented developers who are passionate about new emerging technologies. If that's you, get in touch with us.

Explore current openings
