Preparing Vertica to consume messages from Kafka
How time has flown (no pun intended!). At long last getting back to continuing this project on tracking commercial aircraft…
Having covered ADS-B, the Raspberry Pi, DUMP1090, Zookeeper and Kafka, and then, in the previous blog post, how to develop an Extract, Transform and Load (ETL) application to present a continuous stream of messages into a set of Kafka topics, we can now introduce Vertica to consume those messages.
In this blog post, I will discuss how we integrate our Kafka topics and their messages containing details of aircraft currently in-flight with Vertica.
Recap on our Kafka topics
As you may recall, the Raspberry Pi and digital broadcast receiver capture radio messages from aircraft transponders as they fly in range of the receiver. These messages form part of a technology known as Automatic Dependent Surveillance-Broadcast (ADS-B). The DUMP1090 software then decodes these digital ADS-B signals, with one of its outputs being a continuous stream of messages on a TCP port (30003).
The stream of messages comprises a number of message types: AIR, ID, STA and MSG_[1-8].
Our ETL process extracts the individual message types (11 in total) and publishes them via kafkacat into a series of Kafka topics (one per message type).
Finding a home for 100m messages per day
At the time of writing (March 2019), our three “radars” (two Raspberry Pis, one in New York near JFK and the other in my home/office in Pennard, South Wales, plus the HPE EdgeLine gateway in Geneva) are producing ~100m messages per day and feeding these into the 11 Kafka topics.
Before we look at Vertica’s options for consuming data from Kafka, we first need to prepare some tables into which we will load this data.
Thankfully, with Vertica being ANSI SQL compliant, this is as simple as issuing a “CREATE TABLE” statement for each of the Kafka topics. Knowing what each of the columns looks like, we can choose the appropriate column names, data types and lengths.
Unlike with other RDBMS technologies, if the format of the messages was unknown, or changed over time, we could take advantage of Vertica’s FLEX TABLE option (more on this can be found in our excellent documentation). However, as I have written my own ETL script, and have very tight control on what is published on the Kafka topics, I am more than happy to work with Vertica’s “normal” tables. By doing so, we can really take advantage of Vertica’s unique column-store technology, including a reduced disk footprint and exceptional query performance.
Taking as an example the MSG_1 message type, the DDL is as simple as:
create table :h_schema.dump1090_msg_1
(NOTE: All of the scripts (ETL, Kafka, DDL etc.) are available on github or directly from the author: email@example.com)
Joining the dots: Connecting Vertica to Kafka
So we have the 11 Kafka topics being continuously fed with live streaming data and the corresponding 11 Vertica tables into which we want to stream that data so it can be used for all those brilliant things we can do with it! But you’ll have to wait for a subsequent blog post to reveal some of these really exciting things.
If only I could write blog posts as quickly as Vertica could load and analyse data!
In the meantime, let’s look at what we need to do to connect Vertica to Kafka to start ingesting that streaming data directly into our Vertica tables.
The great news is that Vertica has a very simple, but extremely flexible, command-line interface (CLI) for setting up, maintaining and monitoring the integration between the two. Not the prettiest of names (vkconfig), but wow does it do one heck of a lot behind the scenes to make our lives so much easier.
I won’t cover every aspect of it here (as it is so well covered in our excellent documentation).
What I will cover here are the bits you need to do to get the two joined together and start ingesting potentially huge volumes of data at speed.
I like to think of the “scheduler” as the overarching component that schedules data loads from a streaming data source into Vertica.
As mentioned, the Vertica utility that manages all of the Vertica/Kafka integration is called vkconfig.
Configuring a scheduler for our flight tracking use case, like so many tasks in Vertica, takes a single line of code.
/opt/vertica/packages/kafka/bin/vkconfig scheduler --create --config-schema 'dump1090Scheduler' --dbhost 'localhost' --username 'dbadmin' --password '' --frame-duration '00:00:10'
There are many other optional parameters (and some here that I didn’t need to include), but let us look at the structure of this scheduler specification.
- scheduler: instructs vkconfig that we are dealing here with activities related to the Scheduler.
- --create: indicates that we want to create a new scheduler (rather than delete, update etc.)
- --config-schema: the name of the scheduler
- --dbhost: the DNS name or IP address of the Vertica cluster (any of the Vertica nodes can be chosen)
- --username / --password: the username and password that will own this Scheduler
- --frame-duration: the duration of the Scheduler’s frame in which every configured microbatch runs (see later for a description of microbatch). Here the default of 10 seconds has been chosen.
As you will soon see, many of these parameters are common across the remaining vkconfig options. To save typing, we could put these common parameters into a configuration file, then refer to this each time we invoke vkconfig using the --conf config_file option.
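As a rough sketch of what such a configuration file might contain, the snippet below writes dump1090.conf with the values used in the scheduler command above. It assumes the file uses a simple key=value layout in which each key is the corresponding command-line option without its leading dashes; please check that against the vkconfig documentation for your Vertica version before relying on it.

```shell
#!/bin/sh
# Write the parameters shared by every vkconfig call into dump1090.conf.
# ASSUMPTION: key=value format, keys named after the CLI options minus the dashes.
cat > dump1090.conf <<'EOF'
config-schema=dump1090Scheduler
dbhost=localhost
username=dbadmin
password=
EOF

# The file holds credentials, so restrict it to the owning user.
chmod 600 dump1090.conf
```

Every later vkconfig call can then pick these values up with --conf dump1090.conf instead of repeating them.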
Not to be confused with the Vertica cluster, the vkconfig cluster option is used to identify the Kafka cluster’s broker to Vertica.
An example can be seen below:
/opt/vertica/packages/kafka/bin/vkconfig cluster --create --cluster vertica_pi_cluster --hosts vertica-pi:9092 --conf dump1090.conf
With our common parameters (username, password, host etc) now defined in a configuration file (dump1090.conf), we can concentrate just on the parameters we are introducing here:
- cluster: directing vkconfig that we are working on the Cluster settings
- --cluster: the name of our cluster (vertica_pi_cluster)
- --hosts: the name and port of our Kafka broker
With our cluster defined, we can now introduce the Kafka topic(s) as the “source” of our streaming data feed.
As we have 11 topics, we will need to define 11 sources, one of which (dump1090_msg_1) is shown here:
/opt/vertica/packages/kafka/bin/vkconfig source --create --cluster vertica_pi_cluster --source dump1090_msg_1 --partitions 1 --conf dump1090.conf
In this example:
- --source: the name of one of our Kafka topics (dump1090_msg_1)
- --partitions: the number of partitions you want to load. If you recall when we created our Kafka topics, we only defined a single partition for each topic, thus the “1” defined here.
Having defined a series of sources (Kafka topics), we next move on to introducing the table(s) into which we want Vertica to load (COPY) the streaming data.
In this use case, I decided to match 1-for-1 the Kafka topics (sources) with Vertica tables (destinations).
/opt/vertica/packages/kafka/bin/vkconfig target --create --target-schema dump1090_kafka --target-table dump1090_msg_1 --conf dump1090.conf
You can probably guess by now what each of these parameters is, but for clarity:
- --target-schema: the name of the schema (defaulting to “public”) in which the target table resides, noting that you must have already created this table as described earlier.
- --target-table: the table into which Vertica will load (COPY) data from Kafka’s topic(s)
The Scheduler’s load spec provides parameters that Vertica uses when parsing the data loaded from Kafka.
Common use cases include defining that the data is arriving in JSON or Avro format. But in this example, and keeping things far simpler (IMHO), we are using a CSV format that is defined as shown below:
/opt/vertica/packages/kafka/bin/vkconfig load-spec --create --load-spec dump1090_load_spec --filters "filter KafkaInsertDelimiters(delimiter=E'\n')" --load-method 'TRICKLE' --parser DELIMITED --parser-parameters "DELIMITER E','" --conf dump1090.conf
This might look at first glance a little more complicated than the other vkconfig lines, but hopefully the following will explain all:
- --filters: Using the KafkaInsertDelimiters filter, we transform the Kafka stream by inserting a user-defined delimiter (in this instance a newline, “\n”) to separate the messages.
- --load-method: This instructs Vertica to use our chosen method of loading data. TRICKLE is typically used when loading a small number of rows. In Enterprise Mode, this writes initially to WOS before moving to ROS. In Eon mode, as we do not have WOS, Vertica will always write directly to ROS.
- --parser: As mentioned, in this use case the messages arriving from Kafka are comma-delimited (CSV), and thus we use the DELIMITED parser. Other options include KafkaAvroParser and KafkaJSONParser.
- --parser-parameters: To accompany the DELIMITED parser, here we specify what the delimiter is. In this use case, it is a comma. What could be simpler?
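To picture what the filter and parser achieve between them, here is a small conceptual sketch in plain shell. It is not how Vertica implements either step; it simply terminates each message with a newline and then splits each resulting row on commas. The two sample messages are made up for illustration, and the field positions assume the comma-separated layout that DUMP1090 emits on port 30003 (hex ident in field 5, callsign in field 11), which may differ from what your ETL publishes.

```shell
#!/bin/sh
# Two invented MSG_1 messages standing in for what arrives on the Kafka topic.
msg1='MSG,1,1,1,4CA2D6,1,2019/03/01,12:00:00.000,2019/03/01,12:00:00.000,RYR123,,,,,,,,,,,0'
msg2='MSG,1,1,1,A0B1C2,1,2019/03/01,12:00:01.000,2019/03/01,12:00:01.000,DAL456,,,,,,,,,,,0'

# Step 1 (the role of the delimiter-inserting filter): end each message with "\n"
# so the batch becomes one row per message.
batch=$(printf '%s\n' "$msg1" "$msg2")

# Step 2 (the role of the DELIMITED parser): split each row into columns on ",".
parsed=$(printf '%s\n' "$batch" | awk -F',' '{ print "row " NR ": hex=" $5 " callsign=" $11 }')
printf '%s\n' "$parsed"
```

Without the newline between messages, the parser would have no way of telling where one row ends and the next begins.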
With all of the previous elements defined, I like to think of the microbatch option as the component that brings all these together.
It is the microbatch which represents a single segment of a data load from a streaming data source, and encompasses all of the information needed by the scheduler to perform streaming data load into Vertica.
Using the MSG_1 example from above, a typical microbatch definition might look like this:
/opt/vertica/packages/kafka/bin/vkconfig microbatch --create --microbatch dump1090_msg_1 --add-source-cluster vertica_pi_cluster --add-source dump1090_msg_1 --target-schema dump1090_kafka --target-table dump1090_msg_1 --load-spec dump1090_load_spec --rejection-schema dump1090_kafka --rejection-table dump1090_msg_1_rej --conf dump1090.conf
Although there again appear to be a lot of parameters here, they are hopefully all pretty much self-explanatory:
- --microbatch: gives this microbatch a name. Here I am using the same name as the topic from which the data will be streaming and the destination table in Vertica that will receive the data (dump1090_msg_1).
- --add-source-cluster / --add-source: the name of the Kafka cluster and Kafka topic (source), as defined earlier, that will be supplying the streaming data
- --target-schema / --target-table: the Vertica schema and table that will receive the streaming data
- --load-spec: picking up the load spec we previously defined to tell Vertica what the data looks like so it can form the appropriate COPY syntax
- --rejection-schema / --rejection-table: just in case Vertica is unable to load some of the data (possibly because it is malformed and does not “fit” into the destination table), the rejection schema and table will be used to store the rejected data and the reasons for rejection.
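Since the source, target and microbatch definitions have to be repeated for each of the 11 topics, a small loop can save a lot of typing. The sketch below only prints the commands (a dry run) so they can be reviewed first. Only dump1090_msg_1 is named in this post, so the other topic names, and the convention of naming each rejection table after its topic with a _rej suffix, are assumptions made for illustration.

```shell
#!/bin/sh
VKCONFIG=/opt/vertica/packages/kafka/bin/vkconfig
CONF=dump1090.conf

# Print (but do not run) the source, target and microbatch commands for one topic.
print_topic_commands() {
  topic=$1
  echo "$VKCONFIG source --create --cluster vertica_pi_cluster --source $topic --partitions 1 --conf $CONF"
  echo "$VKCONFIG target --create --target-schema dump1090_kafka --target-table $topic --conf $CONF"
  echo "$VKCONFIG microbatch --create --microbatch $topic --add-source-cluster vertica_pi_cluster --add-source $topic --target-schema dump1090_kafka --target-table $topic --load-spec dump1090_load_spec --rejection-schema dump1090_kafka --rejection-table ${topic}_rej --conf $CONF"
}

# ASSUMPTION: topic names follow the dump1090_<type> pattern seen for dump1090_msg_1.
TOPICS="dump1090_air dump1090_id dump1090_sta"
n=1
while [ "$n" -le 8 ]; do
  TOPICS="$TOPICS dump1090_msg_$n"
  n=$((n + 1))
done

for topic in $TOPICS; do
  print_topic_commands "$topic"
done
```

Once the printed commands look right, the output can be piped into sh to execute them, provided the scheduler, cluster and load spec have already been created.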
Like all great missions, with good preparation, we can all look forward to what comes next — The Launch!
We have set up all the pieces, and they are primed to go.
The next, and final, piece of this jigsaw is to launch the Scheduler. This is as simple as firing the launch vkconfig option (and counting 3, 2, 1, we have lift off):
nohup /opt/vertica/packages/kafka/bin/vkconfig launch --conf dump1090.conf &
The only thing of note here is that, as we would typically want the Scheduler to carry on running (until we choose to stop it), I am running this as a background process, as indicated by the “nohup” and “&” directives.
All being well, our data will be flying into our Vertica tables from the aircraft transponders.
In the next blog post, we will look at how we can check on the status of the Kafka/Vertica loading, and then subsequently start to look at what we can do with all this data!
In case you missed it, here’s a link to the first post with the overview of the project
Here’s part two in this series.
Here’s the third blog post in this series.
Here’s the fourth in this series of blog posts.
Here’s the fifth post in the series
Sixth in this series of blog posts
Also, check out this YouTube video of me, Mark Whalley, in an airline pilot’s uniform discussing the project
Here’s an infographic that summarizes the Flight Tracker project
And, by the way, this project won first place at the Arkansas University Raspberry Pi Bakeoff!