Manually Copying Data From Kafka

You can manually stream data from Kafka into Vertica using a COPY statement. This technique is similar to copying data from other sources, such as the local file system, a client system, or Apache Hadoop. See Getting Data into Vertica and the COPY statement reference for more information about using the COPY statement.

Manually copying data from Kafka is useful when you:

  • Want to load data on a specific schedule. For example, suppose the data you are loading is sent out at a specific time as a daily report. In that case, you do not need the overhead of a scheduler that runs constantly. Instead, schedule a statement to run each day to load the data into Vertica.
  • Have a specific set of messages that you want to analyze. You can choose to load a subset of the data in a Kafka stream.
  • Want to explore the data in a Kafka stream before setting up a scheduler to continuously stream the data into Vertica.
  • Want greater control over the data load than a scheduler provides. For example, suppose you want to perform business logic or custom rejection handling during the data load from Kafka. The scheduler does not support performing additional processing during its transactions. Instead, you can periodically run a transaction that executes a COPY statement to load data from Kafka and then performs additional processing.

Unlike other copy methods, a COPY statement that reads from Kafka usually loads data for a set period of time rather than loading all of the data from a file or other source. For example, you can choose to COPY all of the messages sent to a Kafka topic for one minute. Vertica copies just the data streamed during that one-minute period. When the duration elapses, the COPY statement ends and Vertica does not load any further data.
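
For example, a minimal sketch of a one-minute load might look like the following. The table name public.web_hits, the topic name web_hits, and the broker address are hypothetical placeholders:

=> COPY public.web_hits
   SOURCE KafkaSource(stream='web_hits|0|-2',
                      brokers='kafka01.example.com:9092',
                      duration=interval '1 minute')
   PARSER KafkaJSONParser();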

The duration you set for your data load is not exact. The duration actually controls how long the KafkaSource process runs. The actual COPY statement may run for a shorter period of time.

If you start a long-duration COPY statement from Kafka and need to stop it, you can call one of the functions that closes its session, such as CLOSE_ALL_SESSIONS.
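
For example, a minimal sketch run from a separate connection. Note that this call closes every active session, not just the one running the COPY from Kafka:

=> SELECT CLOSE_ALL_SESSIONS();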

You can also choose specific ranges of messages to load from a topic using offsets. Kafka stores a backlog of messages for each topic; how long it stores these messages is configured by the Kafka administrator. Copying using offsets lets you load this previously sent data. You can even choose to load all of the messages that Kafka has stored for a topic, as well as any new messages sent during the load's duration.
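
For example, a hedged sketch that loads a fixed range of previously sent messages, assuming the stream parameter accepts an optional ending offset as a fourth field. The table, topic, and offset values 100 and 10000 are hypothetical, reusing the names from the earlier sketch:

=> COPY public.web_hits
   SOURCE KafkaSource(stream='web_hits|0|100|10000',
                      brokers='kafka01.example.com:9092',
                      duration=interval '10000 milliseconds')
   PARSER KafkaJSONParser();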

When copying data from Kafka, the source of your COPY statement is always KafkaSource. Your COPY statement usually uses one of three parsers: KafkaParser, KafkaJSONParser, or KafkaAvroParser. The other parsers (such as the Flex FCSVPARSER) are not directly compatible with the output of KafkaSource. You must alter KafkaSource's output using filters before other parsers can process the data. See Parsing Custom Formats for more information.
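
For example, a hedged sketch that inserts newline delimiters between messages so that the Flex FCSVPARSER can parse CSV-formatted messages. The flex table public.csv_events and the topic csv_stream are hypothetical, and the KafkaInsertDelimiters filter name is an assumption based on the filters described in Parsing Custom Formats:

=> COPY public.csv_events
   SOURCE KafkaSource(stream='csv_stream|0|-2',
                      brokers='kafka01.example.com:9092',
                      duration=interval '10000 milliseconds')
   FILTER KafkaInsertDelimiters(delimiter = E'\n')
   PARSER FCSVPARSER();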

This example demonstrates copying data from Kafka into a Vertica table:

=> COPY public.from_kafka 
   SOURCE KafkaSource(stream='iot_data|0|-2,iot_data|1|-2',
                      brokers='kafka01.example.com:9092',
                      duration=interval '10000 milliseconds') 
   PARSER  KafkaAvroParser(schema_registry_url='http://localhost:8081/subjects/iot_data-value/versions/1')
   REJECTED DATA AS TABLE public.rejections 
   NO COMMIT;

In the previous example:

  • The data is copied into a table named from_kafka in the public schema.
  • The KafkaSource streams data from partitions 0 and 1 of the topic named iot_data.
  • Streaming starts from the earliest available message in each partition, as indicated by the -2 in the offset portion of the stream parameter. This special offset value tells the load to start from the earliest possible message.
  • KafkaSource reads the data from the Kafka broker running on the host named kafka01.example.com on port 9092.
  • COPY loads data for 10000 milliseconds (10 seconds). If it loads all of the existing data in the topic before that time is up, it waits for any new streaming data to arrive until the full 10 seconds have elapsed.
  • The stream is parsed as Avro data.
  • The schema that the Avro parser uses to parse the data is retrieved from a schema registry running on the local system.
  • Rejected data is saved in a table named public.rejections.
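
Because the example ends with NO COMMIT, Vertica does not automatically commit the transaction when the COPY finishes, so you can review the load and any rejected messages before making the data permanent. A minimal follow-up sketch:

=> SELECT * FROM public.rejections;  -- inspect any rejected messages
=> COMMIT;                           -- make the loaded data permanent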

For more information, see the KafkaSource reference topic.

If you are copying data containing default values into a flex table, you must identify the default value column as __raw__. For more information, see Bulk Loading Data into Flex Tables.