Vertica Integration with PipelineDB: Connection Guide

Applies to Vertica 7.2.x and earlier 

About Vertica Connection Guides

Vertica connection guides provide basic information about setting up connections to Vertica from software that our technology partners create. These documents provide guidance using one specific version of Vertica and one specific version of the third-party vendor’s software. Other versions of the third-party product may work with Vertica, but they may not have been tested. This document provides guidance using the latest versions of Vertica and PipelineDB as of September 2016.

About This Document

Vertica provides SQL and SQL on Hadoop capabilities. PipelineDB runs SQL queries continuously on streams.

This document explains how you can combine the SQL streaming capabilities of PipelineDB with the robust massively parallel processing (MPP) capabilities of Vertica.

The example in this document shows how you can execute SQL queries on streams using PipelineDB and then persist those streams into Vertica to perform advanced analytics.

This document is based on the results of testing Vertica 7.2.x with PipelineDB version 0.9.5.

PipelineDB Overview

PipelineDB is an open source SQL database that executes SQL queries on data streams. PipelineDB incrementally stores the results of those queries in tables. You can choose to incrementally export the contents of those tables to Vertica and then perform complex analytics on the data.

PipelineDB and Kafka

Apache Kafka is an open source application designed for a streaming use case (high volumes of data with low latency). PipelineDB can produce and consume messages from Kafka and load them into data streams.

For more information, see https://www.pipelinedb.com/.

Install PipelineDB

To install PipelineDB:

  1. Create a pipelinedb user to run the PipelineDB server:

    $ adduser pipelinedb
  2. Download the .rpm file for your operating system from https://www.pipelinedb.com/download.
  3. Install the rpm you downloaded. For example:

    $ rpm -ivh pipelinedb-0.9.3-1.x86_64.rpm

For more details on installing PipelineDB, see http://docs.pipelinedb.com/installation.html#rpm.

Initialize PipelineDB Data Directory

After you have installed PipelineDB, you need to initialize the database directory. This directory is where PipelineDB stores all the files and data associated with a database.

To initialize the database directory:

$ pipeline-init -D pipelinedb_datadir/

Start PipelineDB

To start the PipelineDB server to run in the background, use the pipeline-ctl driver and point it to your newly initialized database directory:

$ pipeline-ctl -D pipelinedb_datadir -l pipelinedb.log start

The -l option specifies the path of a file to log messages to. You can also use the pipeline-ctl driver to stop a running PipelineDB server:

$ pipeline-ctl -D pipelinedb_datadir stop

Test whether the PipelineDB server is running:

$ psql -h localhost -p 5432 pipeline
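If you start the server from a script, the psql check can be wrapped in a small retry loop. The following is a minimal sketch, not part of PipelineDB: wait_for_pipelinedb is a hypothetical helper name, and the host, port, and database are the same ones used in the psql command above. The -w option keeps psql from prompting for a password.

```shell
# wait_for_pipelinedb [ATTEMPTS] [DELAY]: retry a trivial query until the
# server answers. Hypothetical helper; host, port, and database are the
# values used in this guide.
wait_for_pipelinedb() {
  attempts=${1:-10}
  delay=${2:-1}
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # A successful "SELECT 1" means the server accepts connections.
    if psql -w -h localhost -p 5432 pipeline -c 'SELECT 1;' >/dev/null 2>&1; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}
```

For example, `wait_for_pipelinedb 30 1 || echo "PipelineDB did not start"` waits up to roughly 30 seconds before reporting a failure.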

Configure Kafka Producer and Consumer in PipelineDB

To configure PipelineDB support for Kafka, you need to set up librdkafka so that you can install the PipelineDB Kafka extension:

  1. Download the source code: https://github.com/edenhill/librdkafka.

  2. Install librdkafka:

    $ ./configure --disable-{ssl,sasl}
    $ make
    $ make install
    $ make clean
  3. After you have successfully installed librdkafka, install the pipeline_kafka extension. Download the source code: https://github.com/pipelinedb/pipeline_kafka.
  4. Install the pipeline_kafka extension:

    $ ./configure
    $ make
    $ make install
    $ make clean
  5. Add the following line to your pipelinedb.conf file. The pipelinedb.conf file was installed with PipelineDB:

    shared_preload_libraries = pipeline_kafka
  6. Load the extension into PipelineDB:

    => CREATE EXTENSION pipeline_kafka;
    CREATE EXTENSION

After you create the extension, PipelineDB can start producing and consuming messages from Kafka. PipelineDB can also send messages to Vertica.

For more details, see Consuming Messages in the PipelineDB documentation.
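The configuration change in step 5 can be scripted so that it is safe to run more than once. The following is a minimal sketch under stated assumptions: enable_preload is a hypothetical helper name, and you pass it the path of your pipelinedb.conf file. It refuses to touch a file that already has an active shared_preload_libraries line, because that line may list other libraries that you need to keep. Because PipelineDB is based on PostgreSQL, expect to restart the server before a change to this setting takes effect.

```shell
# enable_preload CONF LIB: append "shared_preload_libraries = LIB" to CONF
# unless an uncommented shared_preload_libraries line already exists.
enable_preload() {
  conf=$1
  lib=$2
  # Parameter names are case-insensitive, so match regardless of case.
  if grep -Eiq '^[[:space:]]*shared_preload_libraries[[:space:]]*=' "$conf"; then
    echo "shared_preload_libraries is already set in $conf; edit it by hand" >&2
    return 1
  fi
  printf '%s\n' "shared_preload_libraries = $lib" >> "$conf"
}
```

For example, `enable_preload pipelinedb_datadir/pipelinedb.conf pipeline_kafka`, assuming the configuration file lives in the data directory that you initialized earlier.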

Configure Kafka

To configure Kafka, download the .tgz file from http://kafka.apache.org/downloads.html and install it on your Linux box.

Note: For this document, Vertica engineers used kafka.9.0.1.tgz.

Before Kafka can start streaming data to PipelineDB, you need to start the ZooKeeper and Kafka servers and create the topics that you want PipelineDB to consume.

For more information, see the Kafka documentation.
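As an illustration of the topic creation step, the following sketch wraps the kafka-topics.sh script that ships with Kafka. The assumptions are loud ones: create_topic is a hypothetical helper name, the script is run from the Kafka installation directory, ZooKeeper listens on localhost:2181, and a single partition with no replication is enough for a test setup. The topic names logs_topic and kafkapush are the ones used in the example later in this document.

```shell
# Path to the Kafka topic tool; assumes the current directory is the
# Kafka installation directory unless KAFKA_TOPICS is already set.
KAFKA_TOPICS=${KAFKA_TOPICS:-bin/kafka-topics.sh}

# create_topic TOPIC [ZOOKEEPER]: create a single-partition, unreplicated
# topic. Suitable for a test setup only.
create_topic() {
  "$KAFKA_TOPICS" --create \
    --zookeeper "${2:-localhost:2181}" \
    --replication-factor 1 \
    --partitions 1 \
    --topic "$1"
}
```

For example, `create_topic logs_topic` followed by `create_topic kafkapush` creates the two topics that the example relies on.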

Event Flow from Kafka to PipelineDB to Vertica: An Example

Let's see an example of how events can flow from Kafka to PipelineDB for stream processing and then to Vertica for analytical queries or model building on historical data.

You can design the system so that events are consumed separately by PipelineDB and by Vertica for their respective analysis.

The PipelineDB team has published a blog post about SQL on Kafka. Vertica engineers have extended that example to show the integration between PipelineDB and Vertica.

To run this example, you need to install:

  • nginx: An HTTP server that you can use to log URL request metadata into Kafka.
  • kafkacat: A tool that writes new records from files into Kafka.
  • siege: A tool that can concurrently make HTTP requests to random URLs.

Configure the Producer

Configure the Kafka producer for Vertica:

  1. Start the ZooKeeper and Kafka servers. Each command runs in the foreground, so start them in separate terminals:

    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    $ bin/kafka-server-start.sh config/server.properties
  2. The Kafka producer writes JSON-serialized nginx logs to Kafka. This example uses a minimal configuration file for the nginx server:

    cat <<EOF > nginx.conf
    worker_processes 4;
    pid $PWD/nginx.pid;
    events {}
    http {
      log_format json
        '{'
          '"ts": "\$time_iso8601", '
          '"user_agent": "\$http_user_agent", '
          '"url": "\$request_uri", '
          '"latency": "\$request_time",  '
          '"user": "\$arg_user"'
        '}';
      access_log $PWD/access.log json;
      error_log $PWD/error.log;
      server {
        location ~ ^/ { return 200; }
      }
    }
    EOF

    This file configures an HTTP server. It returns 200 responses for any URL request that it receives. It also logs a JSON record to a file named access.log that contains a small amount of information about the request.

  3. Start serving requests:

    $ nginx -c $PWD/nginx.conf

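    To get a better idea of the message format, here is the tail command output of access.log for a few URL requests. This sample also contains status and request_method fields, which the minimal log_format shown in step 2 does not emit: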

    {"ts": "2015-09-14T10:30:21-07:00", "status": "200", "request_method": "GET", 
    "user_agent": "Mozilla/5.0 (pc-x86_64-linux-gnu) Siege/3.0.5", 
    "url": "/page38/path7?user=24746", "latency": "0.001",  "user": "24746"}
    {"ts": "2015-09-14T10:30:21-07:00", "status": "200", "request_method": "GET", 
    "user_agent": "Mozilla/5.0 (pc-x86_64-linux-gnu) Siege/3.0.5", 
    "url": "/page66/path7?user=8846", "latency": "0.001",  "user": "8846"}
    {"ts": "2015-09-14T10:30:21-07:00", "status": "200", "request_method": "GET", 
    "user_agent": "Mozilla/5.0 (pc-x86_64-linux-gnu) Siege/3.0.5", 
    "url": "/page33/path3?user=6006", "latency": "0.001",  "user": "6006"}
    {"ts": "2015-09-14T10:30:21-07:00", "status": "200", "request_method": "GET", 
    "user_agent": "Mozilla/5.0 (pc-x86_64-linux-gnu) Siege/3.0.5", 
    "url": "/page85/path2?user=28043", "latency": "0.000",  "user": "28043"}
  4. To tail access.log and transform any new lines to Kafka messages, use kafkacat:

    $ tail -f access.log | kafkacat -b localhost:9092 -t logs_topic
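  5. Use the siege tool to simulate a large number of requests to nginx. The following example generates a list of URLs that siege randomly selects from. This yields a workload of 1,000 unique URLs (100 pages times 10 paths), each visited by a randomly chosen user. Note that Bash's $RANDOM only returns values from 0 through 32767, so the modulo 100000 has no effect and the list contains at most 32,768 unique users rather than 100,000.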

    for x in {0..1000000}; do echo "http://localhost/page$((RANDOM % 100))/path$((RANDOM % 10))?user=$((RANDOM % 100000))" >> urls.txt; done
  6. Run siege:

    $ siege -c32 -b -d0 -f urls.txt
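Before you run siege, you may want to check the shape of the generated workload. The following is a minimal sketch, not part of the original example. All three function names are hypothetical. The generator uses awk's rand() in place of Bash's $RANDOM so that it runs in any POSIX shell, which also means its user IDs can span the full 0 to 99999 range. The two counting functions work on any file in the urls.txt format.

```shell
# generate_urls N: print N random URLs in the same shape as urls.txt.
# Assumption: awk's rand() stands in for Bash's $RANDOM.
generate_urls() {
  awk -v n="$1" 'BEGIN {
    srand()
    for (i = 0; i < n; i++) {
      printf "http://localhost/page%d/path%d?user=%d\n", int(rand() * 100), int(rand() * 10), int(rand() * 100000)
    }
  }'
}

# count_unique_paths FILE: number of distinct URLs, ignoring the query string.
count_unique_paths() {
  cut -d'?' -f1 "$1" | sort -u | wc -l | tr -d ' '
}

# count_unique_users FILE: number of distinct user IDs in the query strings.
count_unique_users() {
  sed 's/.*user=//' "$1" | sort -u | wc -l | tr -d ' '
}
```

For example, `count_unique_paths urls.txt` should report no more than 1,000 for the file generated in step 5, and `count_unique_users urls.txt` shows how many distinct users the workload actually contains.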

Redirect Events to Vertica

You can have these events flow to PipelineDB and then redirect them to Vertica. Alternatively, Vertica can directly consume events from the main stream. The following steps show how PipelineDB can split the stream so that it uses one split for its processing and can pass the other split to Vertica:

  1. Create the Kafka extension:

    Pipelinedb => CREATE EXTENSION pipeline_kafka;
  2. Set the Kafka broker:

    Pipelinedb => SELECT pipeline_kafka.add_broker('<kafkahost>:9092');
  3. Create a stream in PipelineDB that receives the events generated by the siege tool that Kafka is publishing:

    Pipelinedb => CREATE STREAM logs_stream (payload json);

Create a Continuous View

You cannot query streams directly. Instead, create a continuous view to keep track of the events that are coming into the stream.

  1. Create the view:

    Pipelinedb => CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream;
  2. Associate the stream with the topic that you want to consume, which in this case is logs_topic:

    Pipelinedb => SELECT pipeline_kafka.consume_begin('logs_topic', 'logs_stream');
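To confirm that events are arriving, you can read the continuous view twice and compare the results. The following is a minimal sketch under stated assumptions: view_count and count_is_growing are hypothetical helper names, the connection settings are the ones used earlier in this guide, and the view is assumed to expose its COUNT(*) result in a column named count. An empty result, which you may see before the first event arrives, is treated as zero.

```shell
# view_count VIEW: print the current value of a COUNT(*) continuous view.
# -A and -t make psql print the bare value without headers or alignment.
view_count() {
  psql -h localhost -p 5432 pipeline -At -c "SELECT count FROM $1;"
}

# count_is_growing VIEW [PAUSE]: succeed if the count increases within
# PAUSE seconds (default 5).
count_is_growing() {
  view=$1
  pause=${2:-5}
  first=$(view_count "$view") || return 2
  sleep "$pause"
  second=$(view_count "$view") || return 2
  [ "${second:-0}" -gt "${first:-0}" ]
}
```

For example, `count_is_growing message_count 10 && echo "events are flowing"` checks the view created in step 1 while siege is running.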

Split the Stream

PipelineDB can split the incoming stream into two streams—one for its consumption and another to be published back to Kafka so that Vertica can consume it via its KafkaLoader.

Follow these steps:

  1. Create two streams:

    Pipelinedb => CREATE STREAM pipelinedb_stream (payload json);
    Pipelinedb => CREATE STREAM vertica_stream (payload json);
  2. Create a continuous transformation that splits the stream:

    Pipelinedb => CREATE CONTINUOUS TRANSFORM t AS SELECT payload FROM logs_stream 
    THEN EXECUTE PROCEDURE pipeline_stream_insert('pipelinedb_stream','vertica_stream');
  3. Create views on the split streams to track the events on each stream:

    Pipelinedb => CREATE CONTINUOUS VIEW pipelinedb_message_count 
      AS SELECT COUNT(*) FROM pipelinedb_stream; 
    Pipelinedb => CREATE CONTINUOUS VIEW vertica_message_count 
      AS SELECT COUNT(*) FROM vertica_stream;
  4. Create a continuous transformation that publishes the events into Kafka for Vertica to consume:

    Pipelinedb => CREATE CONTINUOUS TRANSFORM vertica_publish AS SELECT payload 
    FROM vertica_stream 
    THEN EXECUTE PROCEDURE pipeline_kafka.emit_tuple('kafkapush'); 

At this stage, the events for Vertica to consume are available in the kafkapush topic.
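Before you configure Vertica, you can check that messages are reaching the kafkapush topic by reading a few of them back with kafkacat in consumer mode. The following is a minimal sketch: peek_topic is a hypothetical helper name, and the broker address localhost:9092 in the usage example is an assumption taken from the producer command earlier in this document.

```shell
# peek_topic BROKER TOPIC COUNT: print up to COUNT messages, reading from
# the beginning of TOPIC, and exit when the end of the topic is reached.
peek_topic() {
  kafkacat -C -b "$1" -t "$2" -o beginning -c "$3" -e
}
```

For example, `peek_topic localhost:9092 kafkapush 5` prints up to five of the JSON events that PipelineDB has published.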

Configure the Consumer in Vertica

This section shows how events that are streamed into PipelineDB can be published to Vertica for historical analysis.

To make this happen, take the following steps to configure the Kafka loader in Vertica:

  1. Add a scheduler that points to the Kafka broker from which topics are consumed:

    $ /opt/vertica/packages/kafka/bin/vkconfig scheduler --add --password password \
        --brokers intcent67-121-9:9092
  2. Create a flex table in Vertica to store the events that PipelineDB publishes:

    vsql => CREATE FLEX TABLE public.kafka_tgt();
  3. Add a Kafka topic to the Vertica scheduler:

    $ /opt/vertica/packages/kafka/bin/vkconfig topic --add --target public.kafka_tgt \
        --rejection-table public.kafka_rej --topic kafkapush --password password
  4. After the topic is added, launch the scheduler so that it can start consuming the messages from Kafka:

    $ /opt/vertica/packages/kafka/bin/vkconfig launch --password password
  5. To see the events loading into the flex table, query the flex table:

    vsql => SELECT * FROM public.kafka_tgt;
  6. Verify that an entry exists for the Kafka broker host name in the /etc/hosts file.

For more details on Vertica Kafka configuration, see Integrating with Apache Kafka in the Vertica documentation.
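The host name check in step 6 can be scripted. The following is a minimal sketch: has_hosts_entry is a hypothetical helper name, and intcent67-121-9 in the usage example is the broker host name from step 1. The function ignores comments and matches the name in any column after the address.

```shell
# has_hosts_entry HOSTNAME [FILE]: succeed if HOSTNAME appears as a host
# name or alias on an uncommented line of FILE (default /etc/hosts).
has_hosts_entry() {
  host=$1
  file=${2:-/etc/hosts}
  awk -v h="$host" '
    { sub(/#.*/, "") }                                  # strip comments
    { for (i = 2; i <= NF; i++) if ($i == h) found = 1 } # skip the address column
    END { exit !found }
  ' "$file"
}
```

For example, `has_hosts_entry intcent67-121-9 || echo "add the broker to /etc/hosts"`.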

Configure Foreign Data Wrapper for Vertica in PipelineDB

Creating a foreign data wrapper in PipelineDB allows you to query Vertica tables from PipelineDB. Follow these steps:

  1. Download the source code for the pipeline_odbc extension: https://github.com/pipelinedb/pipeline_odbc
  2. Install pipeline_odbc:

    $ make
    $ make install
  3. Download and install the Vertica ODBC driver, and then create the ODBC DSN for the Vertica instance.
  4. Create the ODBC extension in PipelineDB:

    pipeline=# CREATE EXTENSION odbc_fdw;
    CREATE EXTENSION
  5. Point PipelineDB to the Vertica cluster using odbc_fdw:

    pipeline=# CREATE SERVER vertica FOREIGN DATA WRAPPER odbc_fdw OPTIONS (dsn 'Vertica');
    CREATE SERVER
  6. Map the Vertica table to a PipelineDB table via the Vertica foreign server:

    pipeline=# CREATE FOREIGN TABLE vertica_test (x integer, y integer, z integer) SERVER vertica
    OPTIONS (database 'vertica', table 'vertica_test');
  7. Map the PipelineDB user to a Vertica user:

    pipeline=# CREATE USER MAPPING FOR <pipelinedb user> SERVER vertica
    OPTIONS (username '<vertica user>', password '<vertica password>');

Now you can interact with your Vertica table using PipelineDB as if it were a native PipelineDB relation. You can query tables or join them with the streams using PipelineDB:

pipeline=# SELECT * FROM vertica_test;
 x | y | z
---+---+---
 0 | 0 | 0
(1 row)
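Step 3 above asks you to create an ODBC DSN but does not show one. The following sketch writes a DSN entry named Vertica, matching the dsn option used in step 5. Treat every value as an assumption to adapt: write_vertica_dsn is a hypothetical helper name, the driver path is a typical location for a Linux installation of the Vertica client driver, and 5433 is the default Vertica port. The password is left out because the user mapping in step 7 supplies the credentials.

```shell
# write_vertica_dsn FILE HOST DATABASE USER: write an odbc.ini-style DSN
# named "Vertica" to FILE, overwriting any existing content.
write_vertica_dsn() {
  cat > "$1" <<EOF
[Vertica]
Description = Vertica DSN for pipeline_odbc
Driver = /opt/vertica/lib64/libverticaodbc.so
Servername = $2
Database = $3
UID = $4
Port = 5433
EOF
}
```

For example, `write_vertica_dsn "$HOME/.odbc.ini" <vertica host> <database> <vertica user>` creates a per-user DSN file. Because the function overwrites the target file, merge the entry by hand if you already have other DSNs defined.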

For More Information

For more information about          See
PipelineDB                          https://github.com/pipelinedb/pipeline_odbc
Apache Kafka                        http://kafka.apache.org/
Vertica Community Edition           https://vertica.com/community/
Vertica Documentation               http://vertica.com/docs/latest/HTML/index.htm
Big Data and Analytics Community    https://vertica.com/big-data-analytics-community-content