Using COPY with Data Streaming

You can manually stream data from Kafka into Vertica using a COPY statement. This streaming happens only for a limited time; when the COPY statement finishes, no additional messages are streamed from Kafka to Vertica. Manually copying data this way is useful when you have a specific set of messages in Kafka that you want to analyze. It is also useful when you want to explore the data in a Kafka stream before setting up a scheduler to continuously stream the data into Vertica.

The source of your COPY statement is always KafkaSource. Your COPY statement usually uses one of three parsers: KafkaParser, KafkaJSONParser, or KafkaAvroParser.

For example:

=> COPY public.from_kafka 
   SOURCE KafkaSource(stream='iot_data|0|-2,iot_data|1|-2',
                      brokers='kafka01.example.com:9092',
                      duration=interval '10000 milliseconds',
                      executionparallelism='1',				
                      stop_on_eof=true) 
   PARSER  KafkaAVROParser(schema_registry_url='http://localhost:8081/subjects/iot_data-value/versions/1')
   REJECTED DATA AS TABLE public.rejections 
   DIRECT NO COMMIT;

In the previous example, the COPY statement:

  • Loads messages from partitions 0 and 1 of the topic iot_data, starting with the earliest available message in each partition (the -2 offset).
  • Connects to the Kafka broker running on kafka01.example.com, port 9092.
  • Stops loading after 10000 milliseconds (10 seconds) or when it reaches the end of the stream, whichever happens first.
  • Uses a single thread to process the load (executionparallelism='1').
  • Uses the KafkaAvroParser with version 1 of the iot_data-value schema from a Confluent schema registry to decode the Avro-encoded messages.
  • Saves rejected rows to the table public.rejections.
  • Loads the data directly to disk (DIRECT) without automatically committing the transaction (NO COMMIT).

Note: If you are copying data containing default values into a flex table, you must identify the default value column as __raw__. For more information, refer to "Handling Default Values During Loading" in Bulk Loading Data into Flex Tables.

The following sections explain the parameters you can pass to the KafkaSource and its associated parsers.

KafkaSource

The KafkaSource UDL accesses data from a Kafka cluster. All Kafka parsers must use KafkaSource. Messages processed by KafkaSource must be at least one byte in length. KafkaSource writes an error message to vertica.log for zero length messages.

The syntax for calling KafkaSource is:

KafkaSource(
        stream='topic_name|partition|start_offset[|end_offset][,...]'
        [, brokers='host:port[,...]']
        [, duration=interval]
        [, executionparallelism='value']
        [, stop_on_eof=Boolean]
        [, eof_timeout=timeout])

 

Parameter Description
stream

Required. Defines the data to be loaded as a comma-separated list of one or more partitions. Each partition is defined by three required values and one optional value, separated by pipe characters (|):

  • topic_name: the name of the Kafka topic to load data from. You can read from different Kafka topics in the same stream parameter, with some limitations. See Loading from Multiple Topics in the Same Stream Parameter below for more information.
  • partition: the partition in the Kafka topic to copy.
  • start_offset: the offset in the Kafka topic where the load will begin. This offset is inclusive (the message with the offset start_offset is loaded).
  • end_offset: the optional offset where the load should end. This offset is exclusive (the message with the offset end_offset will not be loaded).
    To end a load using end_offset, you must supply an ending offset value for all partitions in the stream parameter. Setting an ending offset for some partitions but not others results in an error.
    If you do not specify an ending offset, you must supply at least one other ending condition using stop_on_eof or duration.
brokers

A comma-separated list of host:port pairs of the brokers in the Kafka cluster. Vertica recommends running Kafka on a different machine than Vertica.

Default Value: localhost:9092

duration

An INTERVAL that specifies the duration of the frame. After this specified amount of time, KafkaSource terminates the COPY statement. If this parameter is not set, you must set at least one other ending condition, either by using stop_on_eof or by specifying an ending offset. See Duration Note below for more information.

executionparallelism

An integer value between 1 and the number of partitions the node is loading from. Setting this parameter to a reduced value limits the number of threads used to process any COPY statement. It also increases the throughput of short queries issued in the pool, especially if the queries are executed concurrently.

If you do not specify this parameter, Vertica automatically creates a thread for every partition.

If the value you specify for the KafkaSource is lower than the value specified for the scheduler resource pool, the KafkaSource value applies. This value cannot exceed the value specified for the scheduler resource pool. For an example of setting this parameter, see the sketch following this table.

stop_on_eof

Determines whether KafkaSource should terminate the COPY statement when it reaches the end of the data stream. If this value is not set, you must set at least one other ending condition using duration or by supplying ending offsets instead.

Default Value: FALSE

eof_timeout

A timeslice of type INTERVAL. If a COPY command does not receive any Kafka messages within the eof_timeout interval, Vertica responds by ending that COPY statement. This parameter applies only if stop_on_eof is TRUE.
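
For example, the following sketch limits KafkaSource to two processing threads while reading from four partitions of a topic. The table name web_test, the topic name web_hits, the broker address, and the assumption that the topic has four partitions are placeholders for this illustration:

=> COPY web_test 
   SOURCE KafkaSource(stream='web_hits|0|-2,web_hits|1|-2,web_hits|2|-2,web_hits|3|-2',
                      brokers='kafka01.example.com:9092',
                      executionparallelism='2',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();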

Loading from Multiple Topics in the Same Stream Parameter

You can load from multiple Kafka topics in a single stream parameter as long as you follow these guidelines:

  • The data in all of the topics must be in the same format (for example, all JSON or all Avro), because the entire stream is passed to a single parser.
  • The data from all of the topics must be compatible with the target table, because all of the messages are loaded into the same table.
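
For example, the following is a minimal sketch that reads from partition 0 of two JSON-formatted topics in one stream parameter. The table name web_test, the topic names web_hits and web_errors, and the broker address are placeholders:

=> COPY web_test 
   SOURCE KafkaSource(stream='web_hits|0|-2,web_errors|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();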

Duration Note

The duration parameter applies to the length of time that Vertica allows the KafkaSource function to run. It usually reflects the amount of time the overall load statement takes. However, if KafkaSource is loading a large volume of data or the data needs extensive processing and parsing, the overall runtime of the query can exceed the amount of time specified in duration.

Example

The following example demonstrates calling KafkaSource to load data from Kafka into an existing flex table named web_test with the following options:

  • Load data from partition 0 of the topic web_hits, starting with the earliest available message (the -2 offset) and ending before offset 10000.
  • Connect to the brokers kafka01.example.com and kafka03.example.com on port 6667.
  • Stop loading when it reaches the end of the stream (stop_on_eof=true).
  • Parse the messages as JSON using the KafkaJSONParser with its default settings.

=> COPY web_test 
   SOURCE KafkaSource(stream='web_hits|0|-2|10000',
                      brokers='kafka01.example.com:6667,kafka03.example.com:6667', 
                      stop_on_eof=true) 
   PARSER KafkaJSONParser();
 Rows Loaded
-------------
        1068
(1 row)

KafkaOffsets

The KafkaOffsets user-defined transform function returns load operation statistics generated by the most recent invocation of KafkaSource. Query KafkaOffsets to see the metadata produced by your most recent load operation. You can query KafkaOffsets after each KafkaSource invocation to view information about that load. If you are using the scheduler, you can also view historical load information in the stream_microbatch_history table.

For each load operation, KafkaOffsets returns one row per partition containing the partition read from, the starting and ending offsets, the number of messages (rows) loaded, and the reason the load ended.

The following example demonstrates calling KafkaOffsets to show partition information for the table named web_test, which was loaded using KafkaSource in the previous example.

=> SELECT kpartition, start_offset, end_offset, msg_count, ending 
   FROM (SELECT KafkaOffsets() OVER() FROM web_test) AS stats ORDER BY kpartition;
 kpartition | start_offset | end_offset | msg_count |   ending
------------+--------------+------------+-----------+------------
          0 |           -2 |       9999 |      1068 | END_OFFSET
(1 row)

The output shows that KafkaSource loaded 1068 messages (rows) from Kafka in a single partition. The KafkaSource ended the data load because it reached the ending offset.

Note: The values shown in the start_offset column are exclusive (the message with the shown offset was not loaded), while the values in the end_offset column are inclusive (the message with the shown offset was loaded). This is the opposite of the values specified in the KafkaSource's stream parameter. The difference between the inclusiveness of KafkaSource's and KafkaOffsets's start and end offsets is based on the needs of the job scheduler. KafkaOffsets is primarily intended for the job scheduler's use, so the start and end offset values are defined so that the scheduler can easily resume streaming from where it left off.

KafkaParser

The KafkaParser does not take any parameters. The parser loads the raw data bytes directly from Kafka into a regular Vertica table. You can use this parser as a catch-all for formats that the other Kafka parsers do not support.
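
For example, the following is a minimal sketch that captures each message's raw bytes in a single-column table for later processing. The table definition, the topic name raw_feed, and the broker address are placeholders, and the single LONG VARBINARY column is one assumption about how you might stage unparsed messages:

=> CREATE TABLE raw_kafka_data(message LONG VARBINARY);
=> COPY raw_kafka_data 
   SOURCE KafkaSource(stream='raw_feed|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaParser();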

KafkaJSONParser

The KafkaJSONParser parses JSON-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.

The syntax for calling the parser is:

KafkaJSONParser(
        [flatten_maps=Boolean]
        [, flatten_arrays=Boolean]
        [, start_point=string]
        [, omit_empty_keys=Boolean]
        [, reject_on_duplicate=Boolean]
        [, reject_on_materialized_type_error=Boolean]
        [, reject_on_empty_key=Boolean])
Parameter Description
flatten_maps If set to TRUE, flattens all JSON maps.
flatten_arrays If set to TRUE, flattens all JSON arrays.
start_point

Specifies the name of a key in the JSON load data at which to begin parsing. The parser ignores all data before the value of the start_point key. The parser processes the data after the first instance of the key, up to the second instance, and ignores any remaining data.

omit_empty_keys If set to TRUE, omits any key from the load data that does not have a value set.
reject_on_duplicate If set to TRUE, rejects data that contains duplicate key names.
reject_on_materialized_type_error If set to TRUE, rejects the data row if it includes a key that matches an existing materialized column but whose value cannot be coerced into that column's data type.
reject_on_empty_key If set to TRUE, rejects any row containing a key without a value.

See Loading JSON Data for more information.

The following example demonstrates loading JSON data from Kafka. The parameters in the statement direct the load to:

  • Load data from partition 0 of the topic server_log, starting at offset 0.
  • Stop loading when it reaches the end of the stream or after 10 seconds, whichever happens first.
  • Connect to the broker kafka01 on port 9092.
  • Flatten JSON arrays and maps in the loaded data.

=> COPY logs SOURCE KafkaSource(stream='server_log|0|0', 
                                stop_on_eof=true, 
                                duration=interval '10 seconds',
                                brokers='kafka01:9092')
   PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True);

KafkaAVROParser

The KafkaAVROParser parses Avro-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.

The syntax for calling the parser is:

KafkaAVROParser(
                [reject_on_materialized_type_error=Boolean]
                [, flatten_maps=Boolean]
                [, flatten_arrays=Boolean]
                [, flatten_records=Boolean]
                [, external_schema=JSON_string]
                [, codec='default'|'snappy'|'null']
                [, with_metadata=Boolean]
                [, schema_registry_url='url']
                [, schema_registry_subject='subject_name']
                [, schema_registry_version='version_number'])
Parameter Description
reject_on_materialized_type_error When set to TRUE, rejects the data row if it contains a materialized column value that cannot be mapped into the materialized column's data type.
flatten_maps If set to TRUE, flattens all Avro maps.
flatten_arrays If set to TRUE, flattens Avro arrays.
flatten_records If set to TRUE, flattens all Avro records.
external_schema Specifies the schema of the Avro file as a JSON string. If this parameter is not specified, the parser assumes that each message includes its own schema. If you are using a schema registry, do not use this parameter.
codec

Specifies the codec in which the Avro file was written. Valid values are:

  • 'default' - Avro's default
  • 'snappy' - snappy compression
  • 'null' - data is not compressed and codec is not needed
with_metadata If set to TRUE, messages include Avro datum, schema, and object metadata. By default, the KafkaAvroParser parses messages without including schema and metadata. If you enable this parameter, write your messages using the Avro API and confirm they contain only Avro datum. The default value is FALSE.
schema_registry_url Specifies the URL of the Confluent schema registry. This parameter is required to load data based on a schema registry version. If you are using an external schema, do not use this parameter. For more information, refer to Using a Schema Registry with Kafka.
schema_registry_subject In the schema registry, the subject of the schema to use for data loading.
schema_registry_version In the schema registry, the version of the schema to use for data loading.

See Loading Avro Data for more information.

The following example demonstrates loading Avro-formatted data from Kafka. The statement:

  • Loads data from partition 0 of the topic temperature, starting at offset 0.
  • Stops loading when it reaches the end of the stream or after 10 seconds, whichever happens first.
  • Does not flatten arrays, maps, or records.
  • Supplies the schema for the Avro data in the external_schema parameter instead of reading it from each message or from a schema registry.
  • Saves rejected data to a table named t_rejects1.

=> COPY weather_logs 
   SOURCE KafkaSource(stream='temperature|0|0', stop_on_eof=true, 
                      duration=interval '10 seconds') 
   PARSER KafkaAvroParser(flatten_arrays=False, flatten_maps=False, flatten_records=False,
                          external_schema=E'{"type":"record","name":"Weather","fields":'
                                           '[{"name":"station","type":"string"},'
                                           '{"name":"time","type":"long"},'
                                           '{"name":"temp","type":"int"}]}') 
   REJECTED DATA AS TABLE "t_rejects1";
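
If your messages are registered in a Confluent schema registry, you can identify the schema with the schema_registry_subject and schema_registry_version parameters instead of embedding the subject and version in the URL, as the first example in this section does. The following is a minimal sketch of that approach; it reuses the placeholder table, topic, registry address, and subject names from that example:

=> COPY public.from_kafka 
   SOURCE KafkaSource(stream='iot_data|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaAvroParser(schema_registry_url='http://localhost:8081',
                          schema_registry_subject='iot_data-value',
                          schema_registry_version='1');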