Using the Vertica RDD API with the Connector V1

Use the Vertica RDD API to load Vertica table data into a Spark RDD. You create a VerticaRDD object either by instantiating it directly or by calling one of the VerticaRDD.create methods. The following example uses VerticaRDD.create to create an RDD from Vertica data. In this example, the parameters of create() are:

Parameter Description
hostname A Vertica node host name.
port The Vertica port number. The default value is 5433.
prop A Properties object that includes the user name and password.
table The Vertica table name, including the schema name, for example "schema_name.table_name".
dbname The Vertica database name.
col The column names of the Vertica table to load into Spark. An empty col array means all columns.
numPartitions The number of Spark partitions for the resulting VerticaRDD.
mapRow A function that converts one row of the JDBC result set into the element data type T of the VerticaRDD. The generated VerticaRDD has the type VerticaRDD[T].

For example:

val extractValues = (r: ResultSet) => {
  (r.getInt(1), r.getInt(2))
}

This function converts each row of the result set into a tuple that contains two integers, for example (101, 102). Refer to the Spark JdbcRDD documentation for more details about this parameter.

This function must be serializable by Spark.
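mapRow is not limited to tuples: it can build any serializable element type. The following sketch is illustrative only; the Reading case class and its column positions are hypothetical, not part of the connector API:

```scala
import java.sql.ResultSet

// Hypothetical element type for illustration.
case class Reading(sensorId: Int, value: Int)

// Defining mapRow as a function value keeps it serializable by Spark.
val toReading = (r: ResultSet) => Reading(r.getInt(1), r.getInt(2))
```

Passing toReading as the mapRow argument would yield a VerticaRDD[Reading].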

RDD Create Methods

The com.vertica.spark.rdd.VerticaRDD API has three different create methods that you can use to create a VerticaRDD object:

  • create(sc, connect, table, columns, numPartitions, mapRow)
  • create(sc, connect, table, columns, numPartitions, ipMap, mapRow)
  • create(sc, host, port: Int = 5433, db, properties, table, columns, numPartitions, mapRow)
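As a sketch of the first overload, which takes a connection-producing argument instead of host details: the code below assumes that the connect parameter accepts a function returning an open JDBC Connection (check the connector's API reference for the exact type) and that sc is an existing SparkContext; all host and credential values are placeholders.

```scala
import java.sql.{DriverManager, ResultSet}
import com.vertica.spark.rdd.VerticaRDD

// Placeholder connection details; substitute your own cluster values.
val url = "jdbc:vertica://VerticaHost:5433/VerticaDB"
val connect = () => DriverManager.getConnection(url, "VerticaUserName", "VerticaPassword")

// An empty columns array loads all columns of the table.
val rdd = VerticaRDD.create(sc, connect, table = "test",
  columns = Array[String](), numPartitions = 12,
  mapRow = (r: ResultSet) => (r.getInt(1), r.getInt(2)))
```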

The parameters for the create methods are:

Parameter Description
sc The SparkContext object.
connect A connection object that contains a function that returns an open JDBC connection.
table A string value for the database table name.
columns An Array[String] that contains the columns of the database table to load. If the array is empty, the RDD loads all columns.
numPartitions An integer value specifying the number of partitions for the VerticaRDD.
ipMap A Map[String, String] containing an optional mapping from private IP addresses to public IP addresses for all Vertica nodes. This map is needed only when Vertica is installed on private IP addresses but is listening on both private and public IP addresses.

If Vertica is running on both private and public IP addresses, you can update each Vertica node's export address instead of providing the ipMap parameter.

When a database is installed, the export_address and node_address columns in the system NODES table are set to the same value. If you installed Vertica on a private address, you must set the export_address to a public address for each node.

See Identify the Database or Nodes Used for Import/Export in the Administrator's Guide.
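For example, an ipMap for a hypothetical three-node cluster might look like the following; all addresses are placeholder values:

```scala
// Map each node's private IP address to its public IP address.
// All addresses below are placeholders.
val ipMap = Map(
  "192.168.1.10" -> "203.0.113.10",
  "192.168.1.11" -> "203.0.113.11",
  "192.168.1.12" -> "203.0.113.12"
)
```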

mapRow A function from a ResultSet to a single row of the desired result type. This function should call only getInt, getString, and so on; the RDD takes care of calling next. The default maps a ResultSet to an Array[Object].
host A string containing the host name (or IP address) of a Vertica node that can be used to connect to Vertica.
port An integer value specifying the port number for the Vertica connection. Default value: 5433.
properties A Properties object for JDBC connection properties, such as user and password.

Example: Loading Vertica Table Data into a VerticaRDD

This section contains a complete example that creates a VerticaRDD from a Vertica table named test, whose definition is:

create table test (a int, b int);

This program creates a VerticaRDD[(Int, Int)] and calls count, which returns the number of rows in the table test.

// V2S_rdd.scala
import com.vertica.spark.rdd.VerticaRDD
import java.util.Properties
import java.sql.ResultSet
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

val extractValues = (r: ResultSet) => {
  (r.getInt(1), r.getInt(2))
}

val conf = new SparkConf().setAppName("vertica-spark-connector-testing").setMaster("local[1]")
// val sc = new SparkContext(conf) // uncomment if not running in the Spark shell

val host = "VerticaHost"
val port = 5433
val db = "VerticaDB"
val prop = new Properties
prop.put("user", "VerticaUserName")
prop.put("password", "VerticaPassword")
val table = "test"
val cols = Array[String]()
val part = 12
val data = VerticaRDD.create(sc, host, port, db, prop, table, cols, numPartitions = part, mapRow = extractValues)
val c = data.count
println("count: " + c)
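Once created, a VerticaRDD[(Int, Int)] behaves like any ordinary RDD, so standard transformations apply. The sketch below substitutes a locally parallelized stand-in for the Vertica data so the same operations can be shown without a live cluster; the sample rows are invented:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("rdd-transform-sketch").setMaster("local[1]")
val sc = new SparkContext(conf)

// Stand-in for the output of VerticaRDD.create; invented sample rows.
val data = sc.parallelize(Seq((101, 102), (1, 2), (50, 60)))

val sums = data.map { case (a, b) => a + b }  // per-row sum of the two columns
val big  = sums.filter(_ > 100).count()       // rows whose sum exceeds 100
println("rows with a + b > 100: " + big)      // prints 2: (101, 102) and (50, 60)

sc.stop()
```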