Kafka TLS/SSL Example Part 4: Loading Data Directly From Kafka

Once you have configured Kafka to accept SSL connections, verify that you can load data from it directly into Vertica. Perform this step even if you plan to create a scheduler to stream data automatically. Successfully copying data directly from Kafka over an SSL connection confirms that the Vertica-to-Kafka SSL connection is configured correctly.

You can choose to create a separate key and certificate for directly loading data from Kafka. This example re-uses the key and certificate created for the Vertica server in part 2 of this example.

You directly load data from Kafka by using the KafkaSource data source function with the COPY statement (see Manually Copying Data From Kafka for more information). The KafkaSource function creates the connection to Kafka, so it needs an SSL key, certificate, and related passwords to create an encrypted connection. You pass this information via session parameters. See Kafka User-Defined Session Parameters for a list of these parameters.

The easiest way to get the key and certificates into the session parameters is to first read them into vsql variables. Enclosing a shell command in backquotes makes vsql run it in the Linux shell and substitute its output, so you can use cat to read each file's contents. You then set the session parameters from the variables. Before setting the session parameters, increase the MaxSessionUDParameterSize session parameter so that the session parameters have enough storage space for the key and the certificates, which can be larger than the default size limit (1000 bytes).
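One way to see whether the default limit is too small for your files is to check their sizes in the shell before starting vsql. The following is a minimal sketch and not part of the original example: report_size is a hypothetical helper function, and the paths assume the /home/dbadmin/SSL directory used in this example.

```shell
#!/bin/sh
# Hypothetical helper: print a file's size in bytes and whether it
# exceeds a limit. The limit defaults to 1000 bytes, the default
# size limit quoted in the text above.
report_size() {
    file=$1
    limit=${2:-1000}
    # Arithmetic expansion strips any padding that wc adds on some systems.
    size=$(( $(wc -c < "$file") ))
    if [ "$size" -gt "$limit" ]; then
        echo "$file: $size bytes (exceeds $limit)"
    else
        echo "$file: $size bytes (fits within $limit)"
    fi
}

# Assumed file locations; adjust them to match your own setup.
for f in /home/dbadmin/SSL/server.crt \
         /home/dbadmin/SSL/server.key \
         /home/dbadmin/SSL/root.crt; do
    if [ -f "$f" ]; then
        report_size "$f"
    fi
done
```

If any file is reported as exceeding the limit, raise MaxSessionUDParameterSize before setting the corresponding session parameter. Passing a second argument, such as report_size file 100000, checks the file against a different limit.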

The following example reads the server key, the server certificate, and the root CA certificate from a directory named /home/dbadmin/SSL. Because the server's key password is not saved in a file, the example sets it in a Linux environment variable named KVERTICA_PASS before running vsql. In each \set command, the '\'' sequences place a literal single quote on either side of the shell output, so the variable expands to a quoted SQL string. The example sets MaxSessionUDParameterSize to 100000 before setting the session parameters. Finally, it enables SSL for the Kafka connection and loads data from the topic named test.

$ export KVERTICA_PASS=server_key_password
$ vsql
Password: 
Welcome to vsql, the Vertica Analytic Database interactive terminal.

Type:  \h or \? for help with vsql commands
       \g or terminate with semicolon to execute query
       \q to quit

SSL connection (cipher: DHE-RSA-AES256-GCM-SHA384, bits: 256, protocol: TLSv1.2)

=> \set cert '\''`cat /home/dbadmin/SSL/server.crt`'\''
=> \set pkey '\''`cat /home/dbadmin/SSL/server.key`'\''
=> \set ca '\''`cat /home/dbadmin/SSL/root.crt`'\''
=> \set pass '\''`echo $KVERTICA_PASS`'\''
=> ALTER SESSION SET MaxSessionUDParameterSize=100000;
ALTER SESSION
=> ALTER SESSION SET UDPARAMETER kafka_SSL_Certificate=:cert;
ALTER SESSION
=> ALTER SESSION SET UDPARAMETER kafka_SSL_PrivateKey_secret=:pkey;
ALTER SESSION
=> ALTER SESSION SET UDPARAMETER kafka_SSL_PrivateKeyPassword_secret=:pass;
ALTER SESSION
=> ALTER SESSION SET UDPARAMETER kafka_SSL_CA=:ca;
ALTER SESSION
=> ALTER SESSION SET UDPARAMETER kafka_Enable_SSL=1;
ALTER SESSION
=> CREATE TABLE t (a VARCHAR);
CREATE TABLE
=> COPY t SOURCE KafkaSource(brokers='kafka01.mycompany.com:9093',
                             stream='test|0|-2', stop_on_eof=true, 
                             duration=interval '5 seconds') 
          PARSER KafkaParser();
 Rows Loaded 
-------------
           3
(1 row)

=> SELECT * FROM t;
                            a                            
---------------------------------------------------------
 test again
 More testing. These mesages seem to be getting through!
 test

(3 rows)