Share this article:
✉ Send Feedback for this Article.

Vertica and MQTT: Technical Exploration

About this Document

MQTT is used for data exchange between IoT devices and server applications. The goal of this document is to integrate Vertica and MQTT to analyze data in Vertica from sensor devices. In this case, we used the Mosquitto MQTT broker to relay data from the sensor and copy this data into Vertica for further analysis. We ran Python programs to mimic sensors that send data to the MQTT broker. This data is read from the MQTT broker and copied into Vertica.

MQTT Overview

Message Queuing Telemetry Transport (MQTT) is a light weight publish subscribe network protocol that transports messages between IoT devices. MQTT is designed for devices with resource constraints or limited bandwidth. It is ideal for isolated monitoring and perfect for IoT frameworks with small code footprint.

Prerequisites

Install the following components for the MQTT client and Vertica driver:

  • Windows or Linux machine with Python 3.6 or greater (In this exploration, we used Linux)
  • pip latest version
  • pip install paho-mqtt

  • pip install vertica-python

Test Environment

  • MQTT Broker on Windows Server 2019
  • Python programs run on Linux machine
  • Vertica 11.1.0

Before You Begin

Make sure that the schema and tables are present in the Vertica database before running the Python code.

In this demo we are creating a table to store the sensor_id, timestamp, and the value from the sensor.

Following is the SQL code to execute in the Vertica server:

create schema mqtt;
create table mqtt.DHT22_Humidity_Data (  SensorID varchar,  Date_n_Time varchar,  Humidity varchar);

Installing Mosquitto MQTT Broker

MQTT broker is the central unit in the MQTT architecture. It is a server that receives and routes the messages to the appropriate subscribing clients.

  1. Go to https://mosquitto.org/download/ and download the MQTT broker installer for Windows.
  2. Double-click the installer and follow the instructions to install the MQTT broker.
  3. After installing, open the mosquitto.conf file from the location “C:\Program Files\mosquitto” and add the following command lines to the configuration file:

    listener 1883
    allow_anonymous true

  4. The MQTT broker by default runs on port 1883. See the following page to make sure that Port 1883 is open and listening on the Windows machine.
    https://bytesofgigabytes.com/networking/how-to-open-port-in-windows/
  5. Navigate to the Windows services application and restart the MQTT mosquitto service.

Python Code

We used 4 Python modules in this exploration. Each module is explained in this section.

Prerequisites

  • Python 3.6 or greater
  • pip latest version

Code

This exploration uses 4 python files:

mqtt_Publish_Dummy_Data.py

  • The code will publish dummy data to a MQTT broker as a humidity sensor for every second.
  • Edit the file to replace the “MQTT_Broker” value in the code with the IP address of the MQTT broker IP address.
import paho.mqtt.client as mqtt
import random, threading, json
from datetime import datetime

#====================================================
# MQTT Settings 
MQTT_Broker = <"IPADDRESS"> #IP address of the MQTT broker
MQTT_Port = 1883
Keep_Alive_Interval =60
MQTT_Topic_Humidity = "Home/BedRoom/DHT22/Humidity"

#====================================================

def on_connect(client, userdata, rc):
	if rc != 0:
		pass
		print("Unable to connect to MQTT Broker")
	else:
		print("Connected with MQTT Broker: ") + str(MQTT_Broker)
def on_publish(client, userdata, mid):
	pass		
def on_disconnect(client, userdata, rc):
	if rc !=0:
		pass
		
mqttc = mqtt.Client()
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_publish = on_publish
mqttc.connect(MQTT_Broker, int(MQTT_Port), int(Keep_Alive_Interval))		
	
def publish_To_Topic(topic, message):
	mqttc.publish(topic,message)
	print ("Published: " + str(message) + " " + "on MQTT Topic: " + str(topic))
	print ("")

#====================================================
# Code to generate Dummy data to MQTT Broker
no_of_records = 0 # counter to get the number of dummy records generated

def publish_Fake_Sensor_Values_to_MQTT_test():
	threading.Timer(0.3, publish_Fake_Sensor_Values_to_MQTT_test).start()
	Humidity_Fake_Value = float("{0:.2f}".format(random.uniform(50, 100)))
	Humidity_Data = {}
	Humidity_Data['SensorID'] = "Dummy-1"
	Humidity_Data['Date_n_Time'] = str((datetime.today()).strftime("%d-%b-%Y-%H-%M-%S-%f"))
	Humidity_Data['Humidity'] = str(Humidity_Fake_Value)
	humidity_json_data = json.dumps(Humidity_Data)
	print ("Publishing fake Humidity Value: " + str(Humidity_Fake_Value) + "...")
	publish_To_Topic(MQTT_Topic_Humidity, humidity_json_data)
	global no_of_records
	no_of_records = no_of_records + 1
	print("no of records", no_of_records)

publish_Fake_Sensor_Values_to_MQTT_test()

#====================================================

Store_Sensor_data_to_JSON_files.py

  • This python program will aggregate the incoming data from the MQTT broker into JSON files.
  • Edit the variable ‘no_of_rows_to_be_aggregated’ in the Store_Sensor_data_to_JSON_files.py program to give the number of rows to be aggregated in each file. By default, the value is 100000.
import json
import vertica_python
import os
from os.path import exists


no_of_rows_to_be_aggregated = 100000 # give the no of rows to be aggregated in a JSON file


#===============================================================
# Functions to push Sensor Data into Database
file_number_counter = 0 
i = 0 # No of records generated to be inserted into a the JSON file
#Function to copy the records inserted into a JSON file

#Function to insert the rows into a JSON file
def aggregate_humidity_rows(jsonData):
	json_Dict = json.loads(jsonData)
	global file_number_counter
	
	file_name = "data_"+str(file_number_counter)
	file_extension = ".JSON"
	file_name_complete = file_name+file_extension
	file_exists = exists(file_name_complete) # check if there is an exiting file
	if(file_exists):
		with open(file_name_complete, 'a', encoding='UTF8', newline='') as jsonfile:
			jsonfile.write(json.dumps(json_Dict))
			
	else:
		with open(file_name_complete, 'w', encoding='UTF8', newline='') as jsonfile:
			jsonfile.write(json.dumps(json_Dict))
			
			
	jsonfile.close()
	global i 
	i = i+1 # counter value to decide how many rows to be inserted into the JSON file.
	print(i)
	if (i == no_of_rows_to_be_aggregated): 
		i = 0
		file_number_counter = file_number_counter + 1
		newfile_name = file_name+"_completed"+file_extension
		os.rename(file_name_complete,newfile_name)

# Function to save Humidity to JSON File
def DHT22_Humidity_Data_Handler(jsonData):
	#Parse Data or perform Transformations if any.

	#Aggregate the data
	aggregate_humidity_rows(jsonData)
	
#===============================================================
# Master Function to Select DB Funtion based on MQTT Topic

def sensor_Data_Handler(Topic, jsonData):
	if Topic == "Home/BedRoom/DHT22/Temperature": #Temparature Topic handler not implemeted in this demo
		DHT22_Temp_Data_Handler(jsonData)
	elif Topic == "Home/BedRoom/DHT22/Humidity":
		DHT22_Humidity_Data_Handler(jsonData)	

#===============================================================

mqtt_Listen_Sensor_Data.py

  • Edit the mqtt_Listen_Sensor_Data.py file and give the MQTT_Broker variable value to the right IP address where the MQTT broker is running.
import paho.mqtt.client as mqtt
from store_sensor_data_to_JSON_files import sensor_Data_Handler

# MQTT Settings 
MQTT_Broker = "172.16.67.57"
MQTT_Port = 1883
Keep_Alive_Interval = 45
MQTT_Topic = "Home/BedRoom/#"

#Subscribe to all Sensors at Base Topic
def on_connect(self, mosq, obj, rc):
	mqttc.subscribe(MQTT_Topic, 0)

#Save Data into DB Table
def on_message(mosq, obj, msg):
	# This is the Master Call for saving MQTT Data into DB
	# For details of "sensor_Data_Handler" function please refer "sensor_data_to_db.py"
	print ("MQTT Data Received...")
	#print ("MQTT Topic: " + str(msg.topic))
	#print ("Data: " + str(msg.payload))
	sensor_Data_Handler(msg.topic, msg.payload)

def on_subscribe(mosq, obj, mid, granted_qos):
    pass

mqttc = mqtt.Client()

# Assign event callbacks
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_subscribe = on_subscribe

# Connect
mqttc.connect(MQTT_Broker, int(MQTT_Port), int(Keep_Alive_Interval))

# Continue the network loop
mqttc.loop_forever()

copy_all_files_to_vertica.py

  • This program constantly looks for JSON file with the name “completed” in the current directory and copies the files to Vertica and deletes the file after processing.
  • Open the copy_all_files_to_vertica.py and edit the conn_info to connect to the Vertica server.
import vertica_python
import os

class DatabaseManager():
	def __init__(self):
		conn_info = {'host': '10.20.71.42', 'port': 5433, 'user': 'dbadmin', 'password': 'vert1caBdp', 'database': 'VMart'}		
		try:
			self.conn = vertica_python.connect(**conn_info)
		except Exception as e:
			print(e)
		self.conn.commit()
		self.cur = self.conn.cursor()
		
	def add_del_update_db_record(self, sql_query, args=()):
		self.cur.execute(sql_query, args)
		self.conn.commit()
		return
	def copy_JSON_records_to_vertica(self, sql_query, args=()):
		self.cur.copy(sql_query, args)
		self.conn.commit()
		return

	def __del__(self):
		self.cur.close()
		self.conn.close()

#The below function will be continuosly looking for JSON files with "completed" name in them
def get_all_the_files_list_copy_to_vertica():
	while (True):
		for i in os.listdir():
			if "completed" in i:
				print(i)
				dbObj = DatabaseManager()
				with open(i, "rb") as fs:
					dbObj.copy_JSON_records_to_vertica("COPY mqtt.DHT22_Humidity_Data(SensorID, Date_n_Time, Humidity) FROM STDIN parser fjsonparser()", fs)
				del dbObj
				print ("Inserted Humidity Data into Database.")
				print ("")
				fs.close()
				os.remove(i)


get_all_the_files_list_copy_to_vertica()

Executing the Python Code

Copy all the above Python programs into an accessible directory in the Linux machine.

Step 1:

  1. Open a new terminal and execute the following statement to generate and publish dummy data to the MQTT broker.

    $python <path to file>/mqtt_Publish_Dummy_Data.py

  2. Once the Python file is executed successfully you should see dummy data being published to the broker as in the following:

Step 2:

  1. Execute the following command in a new terminal line window to collect the data, and batch them and insert them into JSON files.

    python <path to file>/mqtt_Listen_Sensor_Data.py

  2. On successful execution you will be able to see JSON files containing the sensor data being generated in the same directory.

Step 3:

Execute Python copy_all_files_to_vertica.py in a new terminal window to copy all the JSON files that are generated into Vertica.

python <path to file>/ copy_all_files_to_vertica.py

Connect to Vertica to verify if the data is being inserted correctly. You can see that data is being copied into Vertica simultaneously as data is being read from the MQTT broker.

For More Information

Share this article: