Consuming Delimited Kafka Data

Vertica provides built-in functionality to consume Kafka messages. Kafka messages are typically in JSON or Avro format; however, they can be consumed in any format, and delimited formats are fairly common. This article takes a detailed look at how Vertica consumes delimited Kafka messages.

Checking the Kafka Format

Kafka messages are often generated by another team or application. To verify which delimiters you will need to define when parsing these messages into a Vertica table, consume a few messages from the topic with the kafkacat command-line utility:

$ /opt/vertica/packages/kafka/bin/kafkacat -C -b broker:9092 -t blah -o beginning
3|Blah3|Foo|3|LASTCOLUMN
6|Blah6|Foo|6|LASTCOLUMN
7|Blah7|Foo|7|LASTCOLUMN
8|Blah8|Foo|8|LASTCOLUMN
9|Blah9|Foo|9|LASTCOLUMN
1|Blah1|Foo|1|LASTCOLUMN
2|Blah2|Foo|2|LASTCOLUMN
4|Blah4|Foo|4|LASTCOLUMN
5|Blah5|Foo|5|LASTCOLUMN
% Reached end of topic blah [2] at offset 333909
% Reached end of topic blah [0] at offset 333815
% Reached end of topic blah [1] at offset 332289
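As a quick sanity check on a candidate delimiter, a few sample messages can be split on it and the number of fields counted. The following is a minimal sketch, not part of Vertica or kafkacat: the field_counts helper is a hypothetical name, and the sample lines are copied from the output above. A single distinct count suggests the delimiter splits every message the same way.

```shell
#!/bin/sh
# Sample messages copied from the kafkacat output above.
sample='3|Blah3|Foo|3|LASTCOLUMN
6|Blah6|Foo|6|LASTCOLUMN
7|Blah7|Foo|7|LASTCOLUMN'

# Hypothetical helper: prints each distinct field count seen on stdin
# when lines are split on the candidate delimiter given as $1.
field_counts() {
  awk -F"$1" '{ print NF }' | sort -u
}

# Every sample line splits into the same number of fields on '|'.
printf '%s\n' "$sample" | field_counts '|'
```

If the helper prints more than one number, some messages have a different column count, or the candidate delimiter is not the right one.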

Manually Copying Delimited Kafka Data

Once you know the format of your delimited messages, add the appropriate delimiter definitions to your COPY statement, just as you would when copying a delimited file.

Record Terminators and the KafkaInsertDelimiters Filter

When loading delimited data from a file, there is usually one record per line (with \n acting as the record terminator). When loading delimited data from Kafka, the messages are concatenated into a buffer that the DELIMITED parser then parses. Because each message is typically a single record, messages don’t always end with a record terminator. In that case, there is a risk of sending data without record terminators to the parser.
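The effect of a missing record terminator can be illustrated outside Vertica. This sketch only imitates the concatenation with printf and is not how Vertica builds its buffer internally; the count_records helper is a hypothetical name, and the two messages are taken from the sample topic above.

```shell
#!/bin/sh
# Hypothetical helper: counts newline-terminated records on stdin.
count_records() {
  wc -l | tr -d '[:space:]'
}

# Messages concatenated back to back, with no terminator between them.
without=$(printf '%s' '1|Blah1|Foo|1|LASTCOLUMN' '2|Blah2|Foo|2|LASTCOLUMN' | count_records)

# The same messages with a line feed appended to each one.
with=$(printf '%s\n' '1|Blah1|Foo|1|LASTCOLUMN' '2|Blah2|Foo|2|LASTCOLUMN' | count_records)

echo "terminated records without a terminator: $without"
echo "terminated records with a terminator: $with"
```

Without a terminator, the two messages run together into one unterminated string, so a line-oriented reader cannot tell where the first record ends.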

Vertica provides a filter, KafkaInsertDelimiters, that always appends a record terminator to each consumed message. In a COPY command, the filter comes after SOURCE KafkaSource and before the DELIMITED parser’s parameters.

Here’s an example that inserts a line feed as the record terminator (the RECORD TERMINATOR clause is optional, since line feed is the default):

=> COPY blah SOURCE KafkaSource(stream='blah|0|-2',
        brokers='kafkabroker:9092',stop_on_eof=true)
FILTER KafkaInsertDelimiters(delimiter=E'\n') RECORD TERMINATOR E'\n'
REJECTED DATA AS TABLE loader_rejects NO COMMIT;
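The COPY example reads only partition 0 of the topic, while the kafkacat output earlier shows three partitions (0, 1, and 2). The sketch below builds a stream value that covers several partitions. It assumes that the stream parameter accepts a comma-separated list of topic|partition|offset entries, which should be confirmed against the KafkaSource documentation for your Vertica version; build_stream is a hypothetical helper name.

```shell
#!/bin/sh
# Hypothetical helper: builds a comma-separated list of
# topic|partition|offset entries for partitions 0..N-1.
# Assumption: KafkaSource accepts such a list in its stream parameter.
build_stream() {
  topic=$1
  partitions=$2
  offset=$3
  stream=""
  p=0
  while [ "$p" -lt "$partitions" ]; do
    # Prefix a comma only when the list already has an entry.
    stream="${stream:+$stream,}$topic|$p|$offset"
    p=$((p + 1))
  done
  printf '%s\n' "$stream"
}

# Three partitions of topic blah, each with the offset value used above.
build_stream blah 3 -2
```

The printed value can then be pasted between the quotes of stream='...' in the COPY statement.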

Vertica recommends that you always use the KafkaInsertDelimiters filter when consuming Kafka delimited data.

While your delimited messages might already contain a record terminator, errors could still occur in production if some messages arrive without one. If a message already contains a record terminator, adding a second record terminator in essence adds an empty record, which the DELIMITED parser ignores.
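The harmless double terminator can also be shown with a small simulation. This is a sketch under stated assumptions: emit_buffer is a hypothetical helper that hard-codes the buffer contents, and grep stands in for a parser that skips empty records; it does not run the DELIMITED parser itself.

```shell
#!/bin/sh
# Hypothetical buffer: the first message already ended in \n and the
# filter appended another; the second message had no terminator of its own.
emit_buffer() {
  printf '1|Blah1|Foo|1|LASTCOLUMN\n\n'
  printf '2|Blah2|Foo|2|LASTCOLUMN\n'
}

# All lines in the buffer, including the empty one.
total_lines=$(emit_buffer | wc -l | tr -d '[:space:]')

# grep -c . counts only non-empty lines, standing in for a parser
# that ignores empty records.
records=$(emit_buffer | grep -c .)

echo "lines in buffer: $total_lines"
echo "non-empty records: $records"
```

The buffer holds three lines, but only two of them carry data, which matches the two original messages.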

Using the DELIMITED Parser in a Scheduler Microbatch

Parsers are defined in your scheduler's load spec. The load spec is a set of rules for converting source data into a format that Vertica accepts. It provides parameters for customizing the COPY statement for a microbatch, including the load method (e.g. DIRECT), optional filters (e.g. KafkaInsertDelimiters), the parser (e.g. KafkaAvroParser), and parser parameters (e.g. 'enforce_length=true, schema_registry_url=http://localhost:8081').

Available Options

Parameter          #ARGS  Description
load-spec          1      Unique, user-defined name for the load spec.
filters            1      A complete Vertica FILTER chain containing all UDFilters for a microbatch.
parser             1      A parser is used during microbatch execution. The default is 'KafkaParser'.
parser-parameters  1      A comma-separated list of key=value pairs to be given as parameters to the parser.

For a list of all available load spec options, enter the following command:

/opt/vertica/packages/kafka/bin/vkconfig load-spec --help

DELIMITED is the default parser for Vertica. In a COPY statement it does not need to be listed by name and, unlike any other parser, it does not accept a comma-delimited list of parameters. Because the scheduler uses KafkaParser as its default, DELIMITED has to be specified as the load-spec parser, and the DELIMITED parameters have to be passed as part of the load-spec filters option instead of the load-spec parser-parameters option.

vkconfig does not document passing DELIMITED as a --parser option. It is, however, the only way to select that parser and override the scheduler default.

Here is an example defining a DELIMITED load-spec with “,” as the non-default column delimiter:

/opt/vertica/packages/kafka/bin/vkconfig load-spec --conf kafka.props \
--create --load-spec spec_1 \
--filters "FILTER KafkaInsertDelimiters (delimiter=E'\n') DELIMITER ',' " \
--load-method DIRECT --parser DELIMITED

After you create the load spec with the /opt/vertica/... command, you can query the stream_load_specs table in the scheduler's schema to make sure it is correct.

select * from stream_example.stream_load_specs where load_spec = 'spec_1';
-[ RECORD 1 ]-----+-------------------------------------------------------------
id                | 750001
load_spec         | spec_1
filters           | FILTER KafkaInsertDelimiters(delimiter=E'\n') DELIMITER ','
parser            | DELIMITED
parser_parameters |
load_method       | DIRECT
message_max_bytes |
uds_kv_parameters |
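When several load specs differ only in name and column delimiter, the vkconfig command shown earlier can be assembled by a small dry-run helper. This is a sketch rather than a Vertica tool: build_load_spec_cmd is a hypothetical name, kafka.props is the configuration file from the example, and the helper only prints the command so that it can be reviewed before being run.

```shell
#!/bin/sh
# Hypothetical dry-run helper: prints a vkconfig load-spec command for
# the DELIMITED parser without executing it.
build_load_spec_cmd() {
  spec_name=$1
  column_delimiter=$2
  # The DELIMITED parameters travel in the filter chain, after the filter.
  filters="FILTER KafkaInsertDelimiters(delimiter=E'\n') DELIMITER '$column_delimiter'"
  printf '%s load-spec --conf kafka.props --create --load-spec %s --filters "%s" --load-method DIRECT --parser DELIMITED\n' \
    /opt/vertica/packages/kafka/bin/vkconfig "$spec_name" "$filters"
}

# Print the command for a load spec named spec_1 with ',' as column delimiter.
build_load_spec_cmd spec_1 ','
```

Printing instead of executing keeps the quoting visible: the filter chain stays inside one pair of double quotes, so the shell passes it to vkconfig as a single argument.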

Additional Resources

For more information about consuming Kafka messages in Vertica, see the following sections in the Vertica documentation:

  • Manually Copying Data from Kafka

  • Automatically Copying Data from Kafka

  • Using Kafkacat to troubleshoot issues