Using the Vertica Data Source API with the Connector V1

The Vertica Spark connector data source API supports both parallel write and read operations. The following code sample illustrates how you can create an in-memory DataFrame by invoking the SQLContext.read function with Vertica’s com.vertica.spark.datasource.DefaultSource formatter.

V1 Spark Connector Load Options

To connect to Vertica, you must specify the following parameters as options in sqlContext.read.format("com.vertica.spark.datasource.DefaultSource").options(...).

Parameter Description
table The name of the Vertica table or view to load into your Spark DataFrame. Note that the Vertica table must be segmented by hash or by an expression that returns non-negative integer values. Also, if the Vertica table is an external table, the underlying file(s) on which the external table is based must be accessible on all Vertica nodes.
db The name of the Vertica database.
user The name of the Vertica user. This user must have CREATE and INSERT privileges in the Vertica schema. The schema defaults to public, but can be changed with the optional dbschema parameter.
password The password for the Vertica user.
host The hostname of a Vertica node. This value is used to make the initial connection to Vertica and to look up the IP addresses of all the other Vertica nodes. You can provide a specific IP address or a resolvable name such as myhostname.com.
numPartitions Optional. The number of Spark partitions used when loading from Vertica; each partition creates its own JDBC connection. Default value: 16.
dbschema Optional. The schema space for the Vertica table. Default value: public.
port Optional. The Vertica port. Default value: 5433.
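
For example, if your table is not in the public schema or Vertica is not listening on the default port, add the optional parameters to the same options map as the required ones. The following is a minimal sketch; all of the values, including the schema name mySchema, are placeholders for your own connection details:

val opt = Map(
  "host" -> "myVerticaHost",
  "db" -> "myDB",
  "table" -> "test",
  "user" -> "myUser",
  "password" -> "myPassword",
  "dbschema" -> "mySchema",  // optional; defaults to "public"
  "port" -> "5433",          // optional; defaults to 5433
  "numPartitions" -> "16"    // optional; defaults to 16
)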

Example: Load Data into a DataFrame

The following example demonstrates reading the content of a Vertica table named test into a DataFrame.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.SparkConf

// Note: the following is deprecated in Spark 2.0. It will warn you to use
// SparkSession instead.
val sqlContext = new SQLContext(sc)

val table = "test"
val db = "myDB"
val user = "myUser"
val password = "myPassword"
val host = "myVerticaHost"
val part = "12"  // number of Spark partitions; passed as a string option

val opt = Map(
  "host" -> host,
  "table" -> table,
  "db" -> db,
  "numPartitions" -> part,
  "user" -> user,
  "password" -> password
)

val df = sqlContext.read.format("com.vertica.spark.datasource.DefaultSource").options(opt).load()
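
At this point df is an ordinary Spark DataFrame, so you can verify the load with the usual DataFrame operations, for example:

df.printSchema()  // print the column names and types read from Vertica
df.show(5)        // display the first five rows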

Column Selection and Filter Push Down

Column selections and row filters applied to the DataFrame are pushed down into Vertica. Refer to the Spark SQL API for more details about applying column selections and row filters. The following example selects column a and counts the rows where a is greater than 5; both the column selection and the filter are pushed down to Vertica:

val c = df.select("a").filter("a > 5").count

When filtering, it is important to specify the correct type. For example, single-quoted values are treated as strings, so if you wrote the above filter as filter("a > '5'"), the filter would not be pushed down: the target column is an integer, and the single-quoted value '5' is treated as a string.
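
For example, assuming column a is an integer column (as in the test table used above), only the first of the following two filters is pushed down into Vertica; the second compares against a string literal and is evaluated by Spark after the data is loaded:

val pushedDown = df.filter("a > 5").count      // integer comparison: pushed down to Vertica
val notPushedDown = df.filter("a > '5'").count // string comparison: not pushed down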

Additionally, when using other types such as dates, cast the value to a date type. For example, use filter("c1 >= cast('2010-1-2' as date)") rather than filter("c1 >= '2010-1-2'"); the latter is not pushed down.

Additional examples of valid filters:

  • df.filter("id<=3").groupBy("id").sum("id").show
  • df.filter($"c".like("str%")).show // has to be varchar type to be pushed down
  • df.filter($"c".rlike("str")).show // has to be varchar type to be pushed down
  • df.filter($"id".isin(3,5)).show

Considerations When Using the Spark DataFrame Filter Method with Vertica

Be aware of the following considerations when using the DataFrame filter method while loading data from Vertica:

  • Vertica supports the case-insensitive terms “inf” and “infinity” to refer to INFINITY in SQL queries. However, when filtering with Spark’s filter method you must instead use the case-sensitive term “Infinity” (see the sketch after this list).
  • When filtering on Boolean values, do not use "is true/false" or "is not true/false". Instead, cast the value as a Boolean or use the equals operator, for example:
    • df.filter("c = 1").show
    • df.filter("c = True").show
    • df.filter("c = False").show

Example: Creating a DataFrame Using the Vertica Data Source

Use the following example to learn how to create a Spark DataFrame from a Vertica table:

  1. Create a sample table in Vertica:

    => CREATE TABLE test (a int, b int, c int, d varchar);
    => INSERT INTO test VALUES (1, 3, 5, 'odds');
    => INSERT INTO test VALUES (10, 14, 8, 'evens');
    => INSERT INTO test VALUES (11, 13, 19, 'odds');
    => COMMIT;
    
  2. Modify the database connection details (host, db, table, user, and password) in the code below with your Vertica connection details.

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    val conf = new SparkConf().setAppName("vertica-spark-connector-testing").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    
    val host = "VerticaHost"
    val db = "VerticaDB"
    val table = "VerticaTable"
    val user = "VerticaUser"
    val password = "VerticaPassword"
    val part = "12"  // number of Spark partitions; passed as a string option

    val opt = Map(
      "host" -> host,
      "table" -> table,
      "db" -> db,
      "numPartitions" -> part,
      "user" -> user,
      "password" -> password
    )
    
    val df = sqlContext.read.format("com.vertica.spark.datasource.DefaultSource").options(opt).load()
    
    // Select column a and count the rows where a > 5; both operations are pushed down to Vertica.
    val c = df.select("a").filter("a > 5").count
    println(c)
    sc.stop()
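
    With the sample data from step 1, the filter a > 5 matches two rows (a = 10 and a = 11), so this example prints 2.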
    
  3. Run the example code in a Spark shell. Start the shell, then paste in your modified code.
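
    Note that both the Vertica Spark connector JAR and the Vertica JDBC driver JAR must be available to the shell; one way to provide them is with the --jars option when starting it. The JAR file names below are placeholders for the versions installed on your system:

    # Placeholder JAR names: substitute the connector and JDBC driver JARs installed on your system
    spark-shell --jars vertica-spark-connector.jar,vertica-jdbc.jar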