Vertica Integration with Apache Flink: Connection Guide

About Vertica Connection Guides

Vertica connection guides provide basic instructions for connecting a third-party partner product to Vertica. Connection guides are based on our testing with specific versions of Vertica and the partner product.

Vertica and Apache Flink: Latest Versions Tested

This document is based on our testing using the following versions:

Software            Version
Apache Flink        1.13
JDK                 1.8
Scala               2.11.0
IntelliJ IDEA       IntelliJ Community Edition 2021.1.1
Desktop Platform    Windows Server 2016
Vertica Client      Vertica JDBC Driver 10.1
Vertica Server      Vertica 10.1

Apache Flink Overview

Apache Flink is an open source framework and distributed processing engine developed by the Apache Software Foundation. It is used for both stream processing and batch processing. Flink itself is written in Java and Scala, and you can program against it in Java, Scala, or Python.

Downloading the Vertica Client Driver

Apache Flink uses the Vertica JDBC driver to connect to Vertica. To download the client driver:

  1. Navigate to the Vertica Client Drivers page.
  2. Download the JDBC driver package for your version of Vertica.

    Note: For more information about client and server compatibility, see Client Driver and Server Version Compatibility in the Vertica documentation.
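Once the driver jar has been added to the project (covered in the steps later in this guide), a quick classpath check can confirm that it is visible to your code. The following is a minimal sketch, not part of the tested procedure; the object name `VerticaDriverCheck` and the helper `driverAvailable` are hypothetical, and only the driver class name `com.vertica.jdbc.Driver` comes from this guide.

```scala
object VerticaDriverCheck {
  // Returns true if the named class can be loaded from the current classpath.
  def driverAvailable(className: String): Boolean =
    try {
      Class.forName(className)
      true
    } catch {
      case _: ClassNotFoundException => false
    }

  def main(args: Array[String]): Unit = {
    // Driver class name used throughout this guide.
    val name = "com.vertica.jdbc.Driver"
    if (driverAvailable(name)) println(s"Found $name on the classpath")
    else println(s"$name not found; check that the driver jar was added to the project")
  }
}
```

If the class is not found, the most likely cause is that the jar was not added to the module's dependencies.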

Connecting Vertica to Apache Flink

To read data from Vertica as the source and write data to Vertica as the target with Apache Flink:

  1. Open IntelliJ IDEA and create a new Scala project with sbt as the build type.

  2. After the project is created, add the following sbt dependencies to the build.sbt file in your project (Project > build.sbt). Wait for the dependencies to load into your environment.

    libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.13.0"
    libraryDependencies += "org.apache.flink" % "flink-core" % "1.13.0"
    libraryDependencies += "org.apache.flink" %% "flink-clients" % "1.13.0"
    libraryDependencies += "org.apache.flink" %% "flink-jdbc" % "1.10.3" % "compile"
    libraryDependencies += "org.apache.flink" %% "flink-table-planner" % "1.13.0" % "compile"
  3. Add the Vertica JDBC Driver jar file in your project.

    1. From the menu, click File, and then click Project Structure.

    2. Click the plus sign below Module SDK, and then click JARs or Directories.

    3. Select the Vertica JDBC driver jar file that you downloaded from the Vertica website and click OK.

    4. In the Project Structure window, click OK.

    5. The Vertica JDBC driver is displayed in the left pane, Project > External Libraries.

  4. Create a Scala class file.

    By default, Class is selected. Select Object instead and enter a name for the object.

  5. Import the following packages in your project:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
    import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
    import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
    import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
    import org.apache.flink.api.java.typeutils.{RowTypeInfo}
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.types.Row
    import org.apache.flink.api.scala._
  6. Create the execution and table environments, then create a connection with Vertica as the source database and map the data types:

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)
    val dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("com.vertica.jdbc.Driver")
      .setDBUrl("jdbc:vertica://<IP_Address>:5433/<DB Name>?user=<username>&password=<password>")
      .setQuery("Query for reading the source table")
      // One type entry per column returned by the query; here, three string columns.
      .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
      .finish())
  7. Create a temporary view and create a data set using the view:

    tableEnv.createTemporaryView("<Temp View Name>", dataSource)
    val query = tableEnv.sqlQuery("<Query on source view>")
    dataSource.print()
    val result = tableEnv.toDataSet[Row](query)
  8. Create a connection with Vertica as the target database and map the data types:

    result.output(JDBCOutputFormat.buildJDBCOutputFormat()
      .setDrivername("com.vertica.jdbc.Driver")
      .setDBUrl("jdbc:vertica://<Target_DB_IP>:5433/<DB Name>?user=<username>&password=<password>")
      // The number of ? placeholders must match the number of columns read from the source table.
      .setQuery("insert into <SchemaName>.<TableName> values (?,?,?)")
      .setSqlTypes(Array(java.sql.Types.VARCHAR, java.sql.Types.VARCHAR, java.sql.Types.VARCHAR))
      .finish())
  9. Execute the program.

  10. Verify the data in the Vertica database.

    1. Source database

    2. Target database
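The final verification step can also be done programmatically. The sketch below compares row counts in the source and target tables over plain JDBC. It is an illustration under stated assumptions rather than part of the tested procedure: the object name, the helper names, the command-line arguments, and the `VERTICA_USER` and `VERTICA_PASSWORD` environment variables are all hypothetical, and it assumes both databases share a name and credentials. It passes the credentials separately instead of embedding them in the URL.

```scala
import java.sql.DriverManager

object VerifyVerticaCopy {
  // Builds a Vertica JDBC URL without embedding credentials in it.
  def jdbcUrl(host: String, database: String, port: Int = 5433): String =
    s"jdbc:vertica://$host:$port/$database"

  // Runs SELECT COUNT(*) against one table and returns the count.
  def rowCount(url: String, user: String, password: String, table: String): Long = {
    val conn = DriverManager.getConnection(url, user, password)
    try {
      val stmt = conn.createStatement()
      try {
        val rs = stmt.executeQuery(s"SELECT COUNT(*) FROM $table")
        rs.next()
        rs.getLong(1)
      } finally stmt.close()
    } finally conn.close()
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 5) {
      println("usage: VerifyVerticaCopy <sourceHost> <targetHost> <database> <sourceTable> <targetTable>")
    } else {
      val Array(sourceHost, targetHost, database, sourceTable, targetTable) = args
      // Hypothetical environment variables; substitute your own credential handling.
      val user = sys.env.getOrElse("VERTICA_USER", "dbadmin")
      val password = sys.env.getOrElse("VERTICA_PASSWORD", "")
      val src = rowCount(jdbcUrl(sourceHost, database), user, password, sourceTable)
      val tgt = rowCount(jdbcUrl(targetHost, database), user, password, targetTable)
      println(s"source rows: $src, target rows: $tgt")
    }
  }
}
```

Matching counts do not prove that the values are identical, so a spot check of a few rows is still worthwhile, particularly for the data types listed under Known Limitations.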

Sample Code

The following sample code connects to Vertica as both the source and the target with Flink. It reads data from a Vertica table and writes the result back to Vertica:

package org.vertica.flink

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.{RowTypeInfo}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._

object FlinkWithVerticaDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    // Create the source connection and map the row data types.
    val dataSource = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
      .setDrivername("com.vertica.jdbc.Driver")
      .setDBUrl("jdbc:vertica://<IP_Address>:5433/<DB Name>?user=<username>&password=<password>")
      .setQuery("Query for reading the source table")
      .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
      .finish())

    // Register the source as a temporary view and query it.
    tableEnv.createTemporaryView("<Temp View Name>", dataSource)
    val query = tableEnv.sqlQuery("<Query on source view>")
    dataSource.print()
    val result = tableEnv.toDataSet[Row](query)

    // Create the target connection and map the SQL types.
    result.output(JDBCOutputFormat.buildJDBCOutputFormat()
      .setDrivername("com.vertica.jdbc.Driver")
      .setDBUrl("jdbc:vertica://<Target_DB_IP>:5433/<DB Name>?user=<username>&password=<password>")
      // The number of ? placeholders must match the number of columns read from the source table.
      .setQuery("insert into <SchemaName>.<TableName> values (?,?,?)")
      .setSqlTypes(Array(java.sql.Types.VARCHAR, java.sql.Types.VARCHAR, java.sql.Types.VARCHAR))
      .finish())

    env.execute("flink-test")
  }
}
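Because the insert statement needs one placeholder and one SQL type per column, it can help to derive both from a single column count so they cannot drift apart. The following is a small sketch of that idea; `InsertStatementHelper`, `insertSql`, and `varcharTypes` are hypothetical names, and the schema and table names are illustrative. It assumes every column is written as VARCHAR, as in the sample above.

```scala
import java.sql.Types

object InsertStatementHelper {
  // Builds "insert into <schema>.<table> values (?,?,...)" with one ? per column.
  def insertSql(schema: String, table: String, columnCount: Int): String = {
    require(columnCount > 0, "columnCount must be positive")
    val placeholders = Seq.fill(columnCount)("?").mkString(",")
    s"insert into $schema.$table values ($placeholders)"
  }

  // One java.sql.Types.VARCHAR entry per column, matching the all-string sample.
  def varcharTypes(columnCount: Int): Array[Int] =
    Array.fill(columnCount)(Types.VARCHAR)

  def main(args: Array[String]): Unit = {
    // Illustrative schema and table names.
    println(insertSql("public", "target_table", 3))
    println(varcharTypes(3).length)
  }
}
```

The two results could then be passed to setQuery and setSqlTypes in place of the hand-written literals.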

Known Limitations

Read Mode

  • For TIME, TIMETZ, and TIMESTAMPTZ data types, milliseconds are truncated.
  • For BINARY, VARBINARY, and LONG VARBINARY data types, the values are not displayed correctly when previewing the data.
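To make the first limitation concrete, the sketch below simulates locally what truncating milliseconds does to a time value. It only illustrates the effect using java.time; it does not go through Flink or the Vertica driver, and the object and method names are hypothetical.

```scala
import java.time.LocalTime
import java.time.temporal.ChronoUnit

object MillisecondTruncationDemo {
  // Drops everything below whole seconds, which is the effect described above.
  def truncateToSeconds(t: LocalTime): LocalTime =
    t.truncatedTo(ChronoUnit.SECONDS)

  def main(args: Array[String]): Unit = {
    val stored = LocalTime.parse("12:34:56.789") // value as held in the database
    val read = truncateToSeconds(stored)         // value as it would appear after the read
    println(s"stored=$stored read=$read")
  }
}
```

If sub-second precision matters for your data, compare a few values read through Flink against the values stored in Vertica before relying on them.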

Write Mode

  • INTERVAL and UUID data types are not supported. To use these data types, you must cast them to VARCHAR.
  • For the TIME data type, milliseconds are rounded off to 3 digits.
  • For the TIMETZ data type, NULL values and milliseconds are not supported.
  • For the TIMESTAMPTZ data type, milliseconds are not supported.
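The guide does not state where the cast to VARCHAR should be applied. One possible reading, sketched below, is to apply it in the query that reads the source table, so that the values arrive in Flink as strings and can be mapped with STRING_TYPE_INFO and java.sql.Types.VARCHAR as in the sample code. The object name, the helper `selectWithCasts`, and the table and column names are hypothetical.

```scala
object CastToVarcharQuery {
  // Each column is given as (name, needsCast); columns flagged true are wrapped in a CAST.
  def selectWithCasts(table: String, columns: Seq[(String, Boolean)]): String = {
    val exprs = columns.map {
      case (name, true)  => s"CAST($name AS VARCHAR) AS $name"
      case (name, false) => name
    }
    s"SELECT ${exprs.mkString(", ")} FROM $table"
  }

  def main(args: Array[String]): Unit = {
    // Illustrative table: a UUID column and an INTERVAL column that need casting.
    val sql = selectWithCasts(
      "public.events",
      Seq("event_id" -> true, "event_name" -> false, "duration" -> true))
    println(sql)
  }
}
```

The generated string could be passed to setQuery on the input format. Whether the target column accepts the string value on insert depends on the target table definition, so test this against your own schema.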

For More Information
