KafkaAvroParser

The KafkaAvroParser parses Avro-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.

KafkaAvroParser(
                [enforce_length=Boolean]
                [, reject_on_materialized_type_error=Boolean]
                [, flatten_maps=Boolean]
                [, flatten_arrays=Boolean]
                [, flatten_records=Boolean]
                [, external_schema=JSON_string]
                [, codec='default'|'snappy'|'null']
                [, with_metadata=Boolean]
                [, schema_registry_url='url']
                [, schema_registry_subject='subject_name']
                [, schema_registry_version='version_number']
                [, key_separator='separator'])
Parameter Description
enforce_length When set to TRUE, rejects the row if any value is too wide to fit into its column. When using the default setting (FALSE), the parser truncates any value that is too wide to fit within the column's maximum width.
reject_on_materialized_type_error When set to TRUE, rejects the row if it contains a materialized column value that cannot be mapped into the materialized column's data type.
flatten_maps If set to TRUE, flattens all Avro maps.
flatten_arrays If set to TRUE, flattens Avro arrays.
flatten_records If set to TRUE, flattens all Avro records.
external_schema Specifies the schema of the Avro file as a JSON string. If this parameter is not specified, the parser assumes that each message embeds its own schema. If you are using a schema registry, do not use this parameter.
codec

Specifies the codec in which the Avro file was written. Valid values are:

  • 'default' - Avro's default codec
  • 'snappy' - snappy compression
  • 'null' - data is not compressed and codec is not needed

This option is mainly provided for backwards compatibility. You usually have Kafka compress data at the message level, and have KafkaSource decompress the message for you.

with_metadata If set to TRUE, messages include the Avro datum, schema, and object metadata. By default (FALSE), the KafkaAvroParser parses messages without including schema and metadata. If you enable this parameter, write your messages using the Avro API and confirm they contain only Avro datum.
schema_registry_url Specifies the URL of the Confluent schema registry. This parameter is required to load data based on a schema registry version (see the sketch after this table). If you are using an external schema, do not use this parameter. For more information, refer to Using a Schema Registry with Kafka.
schema_registry_subject In the schema registry, the subject of the schema to use for data loading.
schema_registry_version In the schema registry, the version of the schema to use for data loading.
key_separator Sets the character to use as the separator between keys (for example, in key names generated when the parser flattens nested structures).
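
As a sketch of how the schema registry parameters fit together, the following statement loads the same kind of topic while fetching its schema from a Confluent schema registry instead of embedding it in the statement. The registry host, port, subject name, and version shown here are assumptions for illustration, not values from a real environment:

=> COPY weather_logs
   SOURCE KafkaSource(stream='temperature|0|0', stop_on_eof=true,
                      duration=interval '10 seconds')
   PARSER KafkaAvroParser(
       -- Hypothetical registry endpoint; substitute your registry's host and port.
       schema_registry_url='http://registry.example.com:8081/subjects/',
       -- Hypothetical subject and version for the temperature topic's value schema.
       schema_registry_subject='temperature-value',
       schema_registry_version='1');

Because the schema comes from the registry, external_schema is omitted.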

See Loading Avro Data for more information.

The following example demonstrates loading Avro-formatted data from Kafka. The statement:

  • Loads data into an existing flex table named weather_logs.
  • Copies data from the default Kafka broker (running on the local system on port 9092).
  • Reads from a source named temperature that has a single partition.
  • Starts the load from offset 0.
  • Ends the load either after 10 seconds or when it reaches the end of the source, whichever occurs first.
  • The KafkaAvroParser does not flatten any arrays, maps, or records it finds in the source.
  • The schema for the data is provided in the statement as a JSON string. It defines a record type named Weather that contains fields for a station name, a time, and a temperature.
  • Rejected rows of data are saved to a table named t_rejects1.
=> COPY weather_logs 
   SOURCE KafkaSource(stream='temperature|0|0', stop_on_eof=true, 
                      duration=interval '10 seconds') 
   PARSER KafkaAvroParser(flatten_arrays=False, flatten_maps=False, flatten_records=False,
                          external_schema=E'{"type":"record","name":"Weather","fields":'
                                           '[{"name":"station","type":"string"},'
                                           '{"name":"time","type":"long"},'
                                           '{"name":"temp","type":"int"}]}') 
   REJECTED DATA AS TABLE "t_rejects1";
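
After the load completes, you can check what arrived. This is a minimal verification sketch using Vertica's flex table helper function; the view name weather_logs_view follows Vertica's default <table>_view naming:

=> SELECT compute_flextable_keys_and_build_view('weather_logs');
=> SELECT station, "time", temp FROM weather_logs_view;

Any rows the parser rejected are available in the t_rejects1 table.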