Archive for November, 2010

How to Make Pig for Hadoop SQL

A few weeks back, my co-worker Rajat Venkatesh gave a little talk on his work with the Vertica Connector for Hadoop.  When he displayed a Pig program, the de facto programming language for Hadoop, I noted that it looks a lot like SQL.  And also, that it shouldn’t be too hard to convert a Pig program to SQL, allowing us to run Pig on Vertica directly!

So, I spent a few hours over the last week and whipped up an engine for running Pig programs on Hadoop and Vertica.  I’m not trying to start up a Hadoop vs. database war, or even “perfume” Pig (sorry… bad joke).  I just wanted to make life easier for those users who are combining Hadoop and Vertica to get the most out of their data.

The core idea of the Vertica Pig/SQL conversion engine is to rewrite the data flow described by the Pig program into a set of nested SQL queries that produce the query answer. The conversion engine uses the regular PigScriptParser, looks at the LogicalPlan, and transforms each Operator into a SQL statement that implements its functionality.

The Vertica Connector for Hadoop is used to move data from HDFS into Vertica and from Vertica back into HDFS.  We run a mixture of Pig scripts and SQL scripts in order to create tables in Vertica, move the data from Hadoop to Vertica, compute the result, and move it back to Hadoop.  The hope is to seamlessly take advantage of Vertica to run your Pig program faster.

Here’s a fairly trivial Pig script example:

x = LOAD ‘foo.log’ USING PigStorage(‘|’) as (a:int,b:int,c:int);
y = FILTER x BY a > 5;
z = FOREACH y GENERATE b*c as f;
z2 = DISTINCT z;
z3 = FOREACH z2 GENERATE f, f*f as g;
z4 = ORDER z3 by g;
STORE z4 INTO ‘zzz';

And here is the corresponding code run by the Vertica conversion engine:

Script: schema refresh(SQL)[ErrorsOK]
DROP SCHEMA squeal_PigLatin_simple_pig CASCADE;
CREATE SCHEMA squeal_PigLatin_simple_pig;
Script: sql setup(SQL)
CREATE TABLE squeal_PigLatin_simple_pig.hdfs_L3H9215_verticacorp_com_9935_user_bvandiver_simple_foo_log
a int,
b int,
c int,
d varchar(65000)
Script: test sql(SQL)
SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT f as f,f * f as g FROM (SELECT DISTINCT * FROM (SELECT b * c as f FROM (SELECT * FROM squeal_PigLatin_simple_pig.hdfs_L3H9215_verticacorp_com_9935_user_bvandiver_simple_foo_log as x WHERE a > 5) AS y) AS z) as z2) AS z3 ORDER BY g) AS z4 LIMIT 3) AS z5
Script: load3(PIG)
x = LOAD ‘hdfs://’ USING PigStorage(‘|’) AS (a:int,b:int,c:int,d:chararray);
STORE x into ‘{squeal_PigLatin_simple_pig.hdfs_L3H9215_verticacorp_com_9935_user_bvandiver_simple_foo_log(a int,b int,c int,d varchar(65000))}’ using com.vertica.pig.VerticaStorer(‘localhost’,’verticadb502′,’5935′,’bvandiver’,'””‘);
Script: populate vdata4(SQL)
CREATE TABLE squeal_PigLatin_simple_pig.vdata AS SELECT * FROM (SELECT * FROM (SELECT * FROM (SELECT f as f,f * f as g FROM (SELECT DISTINCT * FROM (SELECT b * c as f FROM (SELECT * FROM squeal_PigLatin_simple_pig.hdfs_L3H9215_verticacorp_com_9935_user_bvandiver_simple_foo_log as x WHERE a > 5) AS y) AS z) as z2) AS z3 ORDER BY g) AS z4 LIMIT 3) AS z5
Script: store vdata5(PIG)
vdata = LOAD ‘sql://{SELECT * FROM squeal_PigLatin_simple_pig.vdata}’ using com.vertica.pig.VerticaLoader(‘localhost’,’verticadb502′,’5935′,’bvandiver’,'””‘);
STORE vdata INTO ‘hdfs://’ USING org.apache.pig.builtin.PigStorage();

While this example only shows filter, distinct, order, and foreach, the conversion engine currently supports limit, join, sample, and group as well.

There are a number of cool tricks we can do to improve performance. One trick is that we’re already taking advantage of the Vertica Optimizer to make data-aware decisions. For example, the Optimizer will chose join order based on the data actually loaded, instead of relying on the programmer to know what the best order was (and specify it correctly!).

Additionally, we can take advantage of the Vertica Database Designer. Our Database Designer automatically chooses a good data layout and compression strategy for storing the data in Vertica. The Designer uses the schema, the queries, and a sample of the data as its input – all of which the conversion engine can provide and involve programmatically. Finally, you can leave the original data in Vertica, so the second time you run the script, you avoid paying the cost of transferring the data.

What’s the take-away? With the Vertica Pig/SQL conversion engine, dislike of writing SQL is not a reason to throw away your database. If scalability is your major concern, I’d encourage you to see how well Vertica scales up to the challenge of large data.

For an off-hours, couple-day hack, the conversion engine is pretty feature-rich! Today, the conversion engine doesn’t support all of Pig (for instance cogroup or user-defined functions), but this is simply a matter of time and effort. We would love to hear from our user community on the interest in using the Vertica Pig/SQL conversion engine if it were a real product. If so, we will improve it and release it as an open source download.


Life Beyond Indices: The Query Benefits of Storing Sorted Data

With the Vertica Analytics Platform, there are a number of benefits to storing compressed and sorted data, as well as operating directly on compressed data, that we have discussed in previous posts. In this post, I’m going to discuss how the Vertica Analytics Platform takes advantage of this sorted data to make query execution blindingly fast, which obviates the need for traditional DBMS indexes.

Unlike traditional DBMS solutions, Vertica has no user-defined indexes. Indexes in an analytic database take up DBA time (figuring out which indexes to make), storage capacity, and load time (to keep them up to date). Even if an index consumes only 10% of the size of the original data and takes 10% more time during load, storing even a few indexes on terabytes of data is costly.  As we have mentioned before, a true column store isn’t just a vertically-partitioned row store.

How does Vertica query huge volumes without indexes? It’s easy… the data is sorted by column value, something we can do because we wrote both our storage engine and execution engine from scratch. We don’t store the data by insert order, nor do we limit sorting to within a set of disk blocks. Instead, we have put significant engineering effort into keeping the data totally sorted during its entire lifetime in Vertica. It should be clear how sorted data increases compression ratios (by putting similar values next to each other in the data stream), but it might be less obvious at first how we use sorted data to increase query speed as well.

Let’s start with the simplest and easiest to understand example: the data is sorted the way a query requests it (ORDER BY). Consider a snippet of trading data sorted by stock and price (see Table 1).  If the user’s query requests all the data ordered by the stock and price, they might use something like:

SELECT stock, price FROM ticks ORDER BY stock, price;

Clearly Vertica is off the hook to do any sort at runtime: data is just read off disk (with perhaps some merging) and we are done.

Table 1: Illustration of data sorted on (stock, price). Other columns are omitted for clarity.

A more interesting query might ask for a single stock’s data ordered by price:

SELECT stock, price FROM ticks WHERE stock=’IBM’ ORDER BY price;

Finding rows in storage (disk or memory) that match stock=’IBM’ is quite easy when the data is sorted, simply by applying your favorite search algorithm (no indexes are required!). Furthermore, it isn’t even necessary to sort the stock=’IBM’ rows because the predicate ensures the secondary sort becomes primary within the rows that match as illustrated below:

Table 2: when only rows that match stock=’IBM’ are considered, the results are ordered by price, and thus no additional sorting is required.

Next, let us consider a query that computes the average price for each stock symbol:

SELECT stock, avg(price) FROM ticks GROUP BY stock;

In general, the aggregator operator does not know a priori how many distinct stocks there are nor in what order that they will be encountered. One common approach to computing the aggregation is to keep some sort of lookup table in memory with the partial aggregates for each distinct stock. When a new tuple is read by the aggregator, its corresponding row in the table is found (or a new one is made) and the aggregate is updated as shown below:

Table 3: Illustration of aggregation when data is not sorted on stock. The aggregator has processed the first 4 rows: It has updated HPQ three times with 100, 102 and 103 for an average of 101.66, and it has updated IBM once for an average of 100. Now it encounters ORCL and needs to make a new entry in the table.

This scheme, often denoted as “Group By Hash” because a hash table is used as the lookup data structure, does a good job when there are a small number of groups. However, when there are a large number of groups, it takes significant RAM to store the hash table and provisions need to be made when RAM is exhausted (typically by spilling to disk).

With Vertica, a second type of aggregation algorithm is possible because the data is already sorted, so every distinct stock symbol appears together in the input stream. In this case, the aggregator can easily find the average stock price for each symbol while keeping only one intermediate average at any point in time. Once it sees a new symbol, the same symbol will never be seen again and the current average may be generated. This is illustrated below:

Table 4: Illustration of aggregation when data is sorted on stock. The aggregator has processed the first 7 rows. It has already computed the final averages of stock A and of stock HPQ and has seen the first value of stock IBM resulting in the current average of 100. When the aggregator encounters the next IBM row with price 103 it will update the average to 101.5. When the ORCL row is encountered the output row IBM,101.5 is produced.

This scheme, commonly called “one pass aggregation” has pipelined parallelism (the same concept as instruction pipelining) if the data is already sorted according to stock. This means we can start producing tuples for downstream operators to consume almost immediately. Given that the Vertica execution is multi-threaded, and all modern machines have multiple cores, pipelined parallelism decreases query execution time.

Of course, one pass aggregation is used in other systems (often called SORT GROUP BY), but they require a sort at runtime to sort the data by stock. Forcing a sort before the aggregation costs execution time and it prevents pipelined parallelism because all the tuples must be seen by the sort before any can be sent on. Using an index is also a possibility, but that requires more I/O, both to get the index and then to get the actual values. This is a reasonable approach for systems that aren’t designed for reporting, such as those that are designed for OLTP, but for analytic systems that often handle queries that contain large numbers of groups it is a killer.

I hear you ask what kind of real-world queries aggregate large numbers of groups? There are at least two very common scenarios that our customers encounter: distinct counts and correlated subqueries with aggregation that have been flattened into joins. Our web analytics customers typically have queries that look for distinct visitors given some condition such as:

SELECT count(DISTINCT visitor_id) FROM user_sessions WHERE <filtering predicates>;

The applicability of one-pass aggregation can be seen if we rewrite the query to an equivalent form:

SELECT COUNT(sq.visitor_id) from (select visitor_id FROM user_sessions WHERE <filtering predicates> GROUP BY visitor_id) as sq

And as such is amenable to the same “group by pipeline” optimization of data sorted on visitor_id. As you are probably glazing over at this point, I will postpone further technical discussion of flattened subqueries for a future discussion if there is sufficient interest.

Another area where having pre-sorted data helps is the computation of SQL-99 analytics. We can optimize the PARTITON BY clause in a manner very similar to GROUP BY when the partition keys are sequential in the data stream. We can also optimize the analytic ORDER BY clause similarly to the normal SQL ORDER BY clause.

The final area to consider is Merge-Join. Of course this is not a new idea, but other database systems typically have Sort-Merge-Join, whereby a large join can be performed by pre-sorting the data from both input relations according to the join keys. Since Vertica already has the data sorted, it is often possible to skip the costly sort and begin the join right away.

Since sorting is such a fundamental part of our system, we have built sophisticated infrastructure in the Vertica Optimizer to track the sortedness of various intermediate results. Our infrastructure takes into account that some columns are equivalent after joining, that some columns have had constant predicates, that some expressions (e.g. price * 100) maintain sortedness, and a host of other factors. By keeping careful track, we maximize the opportunities to apply the optimizations shown above, all without any additional storage.

Of course, Vertica is not limited to a single sort order for each table. In fact, if redundant copies of the data need to be stored to survive node failures, the different copies can be stored with different sort orders. Different sort orders furthers the chance that we can apply one of our sort-based optimizations. And lest you think we are simply swapping determining sort order for determining indexes for a new DBA headache, the optimal sort order of the physical storage is typically automatically determined by the Vertica Database Designer!

If anyone wants me to spell out a specific topic in more detail leave a comment below and let me know!


What’s New in Vertica 4.1

Vertica announced a new version of its Vertica Analytics Platform software, version 4.1, on Tuesday, November 9th at the TDWI Orlando. You can read more about Vertica 4.1 in the press release, but I wanted to give you a few of the highlights of the features that make 4.1 so important to our customers, or anyone looking to make the most of their data.

What’s New in Vertica 4.1 from Vertica Systems on Vimeo.

Here are some highlights from the video:

What’s New Intro
Third-Party Tools Integration – 0:43
SQL Macros – 2:14
Enhanced Security & Authentication – 2:47
Updates & Deletes – 3:27
Vertica 4.1 Wrap Up – 3:50
We hope you enjoy the video!

Get Started With Vertica Today

Subscribe to Vertica