KafkaSource
The KafkaSource UDL accesses data from a Kafka cluster. All Kafka parsers must use KafkaSource. Messages processed by KafkaSource must be at least one byte in length. KafkaSource writes an error message to vertica.log for zero-length messages.
The output of KafkaSource does not work directly with any of the non-Kafka parsers in Vertica (such as the FCSVPARSER). KafkaSource produces additional metadata about the stream that Kafka parsers need in order to parse the data correctly. You must use filters such as KafkaInsertDelimiters to transform the data into a format that can be processed by other parsers. See Parsing Custom Formats for an example.
You can cancel a running KafkaSource data load by using a close session function such as CLOSE_ALL_SESSIONS.
Syntax
```
KafkaSource(stream='topic_name|partition|start_offset[|end_offset][,...]'
            [, brokers='host:port[,...]']
            [, duration=interval]
            [, executionparallelism='value']
            [, stop_on_eof=Boolean]
            [, eof_timeout=timeout]
            [, group_id='consumer_group_name']
            [, kafka_conf='option=value[;option2=value2...]'])
```
| Parameter | Description |
|---|---|
| stream | Required. Defines the data to be loaded as a comma-separated list of one or more partitions. Each partition is defined by three required values and one optional value separated by pipe characters (\|): the topic name, the partition number, the starting offset, and (optionally) the ending offset. |
| brokers | A comma-separated list of host:port pairs of the brokers in the Kafka cluster. Vertica recommends running Kafka on a different machine than Vertica. Default value: localhost:9092 |
| duration | An INTERVAL that specifies the duration of the frame. After this specified amount of time, KafkaSource terminates the COPY statement. If this parameter is not set, you must set at least one other ending condition by using stop_on_eof or by specifying an ending offset instead. See Duration Note below for more information. |
| executionparallelism | The number of threads to use when loading data. Normally, you set this to an integer value between 1 and the number of partitions the node is loading from. Setting this parameter to a lower value limits the number of threads used to process any COPY statement. It also increases the throughput of short queries issued in the pool, especially if the queries are executed concurrently. If you do not specify this parameter, Vertica automatically creates a thread for every partition, up to the limit allowed by the resource pool. If the value you specify for KafkaSource is lower than the value specified for the scheduler resource pool, the KafkaSource value applies. This value cannot exceed the value specified for the scheduler's resource pool. |
| stop_on_eof | Determines whether KafkaSource should terminate the COPY statement after it reaches the end of a file. If this value is not set, you must set at least one other ending condition by using duration or by specifying an ending offset instead. Default value: FALSE |
| group_id | The name of the Kafka consumer group to which Vertica reports its progress consuming messages. By default, Vertica reports its progress to a group named vertica_database-name. See Monitoring Vertica Message Consumption with Consumer Groups for more information. Set this value to NULL to disable progress reports to a Kafka consumer group. |
| kafka_conf | A semicolon-delimited list of option=value pairs that are passed directly to the rdkafka library Vertica uses to communicate with Kafka. |
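As a sketch of how kafka_conf is used (the table, topic, and broker names here are hypothetical), the following statement passes two rdkafka settings through to the library:

```sql
=> COPY web_test SOURCE KafkaSource(
       stream='web_hits|0|-2',
       brokers='kafka01.example.com:9092',
       stop_on_eof=true,
       -- rdkafka options, passed through unchanged as option=value pairs
       kafka_conf='message.max.bytes=4194304;socket.timeout.ms=30000')
   PARSER KafkaJSONParser();
```

The option names must match those defined by the rdkafka library; Vertica does not validate them itself.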
Special Starting Offset Values
The start_offset portion of the stream parameter lets you start loading messages from a specific point in the topic's partition. It also accepts one of two special offset values:
- -2 tells KafkaSource to start loading at the earliest available message in the topic's partition. This value is useful when you want to load as many messages as you can from the Kafka topic's partition.
- -3 tells KafkaSource to start loading from the consumer group's saved offset. If the consumer group does not have a saved offset, it starts loading from the earliest available message in the topic partition. See Monitoring Vertica Message Consumption with Consumer Groups for more information.
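The following sketch (table, topic, and broker names are hypothetical) uses -3 to resume loading from wherever the named consumer group last left off:

```sql
=> COPY web_test SOURCE KafkaSource(
       stream='web_hits|0|-3',          -- -3: resume from the consumer group's saved offset
       brokers='kafka01.example.com:9092',
       stop_on_eof=true,
       group_id='vertica-web-loaders')  -- group whose saved offset is read
   PARSER KafkaJSONParser();
```

If the group vertica-web-loaders has no saved offset, this load starts from the earliest available message instead.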
Loading from Multiple Topics in the Same Stream Parameter
You can load from multiple Kafka topics in a single stream parameter as long as you follow these guidelines:
- The data for the topics must be in the same format because you pass the data from KafkaSource to a single parser. For example, you cannot load data from one topic that is in Avro format and another in JSON format.
- Similarly, use caution if you are loading Avro data and specifying an external schema from a registry. The Avro parser accepts a single schema per data load, so if the data from the separate topics has different schemas, all of the data from one of the topics will be rejected.
- The data in the different topics should have the same (or very similar) schemas, especially if you are loading data into a traditional Vertica table. While you can load data with different schemas into a flex table, there are only a few scenarios where it makes sense to combine dissimilar data into a single table.
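For example, a single stream parameter can name several topic partitions, each with its own offsets. This sketch (hypothetical table, topics, and broker) loads two single-partition topics that share the same JSON format:

```sql
=> COPY web_test SOURCE KafkaSource(
       -- two topics in one stream parameter: web_hits and web_clicks,
       -- each reading partition 0 from the earliest message
       stream='web_hits|0|-2,web_clicks|0|-2',
       brokers='kafka01.example.com:9092',
       stop_on_eof=true)
   PARSER KafkaJSONParser();
```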
Duration Note
The duration parameter applies to the length of time that Vertica allows the KafkaSource function to run. It usually reflects the amount of time the overall load statement takes. However, if KafkaSource is loading a large volume of data or the data needs extensive processing and parsing, the overall runtime of the query can exceed the amount of time specified in duration.
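A time-bounded load can be sketched as follows (table, topic, and broker names are hypothetical); KafkaSource stops reading from Kafka once the interval elapses, although parsing any already-read data may run longer:

```sql
=> COPY web_test SOURCE KafkaSource(
       stream='web_hits|0|-2',
       brokers='kafka01.example.com:9092',
       duration=interval '30 seconds')  -- stop consuming after 30 seconds
   PARSER KafkaJSONParser();
```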
Example
The following example demonstrates calling KafkaSource to load data from Kafka into an existing flex table named web_test with the following options:
- The data is loaded from a topic named web_hits, which has a single partition.
- The load starts at the earliest message in the stream (identified by passing -2 as the start offset).
- The load ends when it reaches the message with offset 10000.
- The Kafka cluster's brokers are kafka01 and kafka03 in the example.com domain.
- The brokers are listening on port 6667. (This is the port number that the Hortonworks Hadoop distribution assigns to its Kafka brokers.)
- The load ends if it reaches the end of the stream before reaching the message with offset 10000. If you do not supply this option, the connector waits until Kafka sends a message with offset 10000.
- The loaded data is sent to the KafkaJSONParser for processing.
```sql
=> COPY web_test
     SOURCE KafkaSource(stream='web_hits|0|-2|10000',
                        brokers='kafka01.example.com:6667,kafka03.example.com:6667',
                        stop_on_eof=true)
     PARSER KafkaJSONParser();
 Rows Loaded
-------------
        1068
(1 row)
```