Parsing Custom Formats

Vertica supports the use of user-defined filters to manipulate data arriving from your streaming message bus. You can apply these filters to the data before you parse it. Data streamed from the source does not contain message boundaries by default. The Kafka parsers can locate message boundaries on their own, but other Vertica and user-defined parsers are not designed to recognize them. Two Kafka-specific filters let you insert message boundary information (such as delimiters) into the data stream so that parsers can extract and parse individual messages.

Filters for Use with Kafka Data

Vertica includes the following filters:

  • KafkaInsertDelimiters — Transforms the Kafka data stream by inserting a user-specified delimiter between messages. The delimiter can contain any characters and be of any length. This filter uses the following syntax:

    KafkaInsertDelimiters(delimiter = 'delimiter')

  • KafkaInsertLengths — Transforms the Kafka data stream by inserting the length of the following record, in bytes, at the beginning of the record. Vertica writes lengths as 4-byte uint32 values in big-endian network byte order. For example, a 100-byte record is preceded by 0x00000064. This filter takes no arguments and uses the following syntax (a usage sketch appears after this list):

    KafkaInsertLengths()
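
For example, the following sketch shows KafkaInsertLengths preparing a stream of length-prefixed records for a length-aware parser. The table name (web_hits), topic name (web-hits), and parser (LengthPrefixedParser, a user-defined parser that you would install yourself with CREATE PARSER) are hypothetical placeholders, not objects that ship with Vertica:

=> COPY web_hits SOURCE KafkaSource(stream='web-hits|0|-2',
                                    brokers='kafka01:9092',
                                    stop_on_eof=True)
                 FILTER KafkaInsertLengths()
                 PARSER LengthPrefixedParser();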

These Vertica-provided filters are mutually exclusive: you cannot use both to process a Kafka data stream in the same COPY statement.

Vertica also supports the use of additional Vertica and user-defined filters. If you are using a Kafka filter, it must appear first in the filter list. Use a comma to delimit multiple filters. If you are using a non-Kafka parser, you must use at least one filter to prepare the content for that parser. If you do not provide a filter, the parser fails with the message:

Input is not from Kafka source.
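
For example, the following sketch pairs a non-Kafka parser (Vertica's FJSONPARSER) with the required filter to load JSON messages; the iot_json table and iot-json topic are hypothetical placeholders:

=> COPY iot_json SOURCE KafkaSource(stream='iot-json|0|-2',
                                    brokers='kafka01:9092',
                                    stop_on_eof=True)
                 FILTER KafkaInsertDelimiters(delimiter = E'\n')
                 PARSER FJSONPARSER();

If you chain an additional filter (for example, a hypothetical user-defined MyDecoder()), the Kafka filter comes first: FILTER KafkaInsertDelimiters(delimiter = E'\n'), MyDecoder().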

Example

The following example demonstrates loading comma-separated values from two partitions of a topic named iot-data. The load processes all of the messages in the two partitions and, because stop_on_eof is set, exits when it reaches the end of the messages currently in Kafka. The example uses the KafkaInsertDelimiters filter to insert newlines between the Kafka messages, turning them into traditional rows of data. It relies on the standard COPY parser to parse the CSV values, with the DELIMITER parameter setting the column delimiter to a comma.

=> COPY kafka_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2', 
                                     brokers='kafka01:9092', 
                                     stop_on_eof=True) 
                  FILTER KafkaInsertDelimiters(delimiter = E'\n') 
                  DELIMITER ',';
 Rows Loaded
-------------
        3430
(1 row)
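
Because KafkaInsertDelimiters accepts a delimiter of any length, you can choose a byte sequence that cannot appear inside your messages when the message bodies themselves may contain newlines. The following variation of the example above is a minimal sketch (same hypothetical iot-data topic and kafka_iot table) that uses the string |~| as both the inserted delimiter and the COPY record terminator:

=> COPY kafka_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2',
                                     brokers='kafka01:9092',
                                     stop_on_eof=True)
                  FILTER KafkaInsertDelimiters(delimiter = '|~|')
                  DELIMITER ','
                  RECORD TERMINATOR '|~|';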