Saving Spark Data to Vertica Using the DefaultSource API

The Vertica Connector for Apache Spark provides the com.vertica.spark.datasource.DefaultSource API to simplify writing data from a Spark DataFrame to a Vertica table using the Spark df.write.format() method. The DefaultSource API provides generic key-value options for configuring the database connection and tuning parameters as well as other options.

Requirements

The following requirements apply to using the connector to save Spark data to Vertica.

Limitations

The following limitations apply to using the connector to save Spark data to Vertica.

  • Supported Data Types—The connector cannot save a DataFrame that contains complex types such as Maps, Structs, or Arrays. Vertica currently supports all other Spark data types.
  • Target Table Limitations—The Vertica target table specified cannot be a view or a temporary table.
  • Null Values in Spark and Vertica—Spark’s long type is converted to Vertica’s INTEGER type when data moves from Spark to Vertica. Vertica uses the value -2^63 (-9223372036854775808) to represent a NULL value. Spark does not treat this value in any special way. If you save -2^63 from Spark to Vertica, it is stored as NULL in Vertica.
  • SQL Strings—Length of Strings. Currently the connector converts all Spark SQL Strings to VARCHAR(strlen) for Vertica. The ‘strlen’ parameter is a user option that defaults to 1024. If strlen is set to a value greater than 65000, Spark SQL Strings are instead converted to LONG VARCHAR(strlen). Values in the range 1 to 32,000,000 are valid.

    When using SaveMode.Append, the existing Vertica table should have the corresponding column types for Spark SQL Strings declared as either VARCHAR or LONG VARCHAR.

    Spark SQL Strings longer than strlen are always truncated when saving to Vertica.

  • Float Values—Float Values Represented Differently. Because float values are approximations, the value of a float can be different when it is moved from Spark to Vertica. If you require more precision, then use a more precise data type in Spark, such as double or decimal.

  • Gregorian/Julian Calendar Issues—Inconsistent dates before the year 1583. The ORC file writer used to stage the data in HDFS converts dates before 1583 from the Gregorian calendar to the Julian calendar. However, Vertica does not perform this conversion. If your data in Spark contains dates before 1583, then the values in Spark and the corresponding values in Vertica can differ by up to 10 days. This difference applies to both DATE and TIMESTAMP values.

  • Wrong results for Decimal Type. If your version of Vertica is prior to 7.2 SP3, then decimal numbers are incorrectly loaded from Spark to Vertica. Upgrade to a newer version of Vertica to resolve this issue.

  • Different Jobs Writing to the Same Table in Overwrite Mode Cause an Error—If different jobs write to the same table in Overwrite mode, the following error can occur: ERROR: java.sql.SQLNonTransientException: [Vertica][VJDBC](3007) ERROR: DDL statement interfered with this statement. For example, Job 1 starts and begins writing to the Vertica table. Then Job 2 begins and is set to write to the same table. Because the mode is Overwrite, Job 2 drops the table to which Job 1 is writing, causing the error. Do not use different jobs to write to the same table when using Overwrite mode.

  • Files not cleaned up if process interrupted—The files written by Spark to HDFS are normally cleaned up when the job completes. However, if something interrupts the process (such as a network failure), files may be left behind and you should manually clean up the HDFS directory. The HDFS directory is the one provided in the hdfs_url path, and the specific subdirectory is S2V_jobxyz as reported by the connector.

  • Error if the DataFrame has zero rows—The connector returns an error message asking you to check whether the DataFrame is empty. This error occurs before the connector begins loading data into Vertica. The data in the targeted Vertica table is not altered, even if mode was set to SaveMode.Overwrite (which would normally delete the table).
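Two of the limitations above are simple enough to restate as code. The following is a minimal sketch, not part of the connector: the object name VerticaSaveRules and its members are hypothetical helpers that only mirror the documented rules for the NULL sentinel value and for how strlen selects the Vertica string type.

```scala
// Hypothetical helpers that restate the documented rules; the connector does not expose them.
object VerticaSaveRules {
  // Vertica INTEGER reserves -2^63 (-9223372036854775808) to represent NULL.
  val VerticaIntegerNull: Long = Long.MinValue

  // True if a Spark long value would be stored as NULL once saved to Vertica.
  def becomesNull(value: Long): Boolean = value == VerticaIntegerNull

  // Column type documented for Spark SQL Strings, given the strlen option.
  def stringColumnType(strlen: Int): String = {
    require(strlen >= 1 && strlen <= 32000000, s"strlen out of range: $strlen")
    if (strlen > 65000) s"LONG VARCHAR($strlen)" else s"VARCHAR($strlen)"
  }
}
```

For example, VerticaSaveRules.stringColumnType(1024) gives VARCHAR(1024), while a strlen of 65001 gives LONG VARCHAR(65001). A check like becomesNull could be run over long columns before saving if the value -2^63 is meaningful in your data.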

Parameters

To save your Spark DataFrame to Vertica, you must specify the following required parameters as options to df.write.format("com.vertica.spark.datasource.DefaultSource").options(...):

Parameter Description
table The name of the target Vertica table to save your Spark DataFrame.
db The name of the Vertica database.
user The name of the Vertica user. This user must have CREATE and INSERT privileges in the Vertica schema. The schema defaults to “public”, but may be changed using the “dbschema” optional tuning parameter.
password The password for the Vertica user.
host The hostname of a Vertica node. This value is used to make the initial connection to Vertica and look up all the other Vertica node IP addresses. You can provide a specific IP address or a resolvable name such as myhostname.com.
hdfs_url

The fully-qualified path to a directory in HDFS that will be used as a data staging area. For example, hdfs://myhost:8020/data/test.

The connector first saves the DataFrame in its entirety to this location before loading it into Vertica. The data is saved in ORC format by default, and the files are saved in a directory specific to each job. This directory is deleted when the job completes.

Note that you need additional configuration changes to use HDFS. You must first configure all Vertica nodes to use HDFS. See Configuring the hdfs Scheme.

You can optionally use the webhdfs protocol instead. See HDFS and WebHDFS.

You must provide an HDFS URL even if you intend to use webhdfs. You can use a dummy URL.
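Because every required parameter is a plain key in the options map, a missing one can be caught before the save starts. The following is a minimal sketch; the RequiredOptions object is a hypothetical helper, not something the connector provides, and it only checks for the six required keys listed above.

```scala
// Hypothetical pre-flight check; the connector performs its own validation when you save.
object RequiredOptions {
  // The required parameters, in the order the documentation lists them.
  val required: Seq[String] =
    Seq("table", "db", "user", "password", "host", "hdfs_url")

  // Returns the required keys that are absent or blank in the supplied options.
  def missing(opts: Map[String, String]): Seq[String] =
    required.filterNot(key => opts.get(key).exists(_.trim.nonEmpty))
}
```

Calling RequiredOptions.missing(opts) on a complete options map returns an empty sequence. Any names it does return are the parameters still to be supplied.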

In addition to the required parameters, you can optionally specify the following parameters as options to df.write.format("com.vertica.spark.datasource.DefaultSource").options(...).

Parameter Description
dbschema The schema space for the Vertica table. Default Value: public
port The Vertica Port. Default value: 5433
failed_rows_percent_tolerance The tolerance level for failed rows, expressed as a fraction. For example, to make the job fail if more than 10% of the rows are rejected, set this value to 0.10. Default Value: 0.00
strlen The string length. Use this option to increase (or decrease) the default length when saving Spark StringType to Vertica VARCHAR type. Default Value: 1024
web_hdfs_url

The fully-qualified path to a directory in HDFS that will be used by Vertica to retrieve the data. For example, webhdfs://myserver:50070/data/test. You must use this option (in addition to “hdfs_url”) if the Vertica nodes are not configured for HDFS access. See HDFS and WebHDFS below.

fileformat

The format for the intermediate data file written to HDFS. Supported values are:

  • "orc" (default)
  • "parquet"

See Intermediate Data File Format Settings below for more information.

target_table_ddl A SQL DDL statement to run in Vertica before copying data. You usually use this option to create the target table to receive the data from Spark. The table this statement creates should match the table name you specify in the table option. See Creating the Destination Table From Spark for more information.
column_copy_list A custom column list to supply to the Vertica COPY statement that loads the Spark data into Vertica. This option lets you adjust the data so that the Vertica table and the Spark DataFrame do not have to have the same number of columns. See Copying Data to a Table With a Different Schema for more information.
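The defaults in the table above can be made visible by merging them with your own options, and the tolerance setting can be read as a simple ratio test. The following is a sketch under those assumptions: OptionalDefaults, withDefaults, and exceedsTolerance are hypothetical names, and the connector applies its defaults internally whether or not you do this.

```scala
// Hypothetical helper that mirrors the documented defaults; not part of the connector.
object OptionalDefaults {
  // Default values as listed in the optional parameter table.
  val defaults: Map[String, String] = Map(
    "dbschema" -> "public",
    "port" -> "5433",
    "failed_rows_percent_tolerance" -> "0.00",
    "strlen" -> "1024",
    "fileformat" -> "orc"
  )

  // Values supplied by the caller override the defaults.
  def withDefaults(opts: Map[String, String]): Map[String, String] =
    defaults ++ opts

  // Tolerance is a fraction: 0.10 means the job fails when more than 10% of rows are rejected.
  def exceedsTolerance(rejected: Long, total: Long, tolerance: Double): Boolean =
    total > 0 && rejected.toDouble / total > tolerance
}
```

With a tolerance of 0.10 and 1000 rows, 50 rejected rows are within tolerance and 101 rejected rows are not.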

HDFS and WebHDFS

The connector transfers data from Spark to an HDFS directory before moving it to Vertica. Vertica can access data in the HDFS directory either directly using the hdfs scheme, or through the web-based webhdfs scheme. For greater performance and reliability, Vertica should use direct HDFS access whenever possible.

You must configure your Vertica cluster before it can directly access data stored in HDFS. See Configuring the hdfs Scheme in Integrating with Apache Hadoop.

By default, the connector uses the hdfs protocol when possible. It falls back to the webhdfs protocol in the following cases:

  • If you do not have your nodes configured for direct HDFS access.
  • If the data stored in the HDFS directory is encrypted. The library that Vertica uses to read directly from HDFS cannot decrypt this data. The webhdfs scheme transparently decrypts the data for Vertica.

For Vertica to fall back to webhdfs, you must verify that webhdfs.enabled is set to true on your HDFS cluster.

Intermediate Data File Format Settings

When Spark sends data to Vertica, it writes the data to intermediate files stored in HDFS. Vertica then reads these files off of HDFS. By default, Spark writes these data files in ORC format. You can choose to have it use Parquet format instead by setting the fileformat parameter to "parquet".

You usually do not directly access these files, so you normally do not care about their format. The Spark connector automatically deletes these files after Vertica is through with them.

Testing shows that Spark writes ORC files faster than Parquet files. Writing of the data files makes up a large portion of the time it takes to transfer data from Spark to Vertica. Using the format that takes Spark the least amount of time to write results in faster transfers. For this reason, Vertica recommends you use the default ORC format for the intermediate files unless you are required to use Parquet format files.
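If you do need Parquet, the change is a single extra entry in the options map. The following is a small sketch: FileFormatOption and withFileFormat are hypothetical names, and the placeholder values stand in for your own connection settings.

```scala
// Hypothetical helper; only the "fileformat" key and its two values come from the documentation.
object FileFormatOption {
  // Intermediate file formats the connector is documented to support.
  val supported: Set[String] = Set("orc", "parquet")

  // Returns a copy of the options with the intermediate file format set.
  def withFileFormat(opts: Map[String, String], format: String): Map[String, String] = {
    require(supported.contains(format), s"Unsupported fileformat: $format")
    opts + ("fileformat" -> format)
  }
}

// Placeholder connection settings; replace them with your own values.
val baseOpts: Map[String, String] = Map(
  "table" -> "VerticaTableName",
  "db" -> "VerticaDatabaseName",
  "user" -> "VerticaDatabaseUser",
  "password" -> "VerticaDatabasePassword",
  "host" -> "VerticaHostName",
  "hdfs_url" -> "hdfs://HDFSNameNode:9000/user/hduser/someDirectory"
)

val parquetOpts = FileFormatOption.withFileFormat(baseOpts, "parquet")
```

The resulting parquetOpts map can be passed to options(...) in the same way as any other options map.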

Quick Start with Code Example

You can use the spark-shell and some brief Scala code to verify that the connector can write data from Spark to Vertica.

  1. Start the spark-shell and include both the Vertica JDBC Driver and the Vertica Spark Connector JAR files in the jars argument. Then, specify any other Spark options you normally use:

    spark-shell --jars vertica-8.1.0_spark2.0_scala2.11.jar,vertica-jdbc-8.1.0-0.jar [other options]
  2. The following code creates a DataFrame in Spark and saves it to a new Vertica table using the connector:

    1. Modify the host, db, user, password, and table settings in the following example to match your Vertica instance.
    2. Paste the following code into your Spark shell.

      // S2V_basic.scala
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
      import com.vertica.spark._
      
      // Create a sample DataFrame and save it to Vertica
      val rows = sc.parallelize(Array(
        Row(1,"hello", true),
        Row(2,"goodbye", false)
      ))
      
      val schema = StructType(Array(
        StructField("id",IntegerType, false),
        StructField("message",StringType,true),
        StructField("still_here",BooleanType,true)
      ))
      
      // Note: Spark's API changed between version 1.6 and 2.0. In version 1.6
      // you use sqlContext to create a DataFrame. In 2.0, you use the spark object.
      // val df = sqlContext.createDataFrame(rows, schema) // Spark 1.6
      val df = spark.createDataFrame(rows, schema) // Spark 2.0
      
      // View the sample data and schema
      df.show
      df.schema
      
      // Setup the user options, defaults are shown where applicable for optional values.
      // Replace the placeholder values with the settings for your Vertica instance.
      val opts: Map[String, String] = Map(
        "table" -> "VerticaTableName",
        "db" -> "VerticaDatabaseName",
        "user" -> "VerticaDatabaseUser",
        "password" -> "VerticaDatabasePassword",
        "host" -> "VerticaHostName",
        "hdfs_url" -> "hdfs://HDFSNameNode:9000/user/hduser/someDirectory",
        "web_hdfs_url" -> "webhdfs://HDFSNameNode:50070/user/hduser/someDirectory"
        // "failed_rows_percent_tolerance"-> "0.00"   // OPTIONAL (default val shown)
        // "dbschema" -> "public"                     // OPTIONAL (default val shown)
        // "port" -> "5433"                           // OPTIONAL (default val shown)
        // "strlen" -> "1024"                         // OPTIONAL (default val shown)
        // "fileformat" -> "orc"                      // OPTIONAL (default val shown)
      )
      
      // SaveMode can be either Overwrite, Append, ErrorIfExists, Ignore
      val mode = SaveMode.Overwrite
      
      // save the DataFrame via Spark’s Datasource API
      df.write.format("com.vertica.spark.datasource.DefaultSource").options(opts).mode(mode).save()

Creating the Destination Table From Spark

You can use the DefaultSource's target_table_ddl option to run an arbitrary Vertica DDL statement before Vertica loads the data exported from Spark. You usually use this option to create a table to receive the data.

The following example shows an options object that has a target_table_ddl option that creates a target table named employee for the data the Spark Connector loads into Vertica:

val opts: Map[String, String] = Map(
  "table" -> "employee",
  // A multi-line string literal in Scala must use triple quotes.
  "target_table_ddl" -> """
    CREATE TABLE employee (
        key         IDENTITY(1,1),
        fullname    VARCHAR(1024)   NOT NULL,
        age         INTEGER,
        hiredate    DATE            NOT NULL,
        region      VARCHAR(1024)   NOT NULL,
        loaddate    TIMESTAMP       DEFAULT NOW())
    PARTITION BY EXTRACT (year FROM hiredate);
    CREATE PROJECTION employee_p(key, fullname, hiredate) AS
    SELECT key, fullname, hiredate FROM employee SEGMENTED BY HASH(key) ALL NODES;""",
  "db" -> "VerticaDatabaseName",
  "user" -> "VerticaDatabaseUser",
  "password" -> "VerticaDatabasePassword",
  "host" -> "VerticaHostName",
  "hdfs_url" -> "hdfs://HDFSNameNode:9000/user/hduser/someDirectory",
  "web_hdfs_url" -> "webhdfs://HDFSNameNode:50070/user/hduser/someDirectory"
)

Copying Data to a Table With a Different Schema

Normally, the column definitions in your Spark DataFrame match the columns in your Vertica target table. Sometimes, you cannot have your target table exactly match the definition of your DataFrame. For example, suppose the definition of your DataFrame has to change, but you do not want to update the schema of your target table because your client applications rely on it remaining the same.

In this case, you can use the column_copy_list option to transform the data columns from the DataFrame into values that can be inserted into your Vertica target table. The contents of this option are passed as column arguments to the COPY statement that Vertica uses to load the data. Using it, you can split, combine, or skip columns in the DataFrame as COPY loads them into your target table. See Transforming Data During Loads and Manipulating Source Data Columns for more information on using column arguments in the COPY statement to change the source data to match your target table's schema.

The following example demonstrates using the column_copy_list setting in a situation where the DataFrame has separate columns for a first, middle, and last name, but the Vertica target table has just a single column for a full name. It also demonstrates compensating for the DataFrame's lack of an age column by setting the target table's age column value to NULL.

val opts: Map[String, String] = Map(
  "table" -> "employee",
  // Note: the parameter table lists this option as column_copy_list, while this
  // example uses copy_column_list. Confirm the spelling your connector version expects.
  // A multi-line string literal in Scala must use triple quotes.
  "copy_column_list" -> """
    firstname   FILLER VARCHAR(1024),
    middlename  FILLER VARCHAR(1024),
    lastname    FILLER VARCHAR(1024),
    fullname AS firstname ||' '|| NVL(middlename,'') ||' '|| lastname,
    age as NULL,
    hiredate,
    region""",
  "db" -> "VerticaDatabaseName",
  "user" -> "VerticaDatabaseUser",
  "password" -> "VerticaDatabasePassword",
  "host" -> "VerticaHostName",
  "hdfs_url" -> "hdfs://HDFSNameNode:9000/user/hduser/someDirectory",
  "web_hdfs_url" -> "webhdfs://HDFSNameNode:50070/user/hduser/someDirectory"
)
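To see what the fullname expression in the column list produces, it can help to mimic it in plain Scala. The following is an illustrative sketch only: FullNameSketch is a hypothetical name, the real transformation runs inside Vertica's COPY statement, and the sketch assumes the first and last names are never NULL.

```scala
// Mimics: firstname ||' '|| NVL(middlename,'') ||' '|| lastname
// A missing middle name is modeled as None, standing in for SQL NULL.
object FullNameSketch {
  def fullName(first: String, middle: Option[String], last: String): String =
    first + " " + middle.getOrElse("") + " " + last
}
```

Note that a missing middle name leaves two spaces between the first and last names, because both separators are always concatenated. If that matters for your data, adjust the expression in the column list accordingly.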