Part 6: Extract, Transform and Load ADS-B messages into Kafka
In previous blog posts I discussed the continuous stream of messages from aircraft transponders, captured and decoded using the DUMP1090 application. We plan to feed these messages into a series of Kafka topics before loading them into their corresponding tables in a Vertica database.
This blog post describes how, using a simple Extract, Transform and Load (ETL) process, we can:
- Capture the ADS-B messages from a TCP stream
- Extract the individual message types
- Perform some simple transformations
- Feed the messages into a series of message buffers
- Produce them into their respective Kafka topics using kafkacat.
When considering the best method to implement the required ETL process, I could have selected any number of commercial and open source solutions. I could also have written my own ETL process in any of the available programming languages, including Java, C++ and Python.
In this project, because I wanted to keep everything as simple as possible to develop and implement (and at the same time keep costs to a minimum), I decided to develop my own ETL application using BASH shell and various operating system and other open source utilities (such as awk and kafkacat).
With many moving parts, any of which could fail at any time, I also look at how we can ensure this ETL application continues to run in the event that one or more parts fail.
It should be noted that this ETL process is not intended to be a highly performant, production-ready application, but hopefully demonstrates what can be achieved using some relatively simple coding.
Unlike the other custom-developed code used throughout this project, the ETL code is over 1,400 lines long, so it is not practical to include it in this blog post. Sorry.
Overview of the ETL Process
Although a more detailed description of how this ETL process works is provided below, the following is a snippet from the application’s header:
# Part of the DUMP1090 application, this component manages the reading
# of the TCP pipe (defaulting to port 30003) being written to by the
# dump1090 ADS-B application, does some basic pre-processing of the
# single stream of data, splitting it into its component ADS-B
# messages before sending the streams into associative KAFKA
# Scheduled to run as a crontab job, the script will check if the
# various threads are running. If all are present, it will leave them
# in place. If any have died, the application will attempt to kill off
# any remaining threads before doing a fresh restart
Invoking the ETL Application
The application (dump1090_readpipe_writekafka.sh) can be invoked from the command line, but is typically run as a crontab job.
Although in theory we could run this application under any user account, in this example we use dbadmin, the Vertica instance and database owner. This could equally be any other user, provided the necessary permissions and grants are in place.
A series of parameters needs to be supplied, as detailed below:
- A name used to identify the location of the ADS-B receiver
- IP address (or DNS name) of the Kafka broker
- Port number of the Kafka broker
- Directory on the Kafka server to receive parsed data files (these are used as a backup of the data and may be used to batch-load data into the database in the absence of Kafka topics)
- A parameter file containing the names of the Kafka topics into which each of the DUMP1090 message types will be produced
- IP address (or DNS name) of the server running the DUMP1090 application
- Netcat (nc) port number used for picking up TCP messages from DUMP1090
- Directory used for recording the Process IDs (PID) of the background processes initiated by this application
- Directory in which to store LOG files generated by this application
When run as a crontab job, it is typically scheduled to run every few minutes (10 minutes is usually more than adequate).
When invoked, the application should run continuously. However, as mentioned earlier, there are many moving parts, any of which could “die” unexpectedly (or deliberately!). Re-running the application periodically allows it to check whether all the expected components are still running. If they are, there is nothing more to do, and the application will check again during the next cycle. If any component has failed, the application kills off any remaining components and then restarts afresh.
This application is not intended to be production-ready. As such, the approach described above is not necessarily the most elegant, but for the purposes of this project, is more than adequate.
A typical crontab entry (for the dbadmin user) may look something like this:
0,10,20,30,40,50 * * * * /home/dbadmin/projects/dump1090/dump1090_kafka/dump1090_readpipe_writekafka.sh --sitename pennardel01 --kafka_broker localhost --kafka_port 9092 --kafka_data_dir /home/dbadmin/projects/dump1090/data --dump1090_server localhost --dump1090_nc_port 30003 --log_dir /home/dbadmin/projects/dump1090/logs --pid_dir /home/dbadmin/projects/dump1090/pids 1>>/home/dbadmin/projects/dump1090/logs/crontab.log 2>&1
On invoking the dump1090_readpipe_writekafka.sh application, after validating the parameters listed above, the first task is to pick up the list of Kafka topics from the parameter file (as passed by the --kafka-topics parameter).
Kafka topics parameter file
This parameter file allows us to choose topics with different names across different installations.
When initially developed, it was agreed that the topic names could be hardcoded within the application. For example, the dump1090_msg_4 messages would be produced via the dump1090_msg_4 Kafka topic (and further down the line, the topics would have their messages written to a Vertica table of the same name – in this example dump1090_msg_4).
It was subsequently decided to allow topics to be given different names depending on where the Kafka broker was located. To meet this requirement, the Kafka topics parameter file was introduced. This provides a one-to-one mapping between the DUMP1090 message types and Kafka topics.
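The layout of the parameter file is not reproduced in this post, so the following is only a sketch of how such a one-to-one mapping might be read in BASH. It assumes a hypothetical format of one message_type=topic_name pair per line with # comments; the function name load_topic_map and the array TOPIC_FOR are likewise invented for illustration and are not taken from the application.

```shell
#!/usr/bin/env bash
# Sketch only: the file format (message_type=topic, one pair per line)
# and all names below are assumptions made for illustration.

declare -A TOPIC_FOR    # message type -> Kafka topic (requires bash 4+)

load_topic_map() {
    local file=$1 key value
    while IFS='=' read -r key value; do
        # Skip blank lines and comment lines
        [[ -z $key || $key == \#* ]] && continue
        TOPIC_FOR[$key]=$value
    done < "$file"
}

# Example: one message type keeps its name, another is renamed
params=$(mktemp)
cat > "$params" <<'EOF'
# message_type=kafka_topic
dump1090_msg_1=dump1090_msg_1
dump1090_msg_4=site_a_msg_4
EOF

load_topic_map "$params"
echo "dump1090_msg_4 is produced to: ${TOPIC_FOR[dump1090_msg_4]}"
rm -f "$params"
```

Keeping the mapping in an associative array means the rest of a script can look up the topic for any message type without caring whether the names were changed.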
Where there is a requirement to retain the original mapping between DUMP1090 messages and the Kafka topics, the parameter file should be prepared as follows:
An example using alternative Kafka topic names is shown below:
Is the application already running?
Each time the application is invoked (which, from a crontab job, may be every 10 minutes), a check is made to determine whether all the components expected to be running are actually running.
If any components are not running (or indeed none of them are), any remaining components will be terminated (via a kill command) before restarting the application.
The application uses a series of files which contain the PIDs of several background processes, and if these files exist, checks whether the processes to which they refer are actually running.
In summary, these PID files identify the PIDs of
- The application itself
- A netcat (nc) process – this would be reading from the TCP port
- A set of 11 kafkacat processes – each waiting for messages to produce to its Kafka topic
- A set of 11 tail -f processes – each tailing one message buffer and feeding its output to the corresponding kafkacat process
A fully operational application therefore consists of 23 background processes (1 nc, 11 kafkacat and 11 tail -f) plus the application itself.
If the full complement of processes is running, nothing further is done (the application is still working in the background), and this invocation simply exits until the next cycle (in 10 minutes' time).
Where we do not have a full complement of processes, the application terminates each of the remaining processes with a kill -9 and then starts the application from the beginning.
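The check-then-restart logic described above can be sketched roughly as follows. This is not the application's own code: the function names, the *.pid naming convention and the use of kill -0 to test for a live process are assumptions, and two sleep commands stand in for the real background components.

```shell
#!/usr/bin/env bash
# Sketch only: function names and the PID-file layout are assumptions.

# Return 0 if the PID stored in the given file refers to a live process.
pid_file_alive() {
    local pid_file=$1 pid
    [[ -r $pid_file ]] || return 1
    pid=$(<"$pid_file")
    [[ $pid =~ ^[0-9]+$ ]] || return 1
    kill -0 "$pid" 2>/dev/null      # signal 0 tests existence only
}

# Return 0 only if the expected number of PID files exist and all are live.
all_components_running() {
    local pid_dir=$1 expected=$2 count=0 f
    for f in "$pid_dir"/*.pid; do
        [[ -e $f ]] || continue
        pid_file_alive "$f" || return 1
        count=$((count + 1))
    done
    [[ $count -eq $expected ]]
}

# Kill whatever is still alive and clear the PID files before a restart.
kill_remaining() {
    local pid_dir=$1 f
    for f in "$pid_dir"/*.pid; do
        [[ -e $f ]] || continue
        pid_file_alive "$f" && kill -9 "$(<"$f")" 2>/dev/null
        rm -f "$f"
    done
}

# Demonstration with two stand-in background processes
pid_dir=$(mktemp -d)
sleep 60 & echo $! > "$pid_dir/a.pid"
sleep 60 & echo $! > "$pid_dir/b.pid"

if all_components_running "$pid_dir" 2; then
    echo "all running, nothing to do"
fi

# Simulate one component dying (wait reaps it so it is really gone)
victim=$(<"$pid_dir/a.pid")
kill -9 "$victim"
wait "$victim" 2>/dev/null

if ! all_components_running "$pid_dir" 2; then
    echo "component missing, cleaning up before restart"
    kill_remaining "$pid_dir"
fi
```

Note that kill -0 only confirms that some process with that PID exists. A more defensive version might also compare the command name, since PIDs can be reused.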
Making a fresh start
Where the application has to be restarted, we start off by touching a file (which we will call a message buffer) for each of the message threads (air, id, sta, msg_[1-8]).
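As a rough illustration, creating the 11 message buffers could look like the sketch below. The directory and the dump1090_*.buf naming scheme are assumptions made for the example, not the application's actual file names.

```shell
#!/usr/bin/env bash
# Sketch only: the directory and file-naming scheme are assumptions.

buffer_dir=$(mktemp -d)     # stand-in for the real data directory

# The 11 message threads: air, id, sta and msg_1 .. msg_8
threads=(air id sta msg_{1..8})

for t in "${threads[@]}"; do
    touch "$buffer_dir/dump1090_${t}.buf"
done

ls "$buffer_dir"
```

Creating the files up front means that any tail -f started later has an existing file to follow, even if no message of that type has arrived yet.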
As discussed in the Installing DUMP1090 to decode ADS-B signals blog post, when running, DUMP1090 publishes a continuous stream of decoded messages from aircraft transponders on a TCP port. This port defaults to 30003, but for clarity is provided as a parameter to this application.
Capturing the raw streaming TCP messages
In addition to performing the complete ETL processing on this streaming data, the application also captures the raw, unformatted TCP data. This is achieved by the STARTNETCAT function, which fires off a single nc command in the background, picking up messages from the DUMP1090 server and port (as supplied in the parameters) and sending all received messages to a single TXT file.
As with the other processes discussed below, the background PID of this process is captured and recorded such that it can be checked next time the application runs.
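A minimal sketch of this kind of background capture is shown below. It is not the application's STARTNETCAT function; the function name start_raw_capture, its arguments and the example file paths are all assumptions.

```shell
#!/usr/bin/env bash
# Sketch only: this is not the application's STARTNETCAT function; the
# function name, arguments and file names are assumptions.

start_raw_capture() {
    local server=$1 port=$2 raw_file=$3 pid_file=$4
    # Append everything received on the TCP port to a single text file
    nc "$server" "$port" >> "$raw_file" 2>/dev/null &
    echo $! > "$pid_file"       # $! is the PID of the background job
}

# Example invocation (needs a DUMP1090 instance listening on the port):
# start_raw_capture localhost 30003 /tmp/dump1090_raw.txt /tmp/nc_raw.pid
```

Recording $! immediately after launching the job is what makes the later "is it still running?" check possible.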
Starting a kafkacat producer for each of the topics
Once messages have been extracted and transformed, the application delivers them, separated by message type/topic, to a series of threads.
Each thread (one per message type/topic) initiates a tail -f on the associated message buffer, with its output directed as standard input (stdin) to a producer (-P) kafkacat process for the required topic on the Kafka broker and port (again, as provided in the parameters).
Again, these processes are run in the background, with the PID of the kafkacat process and the Parent PID (PPID) of the tail -f process being recorded for subsequent checking when the application re-runs in 10 minutes' time.
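One producer thread of this kind might be started as in the sketch below. The function name start_producer, its arguments and the PID-file name are assumptions; only the kafkacat PID is recorded here, and the tracking of the tail -f process described above is not reproduced.

```shell
#!/usr/bin/env bash
# Sketch only: function name, arguments and PID-file names are assumptions.

start_producer() {
    local buffer=$1 topic=$2 broker=$3 port=$4 pid_dir=$5
    # Follow the buffer (tail -f first prints the last few existing lines,
    # then every new line) and hand each line to a kafkacat producer (-P)
    # for the given broker (-b) and topic (-t).
    tail -f "$buffer" | kafkacat -P -b "${broker}:${port}" -t "$topic" &
    local kafkacat_pid=$!       # PID of the last command in the pipeline
    echo "$kafkacat_pid" > "$pid_dir/kafkacat_${topic}.pid"
}

# Example invocation (needs a reachable Kafka broker):
# start_producer /tmp/dump1090_msg_1.buf dump1090_msg_1 localhost 9092 /tmp
```

In producer mode kafkacat treats each line on stdin as one message by default, which is why a line-oriented buffer file works as the hand-off point.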
The guts of the application – Extract and Transform
With all the previously defined components in place, the rather unassuming READFROMNETCAT function is the one that does the majority of the hard work.
This process starts off by running another nc command (similar to that described above).
This time the output is fed into a (theoretically) never ending nawk command.
The nawk component identifies which message type is being presented (air, id, sta, msg_[1-8]), and then uses printf to write the formatted fields to the associated message buffer.
We have a set of tail -f commands picking up the messages being written to these message buffers and passing them to their associated kafkacat processes.
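The splitting step can be illustrated with a much-reduced awk sketch. It is not the READFROMNETCAT function: the sample records are made up, the buffer names and the site-name prefix are assumptions, and plain awk fed from printf stands in for nawk fed from nc.

```shell
#!/usr/bin/env bash
# Sketch only: buffer names, the site-name prefix and the sample records
# are assumptions; the real application reads from nc and uses nawk.

buffer_dir=$(mktemp -d)
site=pennardel01

# Made-up records in the comma-separated layout DUMP1090 emits on port
# 30003: field 1 is the message type, field 2 the MSG sub-type.
sample() {
    printf '%s\n' \
      'MSG,1,1,1,4CA2D6,1,2019/01/01,12:00:00.000,2019/01/01,12:00:00.000,RYR123,,,,,,,,,,,0' \
      'MSG,3,1,1,4CA2D6,1,2019/01/01,12:00:01.000,2019/01/01,12:00:01.000,,37000,,,51.45,-1.02,,,0,0,0,0' \
      'STA,,1,1,4CA2D6,1,2019/01/01,12:00:02.000,2019/01/01,12:00:02.000,RM'
}

sample | awk -F',' -v dir="$buffer_dir" -v site="$site" '
    # Map the leading fields to a buffer name: msg_1..msg_8, air, id, sta
    $1 == "MSG" && $2 ~ /^[1-8]$/ { name = "msg_" $2 }
    $1 == "AIR"                   { name = "air" }
    $1 == "ID"                    { name = "id" }
    $1 == "STA"                   { name = "sta" }
    name != "" {
        out = dir "/dump1090_" name ".buf"
        # Example transformation: prefix each record with the site name
        printf "%s,%s\n", site, $0 >> out
        fflush(out)     # push the line out promptly for any tail -f reader
        name = ""
    }
'

cat "$buffer_dir/dump1090_msg_3.buf"
```

The explicit fflush matters in a long-running pipeline: without it, awk may hold lines in its output buffer and the tail -f readers would see messages late and in bursts.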
This function should in theory never end, and thus the application would never terminate.
However, as already described, any component (the 23 background processes or the application itself) could fail. Thus having the application re-run every 10 minutes (or otherwise) enables us to confirm everything is operating correctly, and if all looks well, to do nothing (other than carry on running “indefinitely”).
Testing kafkacat with “real” data
When we set up Zookeeper, Kafka and kafkacat, we had the option of testing our configuration by creating a dummy test Kafka topic, using kafkacat’s -P option to produce a “Hello World” message, and then consuming it (using the -C option).
Now that we have (hopefully) got some real streaming data being presented by DUMP1090 and processed by the ETL application, we should be able to run kafkacat in consume mode to view the messages appearing on each of the topics.
For example, to view the messages on the MSG,1 topic, issue the following:
kafkacat -C -b localhost -t dump1090_msg_1
This command will continue to run until you terminate it (e.g. via CTRL-C).
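If you would rather take a quick look at every topic without leaving a consumer running, something like the following sketch may help. The function name peek_topics is hypothetical, and the topic names assume the default one-to-one mapping between message types and topics.

```shell
#!/usr/bin/env bash
# Sketch only: the topic names assume the default one-to-one mapping.

peek_topics() {
    local broker=$1 topic
    for topic in dump1090_{air,id,sta} dump1090_msg_{1..8}; do
        echo "== $topic =="
        # -o -1: start one message before the end of each partition
        # -e:    exit once the end of the topic is reached
        # -q:    suppress informational output
        kafkacat -C -b "$broker" -t "$topic" -o -1 -e -q
    done
}

# Example invocation (needs a reachable broker):
# peek_topics localhost:9092
```

An empty section for a topic simply means no message of that type has been produced yet.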
Next in this series of blog posts, we will look at how to prepare Vertica to integrate with these Kafka topics and start to ingest the streaming ADS-B data into our Vertica tables.
In case you missed it, here’s a link to the first post with the overview of the project.
Here’s part two in this series.
Here’s the third blog post in this series.
Here’s the fourth in this series of blog posts.
Here’s the fifth post in the series!
Also, check out this YouTube video of me, Mark Whalley, in an airline pilot’s uniform discussing the project.
Here’s an infographic that summarizes the Flight Tracker project.
And, by the way, this project won first place at the Arkansas University Raspberry Pi Bakeoff!