Loading Vertica Data into a Spark DataFrame or RDD
The Vertica Connector for Apache Spark includes APIs to simplify loading Vertica table data efficiently with an optimized parallel data-reader:
- DataFrame API (com.vertica.spark.datasource.DefaultSource): the data source API, which is used for writing to Vertica and is also optimized for loading data into a DataFrame.
- RDD API (com.vertica.spark.rdd.VerticaRDD): simplifies creating an RDD based on a Vertica table or view.
Typically, Vertica tables are segmented across multiple nodes. The Spark connector invokes a parallel data reader that reads data from Vertica efficiently while minimizing data movement between Vertica nodes. The DataFrame reader also supports pushing column selection and row filtering down to Vertica, avoiding the transfer of large volumes of Vertica data into Spark's in-memory data structures.
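For example, the following sketch loads a Vertica table into a DataFrame through the DefaultSource data source and applies the kind of column and row pruning that the reader can push down to Vertica. The connection option names (host, db, user, password, table) and their values are illustrative; verify the exact option keys against the options reference for your connector version.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("vertica-read-example")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Hypothetical connection settings; replace with your cluster's values.
val opts = Map(
  "host"     -> "vertica-node01",   // any node in the Vertica cluster
  "db"       -> "testdb",
  "user"     -> "dbadmin",
  "password" -> "secret",
  "table"    -> "example"
)

// Load the Vertica table through the connector's data source.
val df = sqlContext.read
  .format("com.vertica.spark.datasource.DefaultSource")
  .options(opts)
  .load()

// Column and row pruning that can be pushed down to Vertica.
val filtered = df.select("a", "b").filter("a > 100")
filtered.show()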
Important: For the best possible performance, segment your Vertica table by hash on one or more attributes that return integer values. Hewlett Packard Enterprise does not recommend segmenting by a custom expression, because doing so can result in lower performance than segmenting by hash. See Hash Segmentation Clause for more details. An example of creating a table segmented by hash:
create table example(a integer, b integer) segmented by hash(a) all nodes;
Loading Vertica Table Segments into Spark DataFrame and RDD Partitions
All Spark RDDs and DataFrames require you to define the number of partitions, and you can choose any number. The Vertica Spark connector library automatically generates a hash interval for each partition. When you segment the table by a suitable hash expression on one or more of its columns, these intervals minimize cross-node data shuffling inside Vertica and reduce data skew.
No cross-node data shuffling occurs inside Vertica when both of the following conditions exist:
- The Vertica cluster has N nodes.
- The number of Spark partitions is P * N, where P is an integer greater than 0.
Therefore, you can use the full network throughput for data transfer, achieving the best performance. The following figure shows 8 Spark partitions defined over 4 Vertica segments.
The next figure shows another example, where the number of Spark partitions (two) is smaller than the number of Vertica segments (four).
According to Vertica benchmark tests, a setting of P = 4 achieves the best loading performance. For example, when Vertica has 4 nodes, the recommended number of Spark partitions is 16 (P = 4, N = 4, P * N = 16 partitions).
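Continuing the earlier read sketch (reusing sqlContext and opts), and assuming the connector accepts a numPartitions option for the partition count (verify the option name for your connector version), the 4-node example might look like this:
val nodes = 4                    // N: number of Vertica nodes
val p = 4                        // P: partitions per node, per the benchmark recommendation
val numPartitions = p * nodes    // 16 Spark partitions

val df16 = sqlContext.read
  .format("com.vertica.spark.datasource.DefaultSource")
  .options(opts + ("numPartitions" -> numPartitions.toString))
  .load()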
During Spark job execution, each Spark partition is processed by a task. Each task opens a JDBC connection to the Vertica node that stores the segment containing that partition's data, issues a query to that node, and fetches the query result into the VerticaRDD or DataFrame partition.
An unsegmented Vertica table is replicated onto multiple Vertica nodes. In this case, each Spark partition is defined as a range over the table's sorted columns, and Spark tasks send their queries to different replicas for load balancing. The next figure shows Spark partitions defined over an unsegmented table.
The following figure illustrates the runtime behavior of the Vertica to Spark connector. Spark tasks containing VerticaRDD or DataFrame partitions fetch data from Vertica through JDBC connections. The queries used to fetch the data are based on the computed ranges of hash values.
Spark Numeric Value Limitation
Both Vertica and Spark support variable-precision NUMERIC values. Vertica NUMERIC values support more digits of precision (up to 1024) than Spark decimals do (38 digits). If you attempt to copy into Spark a Vertica table that has a NUMERIC column with more than 38 digits of precision, the VerticaDataSourceRDD class throws an error similar to the following:
java.lang.IllegalArgumentException: requirement failed: Decimal precision 41 exceeds max precision 38
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.sql.types.Decimal.set(Decimal.scala:113)
    at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:426)
    at com.vertica.spark.datasource.VerticaDataSourceRDD$$anon$1.getNext(VerticaRDD.scala:382)
    . . .
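The 38-digit limit comes from Spark's DecimalType rather than from the connector. As a small illustrative check (not part of the connector API), you can confirm the cap from Spark itself:
import org.apache.spark.sql.types.DecimalType

// Spark caps decimal precision at 38 digits. A Vertica column declared with a
// higher precision, for example NUMERIC(41,0), cannot be represented in Spark,
// and the load fails with the IllegalArgumentException shown above.
println(DecimalType.MAX_PRECISION)   // prints 38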