Setting up a Scheduler

You set up a scheduler using the Linux command line. Usually you perform the configuration on the host where you want your scheduler to run. It can be one of your Vertica hosts, or a separate host where you have installed the vkconfig utility (see The vkconfig Script for more information).

Follow these steps to set up and start a scheduler to stream data from Kafka to Vertica:

  1. Create a Config File (Optional)
  2. Add the vkconfig Directory to Your Path (Optional)
  3. Create a Resource Pool for Your Scheduler
  4. Create the Scheduler
  5. Create a Cluster
  6. Create a Source
  7. Create a Data Table
  8. Create a Target
  9. Create a Load Spec
  10. Create a Microbatch
  11. Launch the Scheduler

The following sections explain each of these steps, using the example of loading web log data (hits on a web site) from Kafka into a Vertica table.

Create a Config File (Optional)

Many of the arguments you supply to the vkconfig script while creating a scheduler do not change. For example, you often need to pass a username and password to Vertica to authorize the changes to be made in the database. Adding the username and password to each call to vkconfig is tedious and error-prone.

Instead, you can use the --conf option to pass the vkconfig utility a configuration file that specifies these arguments for you, saving you typing and frustration.

The config file is a text file with a keyword=value pair on each line. Each keyword is a vkconfig command-line option, such as the ones listed in Common vkconfig Script Options.

The following example shows a config file named weblog.conf that will be used to define a scheduler named weblog_sched. This config file is used throughout the rest of this example.

# The configuration options for the weblog_sched scheduler.
username=dbadmin
password=mypassword
dbhost=vertica01.example.com
dbport=5433
config-schema=weblog_sched

Add the vkconfig Directory to Your Path (Optional)

The vkconfig script is located in the /opt/vertica/packages/kafka/bin directory. Typing this path for each call to vkconfig is tedious. You can add vkconfig to your search path for your current Linux session using the following command:

$ export PATH=/opt/vertica/packages/kafka/bin:$PATH

For the rest of your session, you can call vkconfig without specifying its entire path:

$ vkconfig
Invalid tool
Valid options are scheduler, cluster, source, target, load-spec, microbatch, sync, launch, 
shutdown, help

If you want to make this setting permanent, add the export statement to your ~/.profile file. The rest of this example assumes that you have added this directory to your shell's search path.
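
For example, you can append the export statement to ~/.profile with a single command (this assumes your login shell reads ~/.profile):

$ echo 'export PATH=/opt/vertica/packages/kafka/bin:$PATH' >> ~/.profile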

Create a Resource Pool for Your Scheduler

Vertica recommends you always create a resource pool specifically for your scheduler. Schedulers assume they have exclusive use of the resource pool they are assigned. Using a separate pool for a scheduler lets you fine-tune its impact on your Vertica cluster's performance. You create resource pools within Vertica using the CREATE RESOURCE POOL statement.

=> CREATE RESOURCE POOL weblog_pool MEMORYSIZE '10%' PLANNEDCONCURRENCY 1 QUEUETIMEOUT 0;

One key setting in a scheduler's resource pool is PLANNEDCONCURRENCY, which tells Vertica how many simultaneous tasks (such as loads) will use the pool. For a scheduler, this value controls the number of simultaneous COPY statements it runs during each microbatch, and therefore how many topics it can load from in parallel. Set this value to at least the number of topics the scheduler is reading data from. Setting it slightly higher than the number of topics is good practice: the extra concurrency reserves resources for the scheduler's own internal processes, preventing them from impacting the data load.

Another important setting is the pool's EXECUTIONPARALLELISM, which controls the number of threads that Vertica uses to load data from the topic. Ideally, this value matches the number of partitions in the topic. Often, however, a topic has too many partitions to assign a thread to each one. In these cases, set EXECUTIONPARALLELISM so that the threads divide the partitions evenly among themselves. For example, if the topic your scheduler is reading from has 100 partitions, you could set EXECUTIONPARALLELISM to 10 so that each thread reads from 10 partitions.
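
For example, a minimal sketch of setting this value on the pool created above, assuming the hypothetical 100-partition topic:

=> ALTER RESOURCE POOL weblog_pool EXECUTIONPARALLELISM 10;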

Finally, set the QUEUETIMEOUT parameter to 0 so that data can load continuously. If the scheduler has to wait for resources, it falls behind its frame schedule. Because you allocate the resource pool exclusively to the scheduler, you do not have to worry about its queries oversubscribing the pool and queuing.

Not allocating enough resources to your schedulers can result in errors. For example, you may get OVERSHOT DEADLINE FOR FRAME errors if the scheduler cannot load data from all of its topics within a frame.

If you do not create and assign a resource pool for your scheduler, it will use the default resource pool named kafka_default_pool.

See Resource Pool Architecture for more information about resource pools.

Create the Scheduler

Vertica includes a default scheduler named stream_config. You can use this scheduler or create a new scheduler using the vkconfig script's scheduler tool with the --create and --config-schema options:

$ vkconfig scheduler --create --config-schema scheduler_name --conf conf_file

The --create and --config-schema options are the only ones required to add a scheduler with default options. This command creates a new schema in Vertica that holds the scheduler's configuration. See What Happens When You Create a Scheduler for details on the creation of the scheduler's schema.

You can use additional configuration parameters to further customize your scheduler. See Scheduler Tool Options for more information.

The following example:

  • Creates a scheduler named weblog_sched using the --config-schema option.
  • Grants the Vertica user named kafka_user privileges to configure and run the scheduler with the --operator option. The dbadmin user must grant additional privileges separately.
  • Specifies a frame duration of thirty seconds with the --frame-duration option.
  • Sets the resource pool that the scheduler uses to the weblog_pool created earlier:
$ vkconfig scheduler --create --config-schema weblog_sched --operator kafka_user \
  --frame-duration '00:00:30' --resource-pool weblog_pool --conf weblog.conf

Technically, the previous example doesn't need to supply the --config-schema argument because it is set in the weblog.conf file. It appears in this example for clarity. There's no harm in supplying it on the command line as well as in the configuration file, as long as the values match. If they do not match, the value given on the command line takes priority.
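
If you want to confirm that the scheduler's configuration schema exists, you can query the Vertica catalog (an optional sanity check, not part of the setup):

=> SELECT schema_name FROM v_catalog.schemata WHERE schema_name = 'weblog_sched';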

Create a Cluster

You must associate at least one Kafka cluster with your scheduler. Schedulers can access more than one Kafka cluster. To create a cluster, you supply a name for the cluster, as well as the host names and ports of the Kafka cluster's brokers.

When validating your settings, the scheduler connects to the cluster and automatically retrieves the list of all brokers in the cluster. Therefore, you do not have to list every single broker in the --hosts parameter.

The following example creates a cluster named kafka_weblog, with two Kafka broker hosts: kafka01 and kafka03 in the example.com domain. The Kafka brokers are running on port 9092.

$ vkconfig cluster --create --cluster kafka_weblog \
  --hosts kafka01.example.com:9092,kafka03.example.com:9092 --conf weblog.conf
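
To verify the settings that were saved, you can use the cluster tool's --read option, which reports the cluster configuration stored in the scheduler's schema:

$ vkconfig cluster --read --conf weblog.conf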

See Cluster Tool Options for more information.

Create a Source

Next, create at least one source for your scheduler to read from. The source defines the Kafka topic the scheduler loads data from, as well as the number of partitions the topic contains.

To create and associate a source with a configured scheduler, use the source tool. When you create a source, Vertica connects to the Kafka cluster to verify that the topic exists. So, before you create the source, make sure that the topic already exists in your Kafka cluster. Because Vertica verifies the existence of the topic, you must supply the previously-defined cluster name using the --cluster option.

The following example creates a source for the Kafka topic named web_hits on the cluster created in the previous step. This topic has a single partition.

$ vkconfig source --create --cluster kafka_weblog --source web_hits --partitions 1 --conf weblog.conf

The --partitions parameter is the number of partitions to load, not a list of individual partitions. For example, if you set this parameter to 3, the scheduler loads data from partitions 0, 1, and 2.
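
For example, a source for a hypothetical three-partition topic (the topic name web_clicks is illustrative only) would load partitions 0, 1, and 2:

$ vkconfig source --create --cluster kafka_weblog --source web_clicks --partitions 3 --conf weblog.conf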

See Source Tool Options for more information.

Create a Data Table

Before you can create a target for your scheduler, you must create a target table in your Vertica database. This is the table Vertica uses to store the data the scheduler loads from Kafka. You must decide which type of table to create for your target:

  • A standard Vertica database table, which you create using the CREATE TABLE statement. This type of table stores data efficiently. However, you must ensure that its columns match the data format of the messages in the Kafka topic you are loading. You cannot load complex types of data into a standard Vertica table.
  • A flex table, which you create using CREATE FLEX TABLE. A flex table is less efficient than a standard Vertica database table. However, it is flexible enough to deal with data whose schema varies and changes. It can also load most complex data types (such as maps and lists) that standard Vertica tables cannot; see the sketch after this list.
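
If the web log data did not have a fixed set of fields, a flex table would be the safer choice. A minimal sketch, assuming a hypothetical table name:

=> CREATE FLEX TABLE web_hits_flex();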

Avoid having columns with primary key restrictions in your target table. The scheduler stops loading data if it encounters a row with a value that violates this restriction. If you must have a primary key restricted column, try to filter out any redundant values for that column from the streamed data before it is loaded by the scheduler.

The data in this example is in a fixed format, so the best choice is a standard Vertica table. The following example creates a table named web_hits to hold four columns of data. This table is located in the public schema.

=> CREATE TABLE web_hits (ip VARCHAR(16), url VARCHAR(256), date DATETIME, user_agent VARCHAR(1024));

You do not need to create a rejection table to store rejected messages. The scheduler creates the rejection table automatically.

Create a Target

Once you have created your target table, you can create your scheduler's target. The target tells your scheduler where to store the data it retrieves from Kafka. This table must exist when you create your target. You use the vkconfig script's target tool with the --target-schema and --target-table options to specify the Vertica target table's schema and name. The following example adds a target for the table created in the previous step.

$ vkconfig target --create --target-schema public --target-table web_hits --conf weblog.conf

See Target Tool Options for more information.

Create a Load Spec

The scheduler's load spec provides parameters that Vertica uses when parsing the data loaded from Kafka. The most important option is --parser, which sets the parser that Vertica uses to parse the data. You have three parser options:

  • KafkaParser, which performs no parsing and loads each message in its raw form.
  • KafkaJSONParser, which parses JSON-formatted data.
  • KafkaAvroParser, which parses Avro-formatted data.

In this example, the data being loaded from Kafka is in JSON format. The following command creates a load spec named weblog_load and sets the parser to KafkaJSONParser.

$ vkconfig load-spec --create --parser KafkaJSONParser --load-spec weblog_load --conf weblog.conf

See Load Spec Tool Options for more information.

Create a Microbatch

The microbatch combines all of the settings added to the scheduler so far to define the individual COPY statements that the scheduler uses to load data from Kafka.

The following example uses all of the settings created in the previous examples to create a microbatch called weblog.

$ vkconfig microbatch --create --microbatch weblog --target-schema public --target-table web_hits \
  --add-source web_hits --add-source-cluster kafka_weblog --load-spec weblog_load \
  --conf weblog.conf

See Microbatch Tool Options for more information.

Launch the Scheduler

Once you've created at least one microbatch, you can run your scheduler. You start your scheduler using the launch tool, passing it the name of the scheduler's schema. The scheduler begins scheduling microbatch loads for every enabled microbatch defined in its schema.

The following example launches the weblog scheduler defined in the previous steps.

$ vkconfig launch --conf weblog.conf &
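
Launching with & runs the scheduler in the background but ties it to your terminal session. If you want the scheduler to survive a logout, a common shell pattern (not specific to vkconfig) is to use nohup:

$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &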

Vertica does not recommend specifying a password on the command line. Passwords on the command line can be exposed by the system's list of processes, which shows the command line for each process. Instead, put the password in a configuration file. Make sure the configuration file's permissions only allow it to be read by the user.
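
For example, to restrict the configuration file so that only its owner can read and modify it:

$ chmod 600 weblog.conf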

See Launch Tool Options for more information.

Checking that the Scheduler is Running

Once you have launched your scheduler, you can verify that it is running by querying the stream_microbatch_history table in the scheduler's schema. This table lists the results of each microbatch the scheduler has run.

For example, this query lists the microbatch name, the start and end times of the microbatch, the start and end offset of the batch, and why the batch ended. The results are ordered to start from when the scheduler was launched:

=> SELECT microbatch, batch_start, batch_end, start_offset, 
          end_offset, end_reason 
          FROM weblog_sched.stream_microbatch_history 
          ORDER BY batch_start LIMIT 10;
          
 microbatch |        batch_start         |         batch_end          | start_offset | end_offset |  end_reason
------------+----------------------------+----------------------------+--------------+------------+---------------
 weblog     | 2017-10-04 09:30:19.100752 | 2017-10-04 09:30:20.455739 |           -2 |      34931 | END_OF_STREAM
 weblog     | 2017-10-04 09:30:49.161756 | 2017-10-04 09:30:49.873389 |        34931 |      34955 | END_OF_STREAM
 weblog     | 2017-10-04 09:31:19.25731  | 2017-10-04 09:31:22.203173 |        34955 |      35274 | END_OF_STREAM
 weblog     | 2017-10-04 09:31:49.299119 | 2017-10-04 09:31:50.669889 |        35274 |      35555 | END_OF_STREAM
 weblog     | 2017-10-04 09:32:19.43153  | 2017-10-04 09:32:20.7519   |        35555 |      35852 | END_OF_STREAM
 weblog     | 2017-10-04 09:32:49.397684 | 2017-10-04 09:32:50.091675 |        35852 |      36142 | END_OF_STREAM
 weblog     | 2017-10-04 09:33:19.449274 | 2017-10-04 09:33:20.724478 |        36142 |      36444 | END_OF_STREAM
 weblog     | 2017-10-04 09:33:49.481563 | 2017-10-04 09:33:50.068068 |        36444 |      36734 | END_OF_STREAM
 weblog     | 2017-10-04 09:34:19.661624 | 2017-10-04 09:34:20.639078 |        36734 |      37036 | END_OF_STREAM
 weblog     | 2017-10-04 09:34:49.612355 | 2017-10-04 09:34:50.121824 |        37036 |      37327 | END_OF_STREAM
(10 rows)