KafkaSource

The KafkaSource UDL accesses data from a Kafka cluster. All Kafka parsers must use KafkaSource. Messages processed by KafkaSource must be at least one byte in length. For zero-length messages, KafkaSource writes an error message to vertica.log.

The output of KafkaSource does not work directly with any of the non-Kafka parsers in Vertica (such as the FCSVPARSER). KafkaSource produces additional metadata about the stream that parsers need in order to parse the data correctly. You must use filters such as KafkaInsertDelimiters to transform the data into a format that other parsers can process. See Parsing Custom Formats for an example.
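As a hedged sketch of the filter approach described above, the following statement passes the KafkaSource output through KafkaInsertDelimiters so that a standard delimited parser can read it. The table name kafka_iris, the topic name iris, the broker address, and the choice of a newline as the inserted delimiter are illustrative assumptions, not values taken from this page.

```sql
-- Hypothetical table, topic, and broker names; adjust for your cluster.
-- KafkaInsertDelimiters appends the given delimiter after each message,
-- so the default delimited parser sees one comma-separated record per line.
COPY kafka_iris
   SOURCE KafkaSource(stream='iris|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   FILTER KafkaInsertDelimiters(delimiter = E'\n')
   DELIMITER ',';
```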

You can cancel a running KafkaSource data load by using a close session function such as CLOSE_ALL_SESSIONS.
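A minimal sketch of canceling a load from another session follows. It assumes you have the privileges required to close sessions. The session ID shown is a placeholder, not a real value. Note that the lookup query matches its own session as well, because its text contains the string KafkaSource.

```sql
-- List sessions whose current statement mentions KafkaSource.
SELECT session_id, user_name, current_statement
  FROM v_monitor.sessions
 WHERE current_statement ILIKE '%KafkaSource%';

-- Close a single session. Replace the placeholder with a session_id
-- returned by the query above.
SELECT CLOSE_SESSION('placeholder-session-id');

-- Alternatively, close all external sessions, which also ends the load.
SELECT CLOSE_ALL_SESSIONS();
```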

Syntax

KafkaSource(stream='topic_name|partition|start_offset[|end_offset][,...]'
            [, brokers='host:port[,...]']
            [, duration=interval]
            [, executionparallelism='value']
            [, stop_on_eof=Boolean]
            [, eof_timeout=timeout]
            [, group_id='consumer_group_name']
            [, kafka_conf='option=value[;option2=value2...]'])

Parameters

stream

Required. Defines the data to be loaded as a comma-separated list of one or more partitions. Each partition is defined by three required values and one optional value, separated by pipe characters (|):

  • topic_name: the name of the Kafka topic to load data from. You can read from different Kafka topics in the same stream parameter, with some limitations. See Loading from Multiple Topics in the Same Stream Parameter below for more information.
  • partition: the partition in the Kafka topic to copy.
  • start_offset: the offset in the Kafka topic where the load will begin. This offset is inclusive (the message with the offset start_offset is loaded). See Special Starting Offset Values below for additional options.
  • end_offset: the optional offset where the load should end. This offset is exclusive (the message with the offset end_offset will not be loaded).
    To end a load using end_offset, you must supply an ending offset value for all partitions in the stream parameter. Attempting to set an ending offset for some partitions and not set offset values for others results in an error.
    If you do not specify an ending offset, you must supply at least one other ending condition using stop_on_eof or duration.
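The stream format described above can be illustrated with a statement that reads two partitions of the same topic and supplies an ending offset for each. This is a sketch only: the table name web_hits_flex and the broker address are hypothetical, and it assumes the topic web_hits has at least partitions 0 and 1.

```sql
-- Hypothetical table and broker names.
-- Each stream entry is topic_name|partition|start_offset|end_offset.
-- Both partitions supply an end_offset, as required when any partition does.
-- start_offset is inclusive and end_offset is exclusive, so each partition
-- contributes the messages at offsets 0 through 999.
COPY web_hits_flex
   SOURCE KafkaSource(stream='web_hits|0|0|1000,web_hits|1|0|1000',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();
```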
brokers

A comma-separated list of host:port pairs of the brokers in the Kafka cluster. Vertica recommends running Kafka on a different machine than Vertica.

Default Value: localhost:9092

duration

An INTERVAL that specifies the duration of the frame. After this amount of time, KafkaSource terminates the COPY statement. If this parameter is not set, you must set at least one other ending condition, either by using stop_on_eof or by specifying an ending offset. See Duration Note below for more information.
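For illustration, a load that uses duration as its only ending condition might look like the following. The ten-second interval and the broker address are arbitrary assumptions. The table and topic names are borrowed from the Example section of this page.

```sql
-- Assumed broker address; duration is the sole ending condition here.
-- KafkaSource stops reading after roughly 10 seconds, although parsing
-- can make the overall statement run longer.
COPY web_test
   SOURCE KafkaSource(stream='web_hits|0|-2',
                      brokers='kafka01.example.com:9092',
                      duration=INTERVAL '10000 milliseconds')
   PARSER KafkaJSONParser();
```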

executionparallelism

The number of threads to use when loading data. Normally, you set this to an integer value between 1 and the number of partitions the node is loading from. Setting this parameter to a reduced value limits the number of threads used to process any COPY statement. It also increases the throughput of short queries issued in the pool, especially if the queries are executed concurrently.

If you do not specify this parameter, Vertica automatically creates a thread for every partition, up to the limit allowed by the resource pool.

If the value you specify for the KafkaSource is lower than the value specified for the scheduler resource pool, the KafkaSource value applies. This value cannot exceed the value specified for the scheduler's resource pool.
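A hedged sketch of limiting load threads follows. It assumes the topic web_hits has three partitions and that the resource pool allows at least two threads. The quoting of the value follows the syntax summary on this page, and the broker address is hypothetical.

```sql
-- Three partitions are listed, but at most two threads are used to load them.
COPY web_test
   SOURCE KafkaSource(stream='web_hits|0|-2,web_hits|1|-2,web_hits|2|-2',
                      brokers='kafka01.example.com:9092',
                      executionparallelism='2',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();
```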

stop_on_eof

Determines whether KafkaSource should terminate the COPY statement after it reaches the end of a file. If this value is not set, you must set at least one other ending condition using duration or by supplying ending offsets instead.

Default Value: FALSE

eof_timeout

A timeslice of type INTERVAL. If a COPY command does not receive any Kafka messages within the eof_timeout interval, Vertica responds by ending that COPY statement. This parameter applies only if stop_on_eof is TRUE.

Deprecated: This parameter has been deprecated and will be removed in a future version of the product. It is no longer necessary because newer versions of the library that Vertica uses to communicate with Kafka now indicate when the stream has ended.

group_id

The name of the Kafka consumer group to which Vertica reports its progress consuming messages. By default, Vertica reports its progress to a group named vertica_database-name. See Monitoring Vertica Message Consumption with Consumer Groups for more information.

Set this value to NULL to disable progress reports to a Kafka consumer group.
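As an illustration, the statement below reports load progress to a named consumer group instead of the default. The group name vertica_web_loads and the broker address are made-up values for this sketch.

```sql
-- Hypothetical consumer group name; Vertica reports its consumption
-- progress to this group rather than to the default group.
COPY web_test
   SOURCE KafkaSource(stream='web_hits|0|-2',
                      brokers='kafka01.example.com:9092',
                      group_id='vertica_web_loads',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();
```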

kafka_conf

A semicolon-delimited list of option=value pairs to pass directly to the rdkafka library. This is the library Vertica uses to communicate with Kafka. You can use this parameter to directly set configuration options that are not available through the Vertica integration with Kafka. See Directly Setting Kafka Library Options for details.
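The semicolon-delimited format can be sketched as follows. The two options shown, queued.min.messages and fetch.message.max.bytes, are consumer settings of the rdkafka library. The values are arbitrary, and whether Vertica honors or overrides a given option in your version is an assumption you should verify before relying on it.

```sql
-- Illustrative values only; check Directly Setting Kafka Library Options
-- before passing library settings in production.
COPY web_test
   SOURCE KafkaSource(stream='web_hits|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true,
                      kafka_conf='queued.min.messages=250000;fetch.message.max.bytes=2097152')
   PARSER KafkaJSONParser();
```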

Special Starting Offset Values

The start_offset portion of the stream parameter lets you start loading messages from a specific point in the topic's partition. It also accepts one of two special offset values:

Loading from Multiple Topics in the Same Stream Parameter

You can load from multiple Kafka topics in a single stream parameter as long as you follow these guidelines:

Duration Note

The duration parameter applies to the length of time that Vertica allows the KafkaSource function to run. It usually reflects the amount of time the overall load statement takes. However, if KafkaSource is loading a large volume of data or the data needs extensive processing and parsing, the overall runtime of the query can exceed the amount of time specified in duration.

Example

The following example demonstrates calling KafkaSource to load data from Kafka into an existing flex table named web_test with the following options:

=> COPY web_test 
   SOURCE KafkaSource(stream='web_hits|0|-2|10000',
                      brokers='kafka01.example.com:6667,kafka03.example.com:6667', 
                      stop_on_eof=true) 
   PARSER KafkaJSONParser();
 Rows Loaded
-------------
        1068
(1 row)