Using a Schema Registry with Kafka

Vertica supports the use of a Confluent schema registry for Avro schemas with the KafkaAvroParser. A schema registry lets the Avro parser retrieve schemas stored in the registry and use them to parse and decode messages that were serialized with those schemas. A schema registry also lets Vertica process streamed data without a copy of the schema being sent with each record. Vertica can look up a schema in the registry in either of the following ways:

  • schema ID
  • subject and version
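The two lookup modes correspond to two resources in the Confluent Schema Registry REST API: a schema fetched by its global ID, and a specific version of a subject. The following Python sketch only builds those lookup URLs. The helper names are hypothetical, and the sketch does not show how Vertica itself issues these requests.

```python
from urllib.parse import quote

# Hypothetical helpers; the paths follow the Confluent Schema Registry REST API.


def schema_by_id_url(registry_url: str, schema_id: int) -> str:
    """URL of the schema registered under a global schema ID."""
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"


def schema_by_subject_version_url(registry_url: str, subject: str, version) -> str:
    """URL of one version of a subject (the registry also accepts 'latest')."""
    return (f"{registry_url.rstrip('/')}/subjects/"
            f"{quote(subject, safe='')}/versions/{version}")


print(schema_by_id_url("http://localhost:8081/", 1))
# http://localhost:8081/schemas/ids/1
print(schema_by_subject_version_url("http://repository:8081/schema-repo/",
                                    "simpleEvolution-value", 1))
# http://repository:8081/schema-repo/subjects/simpleEvolution-value/versions/1
```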

If you set a compatibility level through the schema registry's config resource, use BACKWARD or a stricter setting. For more information on installing and configuring a schema registry, refer to the Confluent documentation.
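As a sketch of what that configuration looks like, the following Python code builds, but does not send, a PUT request to the registry's `/config` resource (or `/config/<subject>` for a single subject). It assumes that "BACKWARD or stricter" means a level that keeps the backward-compatibility guarantee; the helper name and that reading of the levels are assumptions, so confirm the level semantics in the Confluent documentation.

```python
import json
import urllib.request
from typing import Optional

# Assumption: these levels all preserve backward compatibility.
BACKWARD_OR_STRICTER = {"BACKWARD", "BACKWARD_TRANSITIVE", "FULL", "FULL_TRANSITIVE"}


def build_compatibility_request(registry_url: str, level: str = "BACKWARD",
                                subject: Optional[str] = None) -> urllib.request.Request:
    """Hypothetical helper: build a PUT to the global or per-subject config."""
    level = level.upper()
    if level not in BACKWARD_OR_STRICTER:
        raise ValueError(f"{level} is weaker than BACKWARD")
    path = "/config" if subject is None else f"/config/{subject}"
    return urllib.request.Request(
        registry_url.rstrip("/") + path,
        data=json.dumps({"compatibility": level}).encode("utf-8"),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="PUT",
    )


req = build_compatibility_request("http://localhost:8081/")
print(req.get_method(), req.full_url)  # PUT http://localhost:8081/config
# urllib.request.urlopen(req) would send it to a running registry.
```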

Schema ID Loading

In schema ID based loading, the Avro parser reads the schema ID associated with each message to identify the correct schema to use, so a single COPY statement can reference multiple schemas. Because the messages are not validated against a single schema, Vertica recommends that you use a flex table as the target table for schema ID based loading.

The following example shows a COPY statement that refers to a schema registry located on the same host.

=> COPY logs SOURCE KafkaSource(stream='simple|0|0', stop_on_eof=true,
       duration=interval '10 seconds')
   PARSER KafkaAvroParser(schema_registry_url='http://localhost:8081/');
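To illustrate where the per-message schema ID comes from, the following Python sketch splits a message value framed in Confluent's wire format: a magic byte of 0, a 4-byte big-endian schema ID, and then the Avro-encoded record with no schema attached. It assumes the producers used Confluent's Avro serializer; the function name is hypothetical and the sketch is not Vertica's parser.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_message(value: bytes) -> Tuple[int, bytes]:
    """Hypothetical helper: return (schema_id, avro_payload) for one message."""
    if len(value) < 5 or value[0] != MAGIC_BYTE:
        raise ValueError("not in Confluent wire format")
    (schema_id,) = struct.unpack(">I", value[1:5])  # 4-byte big-endian ID
    return schema_id, value[5:]


# Two messages in one batch that reference different schemas (IDs 1 and 7);
# the trailing bytes stand in for Avro payloads.
batch = [b"\x00\x00\x00\x00\x01\x02a", b"\x00\x00\x00\x00\x07\x02b"]
print([split_confluent_message(m)[0] for m in batch])  # [1, 7]
```

Because each message names its own schema, one load can mix records written with different schemas, which is why the target table in this mode should tolerate varying columns.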

Subject and Version Loading

In subject and version loading, you specify a subject and version in addition to the schema registry URL. Together, the subject and version identify a single schema that is used for all messages in the COPY. If any message in the load is incompatible with that schema, the COPY fails. Because all messages are validated before loading, Vertica recommends that you use a standard Vertica table as the target for subject and version loading.

The following example shows a COPY statement that specifies a schema registry, a schema subject, and a schema version.

=> COPY t SOURCE KafkaSource(stream='simpleEvolution|0|0',
       stop_on_eof=true, duration=interval '10 seconds')
   PARSER KafkaAvroParser(schema_registry_url='http://repository:8081/schema-repo/',
       schema_registry_subject='simpleEvolution-value', schema_registry_version='1')
   REJECTED DATA AS TABLE "t_rejects";
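In this example the subject name is the topic name from the stream argument plus `-value`, which matches Confluent's default subject naming strategy (TopicNameStrategy) for message values. The following Python sketch derives that default subject from a stream argument of the form `topic|partition|offset`. The helper names are hypothetical, and registries configured with a different naming strategy use other subject names.

```python
from typing import Tuple


def parse_stream(stream: str) -> Tuple[str, int, int]:
    """Hypothetical helper: split one 'topic|partition|offset' entry.

    Any fields after the third are ignored.
    """
    topic, partition, offset = stream.split("|")[:3]
    return topic, int(partition), int(offset)


def default_value_subject(topic: str) -> str:
    """Subject name under Confluent's default TopicNameStrategy for values."""
    return f"{topic}-value"


topic, partition, offset = parse_stream("simpleEvolution|0|0")
print(default_value_subject(topic))  # simpleEvolution-value
```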