Using the Vertica RDD API with the Connector V1
Use the Vertica RDD API to load Vertica table data into a Spark RDD. You create a VerticaRDD object either by instantiating it or by calling one of the VerticaRDD.create methods. The example later in this section uses VerticaRDD.create to create an RDD from Vertica data. In this example, the parameters of create() are:
Parameter | Description |
---|---|
hostname | A Vertica node host name. |
port | Vertica port number. The default value is 5433. |
prop | A Properties object that includes the user name and password. |
table | Vertica table name, including the schema name, such as “schema_name.table_name”. |
dbname | Vertica database name. |
col | The column names of the Vertica table to load into Spark. An empty col array means all columns. |
numPartitions | The number of Spark partitions for the resulting VerticaRDD. |
mapRow | A function that converts one row of JDBC results into the element data type T of the VerticaRDD. The generated VerticaRDD has the type VerticaRDD[T]. For example: val extractValues = (r: ResultSet) => { (r.getInt(1), r.getInt(2)) } This function converts each row into a tuple that contains two integers, for example (101, 102). Refer to the Spark JdbcRDD documentation for more details about this parameter. This function must be serializable by Spark. |
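The mapRow function described above can be tried out without a database. The following is a minimal sketch, assuming only the JDK and the Scala standard library: stubRow is a hypothetical test helper (not part of the connector) that builds a one-row ResultSet stand-in, and the serialization check is only a rough local approximation of what Spark requires.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.lang.reflect.{InvocationHandler, Method, Proxy}
import java.sql.ResultSet

object MapRowSketch {
  // Same shape as the mapRow example: read columns 1 and 2 of the current row.
  val extractValues: ResultSet => (Int, Int) =
    (r: ResultSet) => (r.getInt(1), r.getInt(2))

  // Hypothetical helper: a ResultSet stub that answers getInt(columnIndex)
  // from a fixed row of values and rejects every other method.
  def stubRow(values: Int*): ResultSet = {
    val handler = new InvocationHandler {
      override def invoke(proxy: Object, method: Method, args: Array[Object]): Object =
        method.getName match {
          case "getInt" =>
            val index = args(0).asInstanceOf[java.lang.Integer].intValue
            Integer.valueOf(values(index - 1)) // JDBC column indexes are 1-based
          case other =>
            throw new UnsupportedOperationException(other)
        }
    }
    Proxy.newProxyInstance(
      getClass.getClassLoader,
      Array[Class[_]](classOf[ResultSet]),
      handler
    ).asInstanceOf[ResultSet]
  }

  // Rough serializability check: writeObject throws NotSerializableException
  // if the function cannot be serialized with Java serialization.
  def checkSerializable(f: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try out.writeObject(f) finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(extractValues(stubRow(101, 102))) // prints (101,102)
    checkSerializable(extractValues)
  }
}
```

Because the function only reads from the current row and never calls next, it matches the contract described for mapRow: the RDD is responsible for advancing the ResultSet.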
RDD Create Methods
The com.vertica.spark.rdd.VerticaRDD API has three different create methods that you can use to create a VerticaRDD object:
create(sc, connect, table, columns, numPartitions, mapRow)
create(sc, connect, table, columns, numPartitions, ipMap, mapRow)
create(sc, host, port: Int = 5433, db, properties, table, columns, numPartitions, mapRow)
The parameters for the create methods:
Parameter | Description |
---|---|
sc | Spark Context object. |
connect | A connection object that contains a function that returns an open JDBC Connection. |
table | A string value for the database table name. |
columns | An Array[String] that contains the columns of the database table to load. If the array is empty, the RDD loads all columns. |
numPartitions | An integer value specifying the number of partitions for the VerticaRDD. |
ipMap | As an alternative to providing the ipMap parameter, you can update each Vertica node's EXPORT_ADDRESS if Vertica is running on both private and public IP addresses. When a database is installed, the export_address and node_address in the system NODES table are set to the same value. If you installed Vertica on a private address, you must set the export_address to a public address for each node. See Identify the Database or Nodes Used for Import/Export in the Administrator's Guide. |
mapRow | A function from a ResultSet to a single row of the result types you want. This function should call only getInt, getString, and similar getters; the RDD takes care of calling next. The default maps a ResultSet to an array of Object. |
host | A string containing the host name (or IP address) of a Vertica node that can be used to connect to Vertica. |
port | An integer value specifying the port number for the Vertica connection. Default value: 5433. |
properties | A Properties object for JDBC connection properties, such as user and password. |
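The connect parameter is described above only as something that yields an open JDBC Connection. The following is a minimal sketch of one way to prepare such a function, assuming it can be modeled as a zero-argument Scala function; the exact type that the create overloads expect is not stated here, so treat the shape as an assumption. verticaUrl and makeConnect are hypothetical helper names, and the host, database, and credential values are placeholders.

```scala
import java.sql.{Connection, DriverManager}
import java.util.Properties

object ConnectSketch {
  // Hypothetical helper: builds a Vertica JDBC URL from host, database, and port.
  def verticaUrl(host: String, db: String, port: Int = 5433): String =
    s"jdbc:vertica://$host:$port/$db"

  // Assumption: connect is modeled as a zero-argument function that opens
  // a new JDBC connection each time it is called.
  def makeConnect(url: String, props: Properties): () => Connection =
    () => DriverManager.getConnection(url, props)

  def main(args: Array[String]): Unit = {
    val props = new Properties
    props.setProperty("user", "VerticaUserName")
    props.setProperty("password", "VerticaPassword")

    val url = verticaUrl("VerticaHost", "VerticaDB")
    val connect = makeConnect(url, props)

    // connect() is deliberately not called here: it needs the Vertica JDBC
    // driver on the classpath and a reachable database.
    println(url) // prints jdbc:vertica://VerticaHost:5433/VerticaDB
  }
}
```

Creating the connection inside the function, rather than passing an already open Connection, lets each partition open its own connection where it runs, which is the same pattern that Spark's JdbcRDD uses.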
Example: Loading Vertica Table Data into a VerticaRDD
This section contains complete example code that creates a VerticaRDD from a Vertica table, test, whose definition is:
create table test (a int, b int);
This program creates a VerticaRDD[(Int, Int)] and calls count, which returns the number of rows in the table test.
// V2S_rdd.scala
import com.vertica.spark.rdd.VerticaRDD
import java.util.Properties
import java.sql.ResultSet
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

val extractValues = (r: ResultSet) => { (r.getInt(1), r.getInt(2)) }

val conf = new SparkConf().setAppName("vertica-spark-connector-testing").setMaster("local[1]")
//val sc = new SparkContext(conf) // uncomment if not used in spark shell

val host = "VerticaHost"
val port = 5433
val db = "VerticaDB"
val prop = new Properties
prop.put("user", "VerticaUserName")
prop.put("password", "VerticaPassword")
val table = "test"
val cols = Array[String]()
val part = 12

val data = VerticaRDD.create(sc, host, port, db, prop, table, cols, numPartitions = part, mapRow = extractValues)
val c = data.count
println("count:" + c)