Saving Spark Data to Vertica Using the DefaultSource API

The Vertica Connector for Apache Spark provides the com.vertica.spark.datasource.DefaultSource API to simplify writing data from a Spark DataFrame to a Vertica table using the Spark df.write.format() method. The DefaultSource API provides generic key-value options for configuring the database connection, tuning parameters, and other settings.
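
For example, given a DataFrame df and an options map opts (both are built up in the examples later in this topic), the write call follows Spark's standard data source pattern. A minimal sketch:

    import org.apache.spark.sql.SaveMode

    df.write.format("com.vertica.spark.datasource.DefaultSource")
      .options(opts)
      .mode(SaveMode.Overwrite)
      .save()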

Requirements

The following requirements apply to using the connector to save Spark data to Vertica.

Limitations

The following limitations apply to using the connector to save Spark data to Vertica.

Parameters

To save your Spark DataFrame to Vertica, you must specify the following required parameters as options to df.write.format("com.vertica.spark.datasource.DefaultSource").options(...):

Parameter Description
table The name of the target Vertica table in which to save your Spark DataFrame.
db The name of the Vertica database.
user The name of the Vertica user. This user must have CREATE and INSERT privileges in the Vertica schema. The schema defaults to “public”, but may be changed using the “dbschema” optional tuning parameter.
password The password for the Vertica user.
host The hostname of a Vertica node. This value is used to make the initial connection to Vertica and look up all the other Vertica node IP addresses. You can provide a specific IP address or a resolvable name such as myhostname.com.
hdfs_url

The fully-qualified path to a directory in HDFS that will be used as a data staging area. For example, hdfs://myhost:8020/data/test.

The connector first saves the DataFrame in its entirety to this location before loading it into Vertica. The data is saved in ORC format, and the files are saved in a directory specific to each job. This directory is then deleted when the job completes.

Note that using the hdfs scheme requires additional configuration: you must first configure all Vertica nodes for HDFS access. See Configuring the hdfs Scheme.

You can optionally use the webhdfs protocol instead. See HDFS and WebHDFS.

Note: You must provide an HDFS URL even if you intend to use webhdfs. You can use a dummy URL.
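
For example, a minimal options map covering only the required parameters might look like the following sketch. Every value here is a placeholder; substitute the settings for your own environment:

    val opts: Map[String, String] = Map(
      "table"    -> "target_table",          // placeholder: target Vertica table
      "db"       -> "mydb",                  // placeholder: Vertica database name
      "user"     -> "dbuser",                // needs CREATE and INSERT privileges
      "password" -> "dbpassword",
      "host"     -> "vertica01.example.com", // any reachable Vertica node
      "hdfs_url" -> "hdfs://namenode.example.com:8020/data/staging" // HDFS staging directory
    )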

In addition to the required parameters, you can optionally specify the following parameters as options to df.write.format("com.vertica.spark.datasource.DefaultSource").options(...).

Parameter Description
dbschema The schema space for the Vertica table. Default Value: public
port The Vertica port number. Default Value: 5433
failed_rows_percent_tolerance The tolerance level for failed rows, as a percentage. For example, to make the job fail if more than 10% of the rows are rejected, set this value to 0.10. Default Value: 0.00
strlen The string length. Use this option to increase (or decrease) the default length used when saving Spark StringType values to the Vertica VARCHAR type. Default Value: 1024
web_hdfs_url

The fully-qualified path to a directory in HDFS from which Vertica retrieves the data. For example, webhdfs://myserver:50070/data/test. You must use this option (in addition to “hdfs_url”) if the Vertica nodes are not configured for HDFS access. See HDFS and WebHDFS below.
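
Optional parameters use the same key-value mechanism as the required ones. You can add them to the options map, or set them individually with Spark's DataFrameWriter.option() method, as in this sketch (the values shown are illustrative, not recommendations):

    import org.apache.spark.sql.SaveMode

    df.write.format("com.vertica.spark.datasource.DefaultSource")
      .options(opts)                                     // required parameters, as above
      .option("dbschema", "public")                      // target schema (default: public)
      .option("strlen", "2048")                          // widen VARCHARs beyond the 1024 default
      .option("failed_rows_percent_tolerance", "0.10")   // allow up to 10% rejected rows
      .mode(SaveMode.Append)
      .save()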

HDFS and WebHDFS

The connector transfers data from Spark to an HDFS directory before moving it to Vertica. Vertica can access data in the HDFS directory either directly using the hdfs scheme, or through the web-based webhdfs scheme. For greater performance and reliability, Vertica should use direct HDFS access whenever possible.

You must configure your Vertica cluster before it can directly access data stored in HDFS. See Configuring the hdfs Scheme in Integrating with Apache Hadoop.

By default, the connector uses the hdfs protocol when possible. It falls back to the webhdfs protocol when direct HDFS access is unavailable, for example, when the Vertica nodes are not configured for HDFS access.

For Vertica to fall back to webhdfs, you must verify that WebHDFS is enabled on your HDFS cluster (the dfs.webhdfs.enabled property in hdfs-site.xml is set to true).

Quick Start with Code Example

You can use the spark-shell and some brief Scala code to verify that the connector can write data from Spark to Vertica.

  1. Start the spark-shell and include both the Vertica JDBC driver and the Vertica Spark Connector JAR files in the --jars argument. Then, specify any other Spark options you normally use:

    spark-shell --jars vertica-8.1.0_spark2.0_scala2.11.jar,vertica-jdbc-8.1.0-0.jar [other options]
  2. The following code creates a DataFrame in Spark and saves it to a new Vertica table using the connector:

    1. Modify the host, db, user, password, and table settings in the following example to match your Vertica instance.
    2. Paste the following code into your Spark shell.

      // S2V_basic.scala
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
      import com.vertica.spark._
      
      // Create a sample DataFrame and save it to Vertica
      val rows = sc.parallelize(Array(
        Row(1,"hello", true),
        Row(2,"goodbye", false)
      ))
      
      val schema = StructType(Array(
        StructField("id",IntegerType, false),
        StructField("message",StringType,true),
        StructField("still_here",BooleanType,true)
      ))
      
      // Note: Spark's API changed between version 1.6 and 2.0. In version 1.6
      // you use sqlContext to create a DataFrame. In 2.0, you use the spark object.
      // val df = sqlContext.createDataFrame(rows, schema) // Spark 1.6
      val df = spark.createDataFrame(rows, schema) // Spark 2.0
      
      // View the sample data and schema
      df.show
      df.schema
      
      // Set up the user options. Defaults are shown where applicable for optional values.
      // Replace the placeholder values with the settings for your Vertica instance.
      val opts: Map[String, String] = Map(
        "table" -> "VerticaTableName",
        "db" -> "VerticaDatabaseName",
        "user" -> "VerticaDatabaseUser",
        "password" -> "VerticaDatabasePassword",
        "host" -> "VerticaHostName",
        "hdfs_url" -> "hdfs://HDFSNameNode:9000/user/hduser/someDirectory",
        "web_hdfs_url" -> "webhdfs://HDFSNameNode:50070/user/hduser/someDirectory"
        // "failed_rows_percent_tolerance"-> "0.00"   // OPTIONAL (default val shown)
        // "dbschema" -> "public"                     // OPTIONAL (default val shown)
        // "port" -> "5433"                           // OPTIONAL (default val shown)
        // "strlen" -> "1024"                         // OPTIONAL (default val shown)
        
      )
      
      // SaveMode can be Overwrite, Append, ErrorIfExists, or Ignore
      val mode = SaveMode.Overwrite
      
      // Save the DataFrame via Spark's Datasource API
      df.write.format("com.vertica.spark.datasource.DefaultSource").options(opts).mode(mode).save()
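
  3. Optionally, verify the write by reading the new table back through the Vertica JDBC driver that is already on the classpath. The following sketch reuses the placeholder connection values from the example above; adjust them for your instance:

      val readBack = spark.read.format("jdbc")
        .option("url", "jdbc:vertica://VerticaHostName:5433/VerticaDatabaseName")
        .option("driver", "com.vertica.jdbc.Driver")
        .option("dbtable", "VerticaTableName")
        .option("user", "VerticaDatabaseUser")
        .option("password", "VerticaDatabasePassword")
        .load()

      readBack.show()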