Vertica and MQTT: Technical Exploration
About this Document
MQTT is widely used for data exchange between IoT devices and server applications. This document describes how to integrate Vertica with MQTT so that data from sensor devices can be analyzed in Vertica. We used the Mosquitto MQTT broker to relay data from the sensors and then copied this data into Vertica for further analysis. Python programs mimic the sensors by publishing data to the MQTT broker, and other Python programs read the data from the broker and copy it into Vertica.
MQTT Overview
Message Queuing Telemetry Transport (MQTT) is a lightweight publish-subscribe network protocol that transports messages between IoT devices. MQTT is designed for devices with limited resources or limited network bandwidth, which makes it well suited to remote monitoring and to IoT applications that need a small code footprint.
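To make the "lightweight" claim concrete, the following sketch builds the bytes of an MQTT 3.1.1 PUBLISH packet at QoS 0 by hand: one byte for the packet type and flags, a variable-length "remaining length" field, the length-prefixed topic name, and the raw payload. It is for illustration only; the paho-mqtt client used later in this document does this framing for you, and the function names here are hypothetical.

```python
def encode_remaining_length(n: int) -> bytes:
    """Encode n as an MQTT variable byte integer (7 data bits per byte)."""
    if not 0 <= n <= 268_435_455:
        raise ValueError("remaining length out of range")
    out = bytearray()
    while True:
        byte = n % 128
        n //= 128
        if n > 0:
            byte |= 0x80  # continuation bit: more length bytes follow
        out.append(byte)
        if n == 0:
            return bytes(out)


def build_publish_qos0(topic: str, payload: bytes) -> bytes:
    """Build an MQTT 3.1.1 PUBLISH packet with QoS 0 and no DUP or RETAIN flags."""
    topic_bytes = topic.encode("utf-8")
    # Variable header: topic name as a 2-byte big-endian length plus UTF-8 bytes.
    # QoS 0 packets carry no packet identifier.
    variable_header = len(topic_bytes).to_bytes(2, "big") + topic_bytes
    remaining = variable_header + payload
    # 0x30 = packet type 3 (PUBLISH) in the high nibble, all flags cleared.
    fixed_header = bytes([0x30]) + encode_remaining_length(len(remaining))
    return fixed_header + remaining


packet = build_publish_qos0("Home/BedRoom/DHT22/Humidity", b"50.00")
print(len(packet))  # 36 bytes: 4 bytes of framing, 27 bytes of topic, 5 bytes of payload
```

The remaining-length field grows only as needed (at most four bytes), so small sensor messages carry very little protocol overhead beyond the topic name itself.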
Prerequisites
Install the following components for the MQTT client and Vertica driver:
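- Windows or Linux machine with Python 3.6 or later (in this exploration, we used Linux)
- The latest version of pip
- The paho-mqtt client library. The programs in this document use the paho-mqtt 1.x callback API (paho-mqtt 2.0 requires a callback API version argument when creating a client), so install a 1.x release:
pip install "paho-mqtt<2.0"
- The vertica-python client library:
pip install vertica-python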
Test Environment
- MQTT Broker on Windows Server 2019
- Python programs run on Linux machine
- Vertica 11.1.0
Before You Begin
Make sure that the schema and tables are present in the Vertica database before running the Python code.
In this demo, we create a table to store the sensor ID, the timestamp, and the humidity value reported by the sensor.
Run the following SQL statements on the Vertica server:
create schema mqtt;
create table mqtt.DHT22_Humidity_Data (
    SensorID varchar,
    Date_n_Time varchar,
    Humidity varchar
);
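If you prefer to create these objects from Python rather than from a SQL client, a sketch like the following issues the same DDL through a DB-API style connection, such as one returned by vertica_python.connect(). It adds IF NOT EXISTS so that rerunning it is harmless; the helper name create_demo_objects is hypothetical.

```python
DDL_STATEMENTS = [
    "CREATE SCHEMA IF NOT EXISTS mqtt",
    "CREATE TABLE IF NOT EXISTS mqtt.DHT22_Humidity_Data ("
    " SensorID VARCHAR, Date_n_Time VARCHAR, Humidity VARCHAR)",
]


def create_demo_objects(conn):
    """Create the mqtt schema and humidity table if they do not exist yet.

    `conn` is assumed to be an open vertica_python connection (or any
    DB-API connection that provides cursor() and commit()).
    """
    cur = conn.cursor()
    try:
        for statement in DDL_STATEMENTS:
            cur.execute(statement)
        conn.commit()
    finally:
        cur.close()
```

For example, `with vertica_python.connect(**conn_info) as conn: create_demo_objects(conn)`, where conn_info is the same connection dictionary used in copy_all_files_to_vertica.py.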
Installing Mosquitto MQTT Broker
The MQTT broker is the central component of the MQTT architecture. It is a server that receives messages from publishing clients and routes each message to the clients that subscribed to its topic.
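The routing idea can be sketched in a few lines of plain Python. This toy in-memory broker is not how Mosquitto is implemented; it only illustrates that subscribers register topic filters, where + matches exactly one topic level and # matches all remaining levels, and that each published message goes to every subscriber whose filter matches its topic. The names topic_matches and ToyBroker are hypothetical.

```python
from collections import defaultdict


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter (with + and # wildcards) matches topic.

    This sketch ignores the special rule for topics that begin with $ (such as $SYS).
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True  # multi-level wildcard: matches the rest, including the parent level
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


class ToyBroker:
    def __init__(self):
        self.subscriptions = defaultdict(list)  # topic filter -> list of callbacks

    def subscribe(self, topic_filter, callback):
        self.subscriptions[topic_filter].append(callback)

    def publish(self, topic, payload):
        """Deliver payload to every matching subscriber; return the delivery count."""
        delivered = 0
        for topic_filter, callbacks in self.subscriptions.items():
            if topic_matches(topic_filter, topic):
                for callback in callbacks:
                    callback(topic, payload)
                    delivered += 1
        return delivered


broker = ToyBroker()
received = []
broker.subscribe("Home/BedRoom/#", lambda t, p: received.append((t, p)))
broker.publish("Home/BedRoom/DHT22/Humidity", '{"Humidity": "61.5"}')
broker.publish("Home/Kitchen/DHT22/Humidity", '{"Humidity": "40.0"}')
print(received)  # only the BedRoom message is delivered
```

The filter Home/BedRoom/# in this example is the same one the listener program later in this document subscribes to.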
- Go to https://mosquitto.org/download/ and download the MQTT broker installer for Windows.
- Double-click the installer and follow the instructions to install the MQTT broker.
- After installing, open the mosquitto.conf file in “C:\Program Files\mosquitto” and add the following lines to the configuration file. They make the broker listen on port 1883 on all network interfaces and accept clients that connect without a username and password:
listener 1883
allow_anonymous true
- By default, the MQTT broker runs on port 1883. See the following page to make sure that port 1883 is open and listening on the Windows machine:
https://bytesofgigabytes.com/networking/how-to-open-port-in-windows/
- Navigate to the Windows Services application and restart the Mosquitto service.
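Before starting the Python programs, you can check from the Linux machine that the broker port is reachable. The following sketch uses only the Python standard library and simply attempts a TCP connection; a successful connection shows that something is listening on the port, but it does not prove that the listener is an MQTT broker. The function name port_is_open is hypothetical.

```python
import socket


def port_is_open(host: str, port: int = 1883, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts (socket.timeout is an OSError), and DNS errors.
        return False
```

For example, `port_is_open("<IPADDRESS>")` with the broker's IP address. A False result usually points to the Windows firewall rule or to a missing listener line in mosquitto.conf.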
Python Code
We used four Python programs in this exploration. Each one is explained in this section.
Prerequisites
- Python 3.6 or later
- The latest version of pip
Code
This exploration uses four Python files:
mqtt_Publish_Dummy_Data.py
- This program acts as a humidity sensor and publishes a dummy reading to the MQTT broker roughly every 0.3 seconds (the interval passed to threading.Timer).
- Edit the file and replace the “MQTT_Broker” value with the IP address of the MQTT broker.
import paho.mqtt.client as mqtt
import random, threading, json
from datetime import datetime

#====================================================
# MQTT Settings
MQTT_Broker = "<IPADDRESS>"  # Replace with the IP address of the MQTT broker
MQTT_Port = 1883
Keep_Alive_Interval = 60
MQTT_Topic_Humidity = "Home/BedRoom/DHT22/Humidity"

#====================================================

def on_connect(client, userdata, flags, rc):
    # rc == 0 means the broker accepted the connection
    if rc != 0:
        print("Unable to connect to MQTT Broker")
    else:
        print("Connected with MQTT Broker: " + str(MQTT_Broker))

def on_publish(client, userdata, mid):
    pass

def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnection from MQTT Broker")

mqttc = mqtt.Client()
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_publish = on_publish
mqttc.connect(MQTT_Broker, int(MQTT_Port), int(Keep_Alive_Interval))
mqttc.loop_start()  # Run the network loop in a background thread (CONNACK, keep-alive)

def publish_To_Topic(topic, message):
    mqttc.publish(topic, message)
    print("Published: " + str(message) + " " + "on MQTT Topic: " + str(topic))
    print("")

#====================================================
# Code to generate dummy data and publish it to the MQTT broker

no_of_records = 0  # counter for the number of dummy records generated

def publish_Fake_Sensor_Values_to_MQTT_test():
    # Schedule the next call in 0.3 seconds, then publish one reading now
    threading.Timer(0.3, publish_Fake_Sensor_Values_to_MQTT_test).start()
    Humidity_Fake_Value = float("{0:.2f}".format(random.uniform(50, 100)))

    Humidity_Data = {}
    Humidity_Data['SensorID'] = "Dummy-1"
    Humidity_Data['Date_n_Time'] = str(datetime.today().strftime("%d-%b-%Y-%H-%M-%S-%f"))
    Humidity_Data['Humidity'] = str(Humidity_Fake_Value)
    humidity_json_data = json.dumps(Humidity_Data)

    print("Publishing fake Humidity Value: " + str(Humidity_Fake_Value) + "...")
    publish_To_Topic(MQTT_Topic_Humidity, humidity_json_data)

    global no_of_records
    no_of_records = no_of_records + 1
    print("no of records", no_of_records)

publish_Fake_Sensor_Values_to_MQTT_test()

#====================================================
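Each message the publisher sends is a small JSON object with three string fields. The helpers below are a hypothetical refactoring, not part of the original program: they build the same payload in a testable function and show how a consumer could turn the Date_n_Time string back into a datetime by reusing the publisher's format string.

```python
import json
from datetime import datetime

# Same format string the publisher uses, e.g. "05-Mar-2022-14-07-09-123456"
DATE_FORMAT = "%d-%b-%Y-%H-%M-%S-%f"


def build_humidity_payload(sensor_id: str, humidity: float, when: datetime) -> str:
    """Return the JSON string published to Home/BedRoom/DHT22/Humidity."""
    return json.dumps({
        "SensorID": sensor_id,
        "Date_n_Time": when.strftime(DATE_FORMAT),
        "Humidity": str(round(humidity, 2)),
    })


def parse_humidity_payload(payload):
    """Decode a payload (str or bytes) and convert its fields to Python types."""
    record = json.loads(payload)
    return {
        "SensorID": record["SensorID"],
        "Date_n_Time": datetime.strptime(record["Date_n_Time"], DATE_FORMAT),
        "Humidity": float(record["Humidity"]),
    }
```

Keeping the format string in one constant helps the publisher and any consumer agree on it, which matters here because the Vertica table stores Date_n_Time as VARCHAR.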
Store_Sensor_data_to_JSON_files.py
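- This Python module aggregates the incoming data from the MQTT broker into JSON files. It has no main program of its own; mqtt_Listen_Sensor_Data.py imports its sensor_Data_Handler function.
- Edit the variable ‘no_of_rows_to_be_aggregated’ in the Store_Sensor_data_to_JSON_files.py program to set the number of rows aggregated in each file. By default, the value is 100000. Because the publisher sends about one record every 0.3 seconds, a 100000-row file takes roughly 30000 seconds (more than 8 hours) to fill, so use a smaller value, such as 100, for a quick test.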
import json
import os

no_of_rows_to_be_aggregated = 100000  # number of rows to aggregate in each JSON file

#===============================================================
# Functions to save sensor data into JSON files

file_number_counter = 0  # number of the JSON file currently being filled
i = 0  # number of records written to the current JSON file

# Function to append a record to the current JSON file and
# mark the file as completed once it holds enough rows
def aggregate_humidity_rows(jsonData):
    json_Dict = json.loads(jsonData)
    global file_number_counter
    file_name = "data_" + str(file_number_counter)
    file_extension = ".JSON"
    file_name_complete = file_name + file_extension

    # Append mode creates the file if it does not exist yet.
    # Each record is written on its own line (newline-delimited JSON).
    with open(file_name_complete, 'a', encoding='UTF8', newline='') as jsonfile:
        jsonfile.write(json.dumps(json_Dict) + "\n")

    global i
    i = i + 1  # counter that decides how many rows go into the JSON file
    print(i)
    if i == no_of_rows_to_be_aggregated:
        i = 0
        file_number_counter = file_number_counter + 1
        # Rename the full file so that copy_all_files_to_vertica.py picks it up
        newfile_name = file_name + "_completed" + file_extension
        os.rename(file_name_complete, newfile_name)

# Function to save Humidity data to a JSON file
def DHT22_Humidity_Data_Handler(jsonData):
    # Parse data or perform transformations, if any.
    # Aggregate the data
    aggregate_humidity_rows(jsonData)

#===============================================================
# Master function to select the handler based on the MQTT topic
def sensor_Data_Handler(Topic, jsonData):
    if Topic == "Home/BedRoom/DHT22/Temperature":
        # Temperature topic handler is not implemented in this demo
        pass
    elif Topic == "Home/BedRoom/DHT22/Humidity":
        DHT22_Humidity_Data_Handler(jsonData)

#===============================================================
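The module above keeps its state in global variables. A self-contained alternative is a small class that appends one JSON record per line and renames the file with the same _completed suffix once it holds a fixed number of rows. This is a hypothetical refactoring, not the original program; the class name BatchFileWriter and its directory parameter are assumptions.

```python
import json
import os


class BatchFileWriter:
    """Append JSON records to data_<n>.JSON, one per line, and rename the file
    to data_<n>_completed.JSON after rows_per_file records."""

    def __init__(self, directory=".", rows_per_file=100000):
        self.directory = directory
        self.rows_per_file = rows_per_file
        self.file_number = 0
        self.rows_in_file = 0

    def _path(self, suffix=""):
        return os.path.join(self.directory,
                            "data_" + str(self.file_number) + suffix + ".JSON")

    def add(self, payload):
        """Validate one MQTT payload (str or bytes) and append it.

        Returns the completed file's path when this record finishes a batch,
        otherwise None.
        """
        record = json.loads(payload)  # raises ValueError on malformed JSON
        with open(self._path(), "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
        self.rows_in_file += 1
        if self.rows_in_file < self.rows_per_file:
            return None
        completed = self._path("_completed")
        os.rename(self._path(), completed)  # mark the batch as ready for COPY
        self.file_number += 1
        self.rows_in_file = 0
        return completed
```

A listener would create one BatchFileWriter at startup and call its add method with msg.payload from on_message.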
mqtt_Listen_Sensor_Data.py
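- Edit the mqtt_Listen_Sensor_Data.py file and set the MQTT_Broker variable to the IP address of the machine where the MQTT broker is running. The program subscribes to every topic under Home/BedRoom and passes each message to sensor_Data_Handler in Store_Sensor_data_to_JSON_files.py.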
import paho.mqtt.client as mqtt
# The module name must match the file Store_Sensor_data_to_JSON_files.py (imports are case-sensitive)
from Store_Sensor_data_to_JSON_files import sensor_Data_Handler

# MQTT Settings
MQTT_Broker = "172.16.67.57"  # Replace with the IP address of the MQTT broker
MQTT_Port = 1883
Keep_Alive_Interval = 45
MQTT_Topic = "Home/BedRoom/#"  # Subscribe to all sensors under the base topic

# Subscribing in on_connect renews the subscription after a reconnect
def on_connect(client, userdata, flags, rc):
    client.subscribe(MQTT_Topic, 0)

# Save data into JSON files
def on_message(client, userdata, msg):
    # This is the master call for saving MQTT data.
    # For details of the "sensor_Data_Handler" function, see Store_Sensor_data_to_JSON_files.py
    print("MQTT Data Received...")
    # print("MQTT Topic: " + str(msg.topic))
    # print("Data: " + str(msg.payload))
    sensor_Data_Handler(msg.topic, msg.payload)

def on_subscribe(client, userdata, mid, granted_qos):
    pass

mqttc = mqtt.Client()

# Assign event callbacks
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_subscribe = on_subscribe

# Connect
mqttc.connect(MQTT_Broker, int(MQTT_Port), int(Keep_Alive_Interval))

# Continue the network loop
mqttc.loop_forever()
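The if/elif chain in sensor_Data_Handler works for two topics. As more sensor types are added, a lookup table from topic to handler, combined with basic payload validation, keeps the listener from stopping on an unknown topic or a malformed message. The following is a hypothetical alternative, not part of the original programs; the HANDLERS registry, the handles decorator, and dispatch are assumptions.

```python
import json

HANDLERS = {}  # topic -> function that receives the decoded record


def handles(topic):
    """Decorator that registers a handler function for one MQTT topic."""
    def register(func):
        HANDLERS[topic] = func
        return func
    return register


@handles("Home/BedRoom/DHT22/Humidity")
def humidity_handler(record):
    # In the demo, this is where the record would be appended to a JSON file.
    return float(record["Humidity"])


def dispatch(topic, payload):
    """Decode payload and route it; return the handler result, or None if skipped."""
    handler = HANDLERS.get(topic)
    if handler is None:
        print("No handler for topic " + topic + ", message ignored")
        return None
    try:
        record = json.loads(payload)
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass ValueError
        print("Malformed payload on " + topic + ", message ignored")
        return None
    return handler(record)
```

With this approach, on_message would call dispatch(msg.topic, msg.payload), and supporting a new sensor means registering one more decorated function.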
copy_all_files_to_vertica.py
- This program continuously checks the current directory for JSON files whose names contain “completed”, copies each file into Vertica, and deletes the file after processing.
- Open copy_all_files_to_vertica.py and edit conn_info with the connection details of your Vertica server.
import os
import time
import vertica_python

class DatabaseManager():
    def __init__(self):
        # Replace with the connection details of your Vertica server
        conn_info = {'host': '10.20.71.42',
                     'port': 5433,
                     'user': 'dbadmin',
                     'password': '<password>',
                     'database': 'VMart'}
        try:
            self.conn = vertica_python.connect(**conn_info)
        except Exception as e:
            print(e)
            raise  # without a connection there is nothing more to do
        self.cur = self.conn.cursor()

    def add_del_update_db_record(self, sql_query, args=()):
        self.cur.execute(sql_query, args)
        self.conn.commit()
        return

    def copy_JSON_records_to_vertica(self, sql_query, data):
        # data is an open file object; its contents are streamed to COPY ... FROM STDIN
        self.cur.copy(sql_query, data)
        self.conn.commit()
        return

    def __del__(self):
        if hasattr(self, "cur"):
            self.cur.close()
        if hasattr(self, "conn"):
            self.conn.close()

# The function below continuously looks for JSON files with "completed" in their names
def get_all_the_files_list_copy_to_vertica():
    while True:
        for i in os.listdir():
            if "completed" in i and i.endswith(".JSON"):
                print(i)
                dbObj = DatabaseManager()
                with open(i, "rb") as fs:
                    dbObj.copy_JSON_records_to_vertica(
                        "COPY mqtt.DHT22_Humidity_Data(SensorID, Date_n_Time, Humidity) "
                        "FROM STDIN PARSER fjsonparser()", fs)
                del dbObj
                print("Inserted Humidity Data into Database.")
                print("")
                os.remove(i)
        time.sleep(1)  # avoid busy-waiting between directory scans

get_all_the_files_list_copy_to_vertica()
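The loop above opens a new Vertica connection for every file and processes files in whatever order os.listdir() returns them. A variant that reuses one connection and loads batches in file-number order could look like the following sketch. The function names are hypothetical, and conn is assumed to be an open vertica_python connection; cursor.copy() is the same call the original program uses.

```python
import os

COPY_SQL = ("COPY mqtt.DHT22_Humidity_Data(SensorID, Date_n_Time, Humidity) "
            "FROM STDIN PARSER fjsonparser()")


def completed_files(directory="."):
    """Return completed batch files (data_<n>_completed.JSON) sorted by batch number n."""
    names = [name for name in os.listdir(directory)
             if name.startswith("data_") and name.endswith("_completed.JSON")]
    # Sort numerically so data_2 is loaded before data_10.
    return sorted(names, key=lambda name: int(name.split("_")[1]))


def copy_completed_files(conn, directory="."):
    """COPY each completed file into Vertica, commit, then delete it.

    Returns the names of the files that were loaded.
    """
    loaded = []
    cur = conn.cursor()
    try:
        for name in completed_files(directory):
            path = os.path.join(directory, name)
            with open(path, "rb") as fs:
                cur.copy(COPY_SQL, fs)
            # Commit before deleting: a crash between the two steps can load a
            # batch twice, but never loses one.
            conn.commit()
            os.remove(path)
            loaded.append(name)
    finally:
        cur.close()
    return loaded
```

A caller would open the connection once and then repeatedly call copy_completed_files(conn) with a short time.sleep() between scans.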
Executing the Python Code
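Copy all four Python programs into the same directory on the Linux machine and start each program from that directory. The listener writes its JSON files to its current working directory, and the copy program looks for completed files in its own current working directory, so both must run from the same location.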
Step 1:
- Open a new terminal and run the following command to generate dummy data and publish it to the MQTT broker:
python <path to file>/mqtt_Publish_Dummy_Data.py
- Once the program is running, you should see the dummy data being published to the broker. For each reading, the terminal shows a “Publishing fake Humidity Value: …” line, the published JSON message with its MQTT topic, and a running record count.
Step 2:
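- In a new terminal window, run the following command to subscribe to the broker, batch the incoming records, and write them to JSON files:
python <path to file>/mqtt_Listen_Sensor_Data.py
- On successful execution, JSON files containing the sensor data (data_0.JSON, data_1.JSON, and so on) are created in the current directory. When a file reaches no_of_rows_to_be_aggregated records, it is renamed with a _completed suffix, for example data_0_completed.JSON.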
Step 3:
- In a new terminal window, run copy_all_files_to_vertica.py to copy the completed JSON files into Vertica:
python <path to file>/copy_all_files_to_vertica.py
- Connect to Vertica to verify that the data is being inserted correctly. Data is loaded into Vertica one completed file at a time while the listener continues to read new data from the MQTT broker.
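One way to check the load from Python is to count the rows in the target table and fetch a few sample records. The sketch below assumes an open vertica_python connection created with the same conn_info used in copy_all_files_to_vertica.py; the function name summarize_table is hypothetical.

```python
def summarize_table(conn, table="mqtt.DHT22_Humidity_Data", sample_size=5):
    """Return (row_count, sample_rows) for the humidity table.

    `conn` is assumed to be an open vertica_python (DB-API) connection.
    """
    cur = conn.cursor()
    try:
        cur.execute("SELECT COUNT(*) FROM " + table)
        row_count = cur.fetchone()[0]
        cur.execute("SELECT SensorID, Date_n_Time, Humidity FROM " + table
                    + " LIMIT " + str(int(sample_size)))
        sample_rows = cur.fetchall()
    finally:
        cur.close()
    return row_count, sample_rows
```

Running it repeatedly while all three programs are active should show the row count growing in steps of no_of_rows_to_be_aggregated, one step for each completed file that is copied.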