Producing Data Using KafkaExport

The KafkaExport function lets you stream data from Vertica to Kafka. You pass this function three arguments and two or three parameters:

SELECT KafkaExport(partitionColumn, keyColumn, valueColumn 
    USING PARAMETERS brokers='host[:port][,host...]',
    topic='topicname'
    [,kafka_conf='kafka_configuration_setting'])
OVER (partition_clause) FROM table;

The partitionColumn and keyColumn arguments set the Kafka topic's partition and key value, respectively. You can set either or both of these values to NULL. If you set the partition to NULL, Kafka uses its default partitioning scheme (either randomly assigning partitions if the key value is NULL, or based on the key value if it is not).
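
For example, a minimal sketch of letting Kafka pick the partition might look like the following. The sensors table, the reading_id and message_text columns, the broker address, and the topic name are all hypothetical placeholders:

-- Passing NULL for the partition: Kafka assigns one based on the key's hash,
-- because the key (reading_id) is not NULL. Names here are hypothetical.
SELECT KafkaExport(NULL::INTEGER, reading_id::VARCHAR, message_text
    USING PARAMETERS brokers='kafkahost:9092',
    topic='sensor_readings')
OVER (PARTITION BEST) FROM sensors;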

The valueColumn argument is a LONG VARCHAR containing message data that you want to send to Kafka. Kafka does not impose structure on the message content. Your only restriction on the message format is what the consumers of the data are able to parse.

You are free to convert your data into a string in any way you like. For simple messages (such as a comma-separated list), you can use functions such as CONCAT to assemble your values into a message. If you need a more complex data format, such as JSON, consider writing a UDx function that accepts columns of data and outputs a LONG VARCHAR containing the data in the format you require. See Developing User-Defined Extensions (UDxs) for more information.
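
As a quick sketch of that approach (the table and column names here are hypothetical), you might assemble a comma-separated message string like this:

-- Build a "time, value" message string from two columns with nested CONCAT calls.
-- Table and column names are hypothetical.
SELECT CONCAT(CONCAT(reading_time, ', '), reading_value::VARCHAR) AS kafka_message
FROM device_readings;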

KafkaExport sends the contents of valueColumn to Kafka exactly as you format it; it does not convert your data to JSON or any other format.

See KafkaExport for detailed information about KafkaExport's syntax.

Export Example

This example shows you how to perform a simple export of several columns of a table. Suppose you have the following table containing a simple set of Internet of Things (IoT) data:

=> SELECT * FROM iot_report LIMIT 10;
 server |        date         |       location        |    id    
--------+---------------------+-----------------------+----------
      1 | 2016-10-11 04:09:28 | -14.86058, 112.75848  | 70982027
      1 | 2017-07-02 12:37:48 | -21.42197, -127.17672 | 49494918
      1 | 2017-10-19 14:04:33 | -71.72156, -36.27381  | 94328189
      1 | 2018-07-11 19:35:18 | -9.41315, 102.36866   | 48366610
      1 | 2018-08-30 08:09:45 | 83.97962, 162.83848   |   967212
      2 | 2017-01-20 03:05:24 | 37.17372, 136.14026   | 36670100
      2 | 2017-07-29 11:38:37 | -38.99517, 171.72671  | 52049116
      2 | 2018-04-19 13:06:58 | 69.28989, 133.98275   | 36059026
      2 | 2018-08-28 01:09:51 | -59.71784, -144.97142 | 77310139
      2 | 2018-09-14 23:16:15 | 58.07275, 111.07354   |  4198109
(10 rows)

=> \d iot_report 
                                       List of Fields by Tables
 Schema |   Table    |  Column  |    Type     | Size | Default | Not Null | Primary Key | Foreign Key 
--------+------------+----------+-------------+------+---------+----------+-------------+-------------
 public | iot_report | server   | int         |    8 |         | f        | f           | 
 public | iot_report | date     | timestamp   |    8 |         | f        | f           | 
 public | iot_report | location | varchar(40) |   40 |         | f        | f           | 
 public | iot_report | id       | int         |    8 |         | f        | f           | 
(4 rows)

You want to send the data in this table to a Kafka topic named iot_results for consumption by other applications. Looking at the data and the structure of the iot_report table, you might decide the following:

  • The server column is a good match for the partitions in the iot_results topic. There are three partitions in the Kafka topic, and the values in the server column are between 1 and 3. If the partition column had a larger range of values (for example, between 1 and 100), you could use the modulo operator (%) to coerce the values into the same range as the number of partitions (server % 3); see the sketch after this list.
    One complication with these values is that the server column is 1-based (the lowest value in the column is 1), while Kafka's partition numbering scheme is zero-based. So, you must adjust the values in the server column by subtracting 1 from them.
  • The id column can act as the key. This column has a data type of INTEGER. The KafkaExport function expects the key value to be a VARCHAR. Vertica does not automatically cast INTEGER values to VARCHAR, so you must explicitly cast the value in your function call.
  • The consumers of the iot_results topic expect values in comma-separated format. You can combine the values from the date and location columns into a single VARCHAR using nested calls to the CONCAT function.
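
If the server column did span a wider range of values, a sketch of that modulo approach (reusing the value expression from the example below; the broker address here is a placeholder) might look like this:

-- server % 3 always yields 0, 1, or 2, matching the topic's three partitions.
SELECT KafkaExport(server % 3, id::VARCHAR,
    CONCAT(CONCAT(date, ', '), location)
    USING PARAMETERS brokers='kafkahost:9092', topic='iot_results')
OVER (PARTITION BEST) FROM iot_report;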

The final piece of information you need to know is the host names and port numbers of the brokers in your Kafka cluster. In this example, there are two brokers named kafka01 and kafka03, running on port 6667 (the port that Hortonworks clusters use). Once you have all of this information, you are ready to export your data.

The following example shows how you might export the contents of iot_report:

=> SELECT KafkaExport(server - 1, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location) 
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667', 
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report; 
 partition | key | message | failure_reason 
-----------+-----+---------+----------------
(0 rows)

KafkaExport returned 0 rows, which means Vertica was able to send all of your data to Kafka without any errors.

Other things to note about the example:

  • The CONCAT function automatically converts the date column's TIMESTAMP value to a VARCHAR for you, so you do not need to explicitly cast it.
  • Two nested CONCAT functions are necessary to concatenate the date field with a comma, and the resulting string with the location field.
  • Adding a third column to the message field would require two additional CONCAT function calls (one to concatenate a comma after the location column, and one to concatenate the additional column's value). Using CONCAT becomes messy after just a few columns' worth of data; see the alternative sketch after this list.
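
One way to keep the message expression readable as columns accumulate is Vertica's string concatenation operator (||). The following is a sketch of an equivalent query, not part of the original example:

-- Build the same message with the || operator instead of nested CONCAT calls.
SELECT KafkaExport(server - 1, id::VARCHAR,
    date::VARCHAR || ', ' || location
    USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
    topic='iot_results')
OVER (PARTITION BEST) FROM iot_report;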

On the Kafka side, you will see whatever you sent as the valueColumn (third) argument of the KafkaExport function. In the above example, this is a CSV list. If you started a console consumer for the iot_results topic before running the example query, you would see the following output when the query runs:

$ /opt/kafka/bin/kafka-console-consumer.sh --topic iot_results --zookeeper localhost
2017-10-10 12:08:33, 78.84883, -137.56584
2017-12-06 16:50:57, -25.33024, -157.91389
2018-01-12 21:27:39, 82.34027, 116.66703
2018-08-19 00:02:18, 13.00436, 85.44815
2016-10-11 04:09:28, -14.86058, 112.75848
2017-07-02 12:37:48, -21.42197, -127.17672
2017-10-19 14:04:33, -71.72156, -36.27381
2018-07-11 19:35:18, -9.41315, 102.36866
2018-08-30 08:09:45, 83.97962, 162.83848
2017-01-20 03:05:24, 37.17372, 136.14026
2017-07-29 11:38:37, -38.99517, 171.72671
2018-04-19 13:06:58, 69.28989, 133.98275
2018-08-28 01:09:51, -59.71784, -144.97142
2018-09-14 23:16:15, 58.07275, 111.07354

KafkaExport's Return Value

KafkaExport outputs any rows that Kafka rejected. For example, suppose you forgot to adjust the partition column to be zero-based in the previous example. Then some of the rows exported to Kafka would specify a partition that does not exist. In this case, Kafka rejects these rows, and KafkaExport reports them in table format:

=> SELECT KafkaExport(server, id::VARCHAR, 
   CONCAT(CONCAT(date, ', '), location) 
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report; 
 partition |   key    |                  message                    |      failure_reason      
-----------+----------+---------------------------------------------+--------------------------
         3 | 40492866 | 2017-10-10 12:08:33, 78.84883, -137.56584,  | Local: Unknown partition
         3 | 73846006 | 2017-12-06 16:50:57, -25.33024, -157.91389, | Local: Unknown partition
         3 | 45020829 | 2018-01-12 21:27:39, 82.34027, 116.66703,   | Local: Unknown partition
         3 | 27462612 | 2018-08-19 00:02:18, 13.00436, 85.44815,    | Local: Unknown partition
(4 rows)

Another common reason for Kafka rejecting a row is that its message value is longer than Kafka's message.max.bytes setting.
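
If oversized messages are the cause, one option is to raise the producer-side limit through the kafka_conf parameter shown in the syntax above. The following is only a sketch: the 1 MB value is an assumed example, the option=value string format is an assumption (check the kafka_conf documentation for your version), and the brokers still enforce their own message size limit:

-- Raise the rdkafka producer's message.max.bytes limit for this export.
SELECT KafkaExport(server - 1, id::VARCHAR,
    CONCAT(CONCAT(date, ', '), location)
    USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
    topic='iot_results',
    kafka_conf='message.max.bytes=1048576')
OVER (PARTITION BEST) FROM iot_report;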

You can capture this output by creating a table to hold the rejects. Then use an INSERT statement to insert KafkaExport's results:

=> CREATE TABLE export_rejects (partition INTEGER, key VARCHAR, message LONG VARCHAR, failure_reason VARCHAR);
CREATE TABLE
=> INSERT INTO export_rejects SELECT KafkaExport(server, id::VARCHAR, 
   CONCAT(CONCAT(date, ', '), location) 
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report; 
 OUTPUT
--------
      4
(1 row)
=> SELECT * FROM export_rejects;
 partition |   key    |                  message                   |      failure_reason
-----------+----------+--------------------------------------------+--------------------------
         3 | 27462612 | 2018-08-19 00:02:18, 13.00436, 85.44815    | Local: Unknown partition
         3 | 40492866 | 2017-10-10 12:08:33, 78.84883, -137.56584  | Local: Unknown partition
         3 | 73846006 | 2017-12-06 16:50:57, -25.33024, -157.91389 | Local: Unknown partition
         3 | 45020829 | 2018-01-12 21:27:39, 82.34027, 116.66703   | Local: Unknown partition
(4 rows)