KafkaExport
Sends Vertica data to Kafka.
If Vertica successfully exports all of the rows of data to Kafka, this function returns zero rows. You can use the output of this function to copy failed messages to a secondary table for evaluation and reprocessing.
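As a minimal sketch of that pattern, the statements below capture any rows that fail to export and then summarize them. The table `export_failures`, its column names and sizes, and the source table `web_events` with its `event_partition`, `event_key`, and `payload` columns are all hypothetical. The broker address and topic name are placeholders. The four-column layout assumes the function returns the partition, key, message, and failure reason for each failed row.

```sql
-- Hypothetical holding table for rows that KafkaExport could not deliver.
-- Column names and sizes are assumptions; adjust them to your data.
CREATE TABLE export_failures (
    target_partition INTEGER,
    message_key      VARCHAR(65000),
    message_value    LONG VARCHAR(32000000),
    failure_reason   VARCHAR(65000)
);

-- KafkaExport returns only the rows that failed, so a fully
-- successful export inserts zero rows here.
INSERT INTO export_failures
SELECT KafkaExport(event_partition, event_key, payload
                   USING PARAMETERS brokers='kafka01.example.com:9092',
                                    topic='web_events')
       OVER (PARTITION BEST)
FROM web_events;
COMMIT;

-- Group the failures by reason to decide what to fix and resend.
SELECT failure_reason, COUNT(*) AS failed_messages
FROM export_failures
GROUP BY failure_reason
ORDER BY failed_messages DESC;
```

Keeping the failed rows in the same partition, key, and value shape means they can be passed back to KafkaExport once the underlying problem is corrected.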
Syntax
SELECT KafkaExport(partitionColumn, keyColumn, valueColumn USING PARAMETERS brokers='host[:port][,host...]', topic='topicname' [,kafka_conf='kafka_configuration_setting'] [,fail_on_conf_parse_error=Boolean]) OVER (partition_clause) FROM table;
Parameters
Argument | Description |
---|---|
partitionColumn | The target partition for the export. If you set this value to NULL, Vertica uses the default partitioning scheme. You can use the partition argument to send messages to partitions that map to Vertica segments. |
keyColumn | The user-defined key value associated with the valueColumn. Use NULL to skip this argument. |
valueColumn | The message itself. The column is a LONG VARCHAR, allowing you to send up to 32MB of data to Kafka. However, Kafka may impose its own limits on message size. |
brokers | A string containing a comma-separated list of one or more host names or IP addresses (with optional port number) of brokers in the Kafka cluster. |
topic | The Kafka topic to which you are exporting. |
kafka_conf | A JSON-formatted object of option/value pairs to pass directly to the rdkafka library. This is the library Vertica uses to communicate with Kafka. You can use this parameter to directly set configuration options that are not available through the Vertica integration with Kafka. See Directly Setting Kafka Library Options for details. |
fail_on_conf_parse_error | Determines whether the function fails when kafka_conf contains incorrectly formatted options and values, or invalid configuration properties. Default Value: FALSE. For accepted option and value formats, see Directly Setting Kafka Library Options. For a list of valid configuration properties, see the rdkafka GitHub repository. |
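The following sketch shows how the optional parameters might be combined in one call. It reuses the `failure_test` table and the `messageId` and `message` columns from the example on this page. The second broker host, `kafka02.example.com`, is a placeholder. Passing NULL as the first argument is assumed to select the default partitioning scheme, as described for partitionColumn.

```sql
-- NULL partition: use the default partitioning scheme instead of
-- targeting a specific Kafka partition.
-- kafka_conf: passes message.max.bytes straight through to rdkafka.
-- fail_on_conf_parse_error=true: stop with an error if kafka_conf is
-- malformed rather than continuing with the setting not applied.
SELECT KafkaExport(NULL, messageId, message
                   USING PARAMETERS brokers='kafka01.example.com:9092,kafka02.example.com:9092',
                                    topic='failure_test',
                                    kafka_conf='{"message.max.bytes":64000}',
                                    fail_on_conf_parse_error=true)
       OVER (PARTITION BEST)
FROM failure_test;
```

Setting fail_on_conf_parse_error to true is a reasonable choice while developing an export, because a mistyped option name is reported immediately instead of being discovered later through unexpected broker behavior.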
Examples
    => SELECT KafkaExport(partition, messageId, message
       USING PARAMETERS brokers='kafka01.example.com:9092',
       topic='failure_test',
       kafka_conf='{"message.max.bytes":64000}')
       OVER (PARTITION BEST) FROM failure_test;
     partition |     key     |                   substr                   |         failure_reason
    -----------+-------------+--------------------------------------------+--------------------------------
          -123 | key1        | negative partition not allowed             | Local: Unknown partition
         54321 |             | nonexistant partition                      | Local: Unknown partition
             0 | normal key1 | normal value1                              | Broker: Message size too large
             0 |             | aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa | Broker: Message size too large
             0 | normal key2 | normal value2                              | Broker: Message size too large