Using COPY with Data Streaming
You can manually stream data from Kafka into Vertica using a COPY statement. This streaming happens only for a limited time; when the COPY statement finishes, no additional messages are streamed from Kafka to Vertica. Manually copying data this way is useful when you have a specific set of messages in Kafka that you want to analyze. It is also useful when you want to explore the data in a Kafka stream before setting up a scheduler to continuously stream the data into Vertica.
The source of your COPY statement is always KafkaSource. Your COPY statement usually uses one of three parsers: KafkaParser, KafkaJSONParser, or KafkaAvroParser.
For example:
=> COPY public.from_kafka
   SOURCE KafkaSource(stream='iot_data|0|-2,iot_data|1|-2',
                      brokers='kafka01.example.com:9092',
                      duration=interval '10000 milliseconds',
                      executionparallelism='1',
                      stop_on_eof=true)
   PARSER KafkaAvroParser(schema_registry_url='http://localhost:8081/subjects/iot_data-value/versions/1')
   REJECTED DATA AS TABLE public.rejections
   DIRECT NO COMMIT;
In the previous example:
- The KafkaSource streams data from partitions 0 and 1 of the topic named iot_data.
- The streaming starts from the earliest available message in the stream (set by the -2 in the stream parameter).
- KafkaSource reads the data from the Kafka broker running on the host named kafka01.example.com on port 9092.
- The streaming continues until either 10000 milliseconds (10 seconds) pass or KafkaSource reaches the end of the stream.
- KafkaSource is limited to using a single thread to load data from the two partitions.
- The stream is parsed as Avro data.
- The schema that the Avro parser uses to parse the data is retrieved from a schema registry running on the local system.
- Rejected data is saved in a table named public.rejections.
Note: If you are copying data containing default values into a flex table, you must identify the default value column as __raw__. For more information, refer to "Handling Default Values During Loading" in Bulk Loading Data into Flex Tables.
The following sections explain the parameters you can pass to the KafkaSource and its associated parsers.
KafkaSource
The KafkaSource UDL accesses data from a Kafka cluster. All Kafka parsers must use KafkaSource. Messages processed by KafkaSource must be at least one byte in length. KafkaSource writes an error message to vertica.log when it encounters a zero-length message.
The syntax for calling KafkaSource is:
KafkaSource(
    stream='topic_name|partition|start_offset[|end_offset][,...]'
    [, brokers='host:port[,...]']
    [, duration=interval]
    [, executionparallelism='value']
    [, stop_on_eof=Boolean]
    [, eof_timeout=timeout])
Parameter | Description |
---|---|
stream | Required. Defines the data to be loaded as a comma-separated list of one or more partitions. Each partition is defined by three required values and one optional value, separated by pipe characters: the topic name, the partition number, the offset at which to start loading, and (optionally) the offset at which to stop loading. |
brokers | A comma-separated list of host:port pairs of the brokers in the Kafka cluster. Vertica recommends running Kafka on a different machine than Vertica. Default Value: localhost:9092 |
duration | An INTERVAL that specifies the duration of the frame. After this specified amount of time, KafkaSource terminates the COPY statement. If this parameter is not set, you must set at least one other ending condition by using stop_on_eof or by specifying an ending offset. See the Duration Note below for more information. |
executionparallelism | An integer value between 1 and the number of partitions the node is loading from. Setting this parameter to a reduced value limits the number of threads used to process any COPY statement. It also increases the throughput of short queries issued in the pool, especially if the queries are executed concurrently. If you do not specify this parameter, Vertica automatically creates a thread for every partition. If the value you specify for KafkaSource is lower than the value specified for the scheduler resource pool, the KafkaSource value applies. This value cannot exceed the value specified for the scheduler resource pool. |
stop_on_eof | Determines whether KafkaSource terminates the COPY statement after it reaches the end of the stream. If this value is not set, you must set at least one other ending condition by using duration or by specifying an ending offset. Default Value: FALSE |
eof_timeout | A timeslice of type INTERVAL. If the COPY statement does not receive any Kafka messages within the eof_timeout interval, Vertica ends that COPY statement. This parameter applies only when stop_on_eof is TRUE. |
Loading from Multiple Topics in the Same Stream Parameter
You can load from multiple Kafka topics in a single stream parameter as long as you follow these guidelines:
- The data for the topics must be in the same format because you pass the data from KafkaSource to a single parser. For example, you cannot load data from one topic that is in Avro format and another in JSON format.
- Similarly, you need to be careful if you are loading Avro data and specifying an external schema from a registry. The Avro parser accepts a single schema per data load. If the data from the separate topics has different schemas, then all of the data from one of the topics is rejected.
- The data in the different topics should have the same (or very similar) schemas, especially if you are loading data into a traditional Vertica table. While you can load data with different schemas into a flex table, there are only a few scenarios where it makes sense to combine dissimilar data into a single table.
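Assuming you follow these guidelines, a load from two topics in a single stream parameter might look like the following sketch. The topic names web_clicks and mobile_clicks, the broker address, and the target table hits are hypothetical placeholders; both topics are assumed to contain JSON data with similar schemas:
=> COPY hits
   SOURCE KafkaSource(stream='web_clicks|0|-2,mobile_clicks|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();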
Duration Note
The duration parameter applies to the length of time that Vertica allows the KafkaSource function to run. It usually reflects the amount of time the overall load statement takes. However, if KafkaSource is loading a large volume of data or the data needs extensive processing and parsing, the overall runtime of the query can exceed the amount of time specified in duration.
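For example, the following sketch uses duration as its only ending condition. The topic sensor_readings, the broker address, and the table public.sensor_data are hypothetical placeholders; KafkaSource streams messages for roughly 30 seconds and then ends the COPY statement, even if more messages remain in the topic:
=> COPY public.sensor_data
   SOURCE KafkaSource(stream='sensor_readings|0|-2',
                      brokers='kafka01.example.com:9092',
                      duration=interval '30 seconds')
   PARSER KafkaJSONParser();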
Example
The following example demonstrates calling KafkaSource to load data from Kafka into an existing flex table named web_test with the following options:
- The stream is named web_hits, which has a single partition.
- The load starts at the earliest message in the stream (identified by passing -2 as the start offset).
- The load ends when it reaches the message with offset 10000.
- The Kafka cluster's brokers are kafka01 and kafka03 in the example.com domain.
- The brokers are listening on port 6667. (This is the port number that the Hortonworks Hadoop distribution assigns to its Kafka brokers.)
- The load ends if it reaches the end of the stream before reaching the message with offset 10000. If you do not supply this option, KafkaSource waits until Kafka sends a message with offset 10000.
- The loaded data is sent to the KafkaJSONParser for processing.
=> COPY web_test
   SOURCE KafkaSource(stream='web_hits|0|-2|10000',
                      brokers='kafka01.example.com:6667,kafka03.example.com:6667',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();
 Rows Loaded
-------------
        1068
(1 row)
KafkaOffsets
The KafkaOffsets user-defined transform function returns load operation statistics generated by the most recent invocation of KafkaSource. Query KafkaOffsets to see the metadata produced by your most recent load operation. You can query KafkaOffsets after each KafkaSource invocation to view information about that load. If you are using the scheduler, you can also view historical load information in the stream_microbatch_history table.
For each load operation, KafkaOffsets returns the following:
- source kafka topic
- source kafka partition
- starting offset
- ending offset
- number of messages loaded
- number of bytes read
- duration of the load operation
- end message
- end reason
The following example demonstrates calling KafkaOffsets to show partition information on the table named web_test that was loaded using KafkaSource.
=> SELECT kpartition, start_offset, end_offset, msg_count, ending
   FROM (SELECT KafkaOffsets() OVER() FROM web_test) AS stats
   ORDER BY kpartition;
 kpartition | start_offset | end_offset | msg_count |   ending
------------+--------------+------------+-----------+------------
          0 |           -2 |       9999 |      1068 | END_OFFSET
The output shows that KafkaSource loaded 1068 messages (rows) from Kafka in a single partition. The KafkaSource ended the data load because it reached the ending offset.
Note: The values shown in the start_offset column are exclusive (the message with the shown offset was not loaded) and the values in the end_offset column are inclusive (the message with the shown offset was loaded). This is the opposite of the values specified in the KafkaSource's stream parameter. The difference between the inclusiveness of KafkaSource's and KafkaOffsets's start and end offsets is based on the needs of the job scheduler. KafkaOffsets is primarily intended for the job scheduler's use, so the start and end offset values are defined so that the scheduler can easily start streaming from where it left off.
KafkaParser
The KafkaParser does not take any parameters. The parser loads the data bytes into a regular Vertica table directly from Kafka. You can use this parser as a catch-all for unsupported formats.
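For example, the following sketch passes raw messages through KafkaParser into a hypothetical table named raw_events; the topic name raw_topic and the broker address are also placeholders, and the table's column layout is assumed to match the raw message bytes:
=> COPY raw_events
   SOURCE KafkaSource(stream='raw_topic|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaParser();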
KafkaJSONParser
The KafkaJSONParser parses JSON-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.
The syntax for calling the parser is:
KafkaJSONParser(
    [flatten_maps=Boolean]
    [, flatten_arrays=Boolean]
    [, start_point='key_name']
    [, omit_empty_keys=Boolean]
    [, reject_on_duplicate=Boolean]
    [, reject_on_materialized_type_error=Boolean]
    [, reject_on_empty_key=Boolean])
Parameter | Description |
---|---|
flatten_maps | If set to TRUE, flattens all JSON maps. |
flatten_arrays | If set to TRUE, flattens JSON arrays. |
start_point | Specifies the name of a key in the JSON load data at which to begin parsing. The parser ignores all data before the value of the start_point key. The parser processes the data after the first instance of the key, and up to the second, ignoring any remaining data. |
omit_empty_keys | If set to TRUE, omits any key from the load data that does not have a value set. |
reject_on_duplicate | If set to TRUE, rejects data that contains duplicate key names. |
reject_on_materialized_type_error | If set to TRUE, rejects the data row if the data includes keys matching an existing materialized column and has a key that cannot be mapped into the materialized column's data type. |
reject_on_empty_key | If set to TRUE, rejects any row containing a key without a value. |
See Loading JSON Data for more information.
The following example demonstrates loading JSON data from Kafka. The parameters in the statement define the load to:
- Load data into the pre-existing table named logs.
- The KafkaSource streams the data from a single partition in the source called server_log.
- The Kafka broker for the data load is running on the host named kafka01 on port 9092.
- KafkaSource stops loading data after either 10 seconds or on reaching the end of the stream, whichever happens first.
- The KafkaJSONParser flattens any arrays or maps in the JSON data.
=> COPY logs
   SOURCE KafkaSource(stream='server_log|0|0',
                      stop_on_eof=true,
                      duration=interval '10 seconds',
                      brokers='kafka01:9092')
   PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True);
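If you only need part of each JSON message, you can combine the other parser parameters described above. The following sketch is a variation of the previous statement that assumes a hypothetical topic named app_log whose messages contain a key named data; the parser starts at that key and omits any keys that have no value:
=> COPY logs
   SOURCE KafkaSource(stream='app_log|0|-2',
                      stop_on_eof=true,
                      duration=interval '10 seconds',
                      brokers='kafka01:9092')
   PARSER KafkaJSONParser(start_point='data', omit_empty_keys=True);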
KafkaAvroParser
The KafkaAvroParser parses Avro-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.
The syntax for calling the parser is:
KafkaAvroParser(
    [reject_on_materialized_type_error=Boolean]
    [, flatten_maps=Boolean]
    [, flatten_arrays=Boolean]
    [, flatten_records=Boolean]
    [, external_schema=JSON_string]
    [, codec='default'|'snappy'|'null']
    [, with_metadata=Boolean]
    [, schema_registry_url='url']
    [, schema_registry_subject='subject_name']
    [, schema_registry_version='version_number'])
Parameter | Description |
---|---|
reject_on_materialized_type_error | If set to TRUE, rejects the data row if it contains a materialized column value that cannot be mapped into the materialized column's data type. |
flatten_maps | If set to TRUE, flattens all Avro maps. |
flatten_arrays | If set to TRUE, flattens Avro arrays. |
flatten_records | If set to TRUE, flattens all Avro records. |
external_schema | Specifies the schema of the Avro file as a JSON string. If this parameter is not specified, the parser assumes that each message carries its own schema. If you are using a schema registry, do not use this parameter. |
codec | Specifies the codec in which the Avro file was written. Valid values are 'default', 'snappy', and 'null'. |
with_metadata | If set to TRUE, messages include the Avro datum, schema, and object metadata. By default, the KafkaAvroParser parses messages without including schema and metadata. If you enable this parameter, write your messages using the Avro API and confirm they contain only Avro datum. Default Value: FALSE |
schema_registry_url | Specifies the URL of the Confluent schema registry. This parameter is required to load data based on a schema registry version. If you are using an external schema, do not use this parameter. For more information, refer to Using a Schema Registry with Kafka. |
schema_registry_subject | The subject, in the schema registry, of the schema to use for data loading. |
schema_registry_version | The version, in the schema registry, of the schema to use for data loading. |
See Loading Avro Data for more information.
The following example demonstrates loading data from Kafka in an Avro format. The statement:
- Loads data into an existing flex table named weather_logs.
- Copies data from the default Kafka broker (running on the local system on port 9092).
- The source is named temperature.
- The source has a single partition.
- The load starts from offset 0.
- The load ends either after 10 seconds or the load reaches the end of the source, whichever occurs first.
- The KafkaAvroParser does not flatten any arrays, maps, or records it finds in the source.
- The schema for the data is provided in the statement as a JSON string. It defines a record type named Weather that contains fields for a station name, a time, and a temperature.
- Rejected rows of data are saved to a table named t_rejects1.
=> COPY weather_logs
   SOURCE KafkaSource(stream='temperature|0|0',
                      stop_on_eof=true,
                      duration=interval '10 seconds')
   PARSER KafkaAvroParser(flatten_arrays=False,
                          flatten_maps=False,
                          flatten_records=False,
                          external_schema=E'{"type":"record","name":"Weather","fields":'
                                           '[{"name":"station","type":"string"},'
                                           '{"name":"time","type":"long"},'
                                           '{"name":"temp","type":"int"}]}')
   REJECTED DATA AS TABLE "t_rejects1";
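As an alternative to embedding the schema in the statement, you could retrieve it from a Confluent schema registry using the subject and version parameters. The following sketch assumes a registry reachable at a hypothetical host named registry.example.com and a subject named temperature-value; the URL form and registry details are assumptions you would adjust for your environment:
=> COPY weather_logs
   SOURCE KafkaSource(stream='temperature|0|0',
                      stop_on_eof=true,
                      duration=interval '10 seconds')
   PARSER KafkaAvroParser(schema_registry_url='http://registry.example.com:8081',
                          schema_registry_subject='temperature-value',
                          schema_registry_version='1')
   REJECTED DATA AS TABLE "t_rejects1";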