Vertica

Distributed R: Examples

The following examples further illustrate the Distributed R programming model.

Getting started

First, install Distributed R by following the steps in the installation guide. Then load the Distributed R library and start the cluster by calling distributedR_start.

     > library(distributedR)
     > distributedR_start()
     [1] TRUE

You can view the status of the cluster with distributedR_status. It shows details such as the number of workers in the cluster, number of R instances managed by each worker, system memory available on each worker node, and so on.

     > distributedR_status()
              Workers Inst SysMem MemUsed DarrayQuota DarrayUsed
     1 127.0.0.1:9090    3   5953    4869        2678          0
     > distributedR_shutdown()
     [1] TRUE

The last command shuts down the Distributed R cluster.

Creating a distributed array

Next, create a distributed array. Create a 9×9 dense array by specifying its size and its partition type. The example below shows how to partition the array into 3×3 blocks and set all its elements to the value 10. In this example, there are nine partitions that could reside on remote nodes.

     > library(distributedR)
     > distributedR_start()
     [1] TRUE
     > A <- darray(dim=c(9,9), blocks=c(3,3), sparse=FALSE, data=10)

You can print the number of partitions using npartitions and fetch the whole array at the master by calling getpartition. If you have a large array containing billions of rows, fetching the whole array at the master is not recommended as it defeats the purpose of managing huge datasets by distributing data across multiple workers.

> npartitions(A)
[1] 9
> getpartition(A)

   [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9]
[1,] 10   10   10   10   10   10   10   10   10 
[2,] 10   10   10   10   10   10   10   10   10 
[3,] 10   10   10   10   10   10   10   10   10 
[4,] 10   10   10   10   10   10   10   10   10 
[5,] 10   10   10   10   10   10   10   10   10 
[6,] 10   10   10   10   10   10   10   10   10 
[7,] 10   10   10   10   10   10   10   10   10 
[8,] 10   10   10   10   10   10   10   10   10 
[9,] 10   10   10   10   10   10   10   10   10 

In most cases, you partition arrays by rows or columns (1-D partitioning) instead of blocks (2-D partitioning). Since row and column partitioning is a special case of block partitioning, this example details block partitioning. If you partition array A by rows by using blocks=c(3,9) instead of blocks=c(3,3), then each partition contains 3 rows and all the columns.
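
As a rough check of the partition counts mentioned above, the number of partitions follows from the array dimensions and the block size. The sketch below uses base R only. The helper count_blocks is hypothetical and is not part of Distributed R, and it assumes that edge blocks are simply smaller when the dimensions do not divide evenly.

```r
# Hypothetical helper (not a Distributed R function): number of partitions
# implied by an array size and a block size, rounding up at the edges.
count_blocks <- function(dim, blocks) {
  prod(ceiling(dim / blocks))
}

n_block <- count_blocks(c(9, 9), c(3, 3))  # 2-D (block) partitioning
n_row   <- count_blocks(c(9, 9), c(3, 9))  # 1-D (row) partitioning
print(c(block = n_block, row = n_row))
```

Block partitioning of the 9×9 array gives nine partitions, while row partitioning with blocks=c(3,9) gives three.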

Parallel programming with foreach

The foreach loop is a flexible and powerful construct to manipulate distributed data structures. This example illustrates its use by initializing a distributed array with different values. Create another distributed array B with the same size (9×9) as array A and partitioned in the same manner. In the previous example, the argument data initialized all elements of array A to 10. You cannot, however, use data to assign different values to different elements. Instead, start a foreach loop, pass partitions of array B, and assign values to each partition inside the loop.

     > B <- darray(dim=c(9,9), blocks=c(3,3), sparse=FALSE)
     > foreach(i, 1:npartitions(B),
     + init<-function(b = splits(B,i), index=i){
     + b <- matrix(index, nrow=nrow(b),ncol=ncol(b))
     + update(b)
     + })
     [1] TRUE

The syntax of foreach is foreach(iteration variable, range, function). In the above example, i is the iteration variable, which takes values from 1 to 9, creating nine parallel tasks that each execute the function. The function receives the ith partition of B through splits(B,i) and the value of i through index, and assigns a matrix to the partition. The matrix has the same size as the partition (3×3) but is filled with the value of the iteration variable, so every element of the ith partition equals i. You can fetch the whole array using getpartition.

> getpartition(B)

     [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9]
[1,]   1    1    1    2    2    2    3    3    3  
[2,]   1    1    1    2    2    2    3    3    3   
[3,]   1    1    1    2    2    2    3    3    3   
[4,]   4    4    4    5    5    5    6    6    6   
[5,]   4    4    4    5    5    5    6    6    6   
[6,]   4    4    4    5    5    5    6    6    6   
[7,]   7    7    7    8    8    8    9    9    9   
[8,]   7    7    7    8    8    8    9    9    9   
[9,]   7    7    7    8    8    8    9    9    9   

You can fetch a particular partition by specifying the partition index. In this example, a user fetches the fifth partition.

     > getpartition(B,5)

           [,1] [,2] [,3]
     [1,]    5    5    5
     [2,]    5    5    5
     [3,]    5    5    5
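
The output of getpartition(B) suggests that partitions are numbered row by row across the grid of blocks (partition 2 sits to the right of partition 1). Assuming that numbering, the following base R sketch maps a partition index to the rows and columns it covers. The helper block_range is hypothetical and is not a Distributed R function.

```r
# Hypothetical helper: rows and columns covered by partition `index`,
# assuming partitions are numbered row by row across the block grid.
block_range <- function(index, dim = c(9, 9), blocks = c(3, 3)) {
  blocks_per_row <- ceiling(dim[2] / blocks[2])
  block_row <- (index - 1) %/% blocks_per_row   # zero-based grid row
  block_col <- (index - 1) %% blocks_per_row    # zero-based grid column
  rows <- (block_row * blocks[1] + 1):min((block_row + 1) * blocks[1], dim[1])
  cols <- (block_col * blocks[2] + 1):min((block_col + 1) * blocks[2], dim[2])
  list(rows = rows, cols = cols)
}

r5 <- block_range(5)
print(r5)
```

Under this assumption, the fifth partition covers rows 4 to 6 and columns 4 to 6, which matches the block of 5s in the full array.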

When using foreach, remember that only variables passed as arguments to the function (init in this case) are available for use within the function. For example, you cannot use array A or its partitions within the function. Even the iterator variable i must be passed as an argument. Additionally, loop functions do not return any value. To make data modifications visible, call update on the partition.
The update function can be used only on arguments that are distributed objects, such as distributed arrays and data frames. For example, update(index) is not valid because index is not a distributed object.

Parallel array addition

With two initialized distributed arrays, you can begin computations, such as adding the arrays element by element. This example uses a foreach loop to perform the addition in parallel. Create an output array C of the same size and partitioning scheme. In the foreach loop, pass the ith partition of all three arrays, A, B, and C. Within the loop, add the corresponding partitions, place the result in c, and call update:

     > C <- darray(dim=c(9,9), blocks=c(3,3))
     > foreach(i, 1:npartitions(A),
     + add<-function(a = splits(A,i), b = splits(B,i), c = splits(C,i)){
     + c <- a + b
     + update(c)
     + })
     [1] TRUE
     > getpartition(C)

   [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9]
[1,] 11   11   11   12   12   12   13   13   13  
[2,] 11   11   11   12   12   12   13   13   13  
[3,] 11   11   11   12   12   12   13   13   13  
[4,] 14   14   14   15   15   15   16   16   16  
[5,] 14   14   14   15   15   15   16   16   16  
[6,] 14   14   14   15   15   15   16   16   16  
[7,] 17   17   17   18   18   18   19   19   19  
[8,] 17   17   17   18   18   18   19   19   19  
[9,] 17   17   17   18   18   18   19   19   19  

A foreach loop can perform any parallel operation. The Distributed R package also provides basic operators that work out of the box on distributed arrays. These operators include array addition, subtraction, multiplication, and summary statistics such as max, min, mean, and sum (including their column and row versions such as colSums). Internally, these operators are implemented using foreach loops. The following example illustrates some of these operators in action:

> D <- A+B
> getpartition(D)

   [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,9]
[1,] 11   11   11   12   12   12   13   13   13  
[2,] 11   11   11   12   12   12   13   13   13  
[3,] 11   11   11   12   12   12   13   13   13  
[4,] 14   14   14   15   15   15   16   16   16  
[5,] 14   14   14   15   15   15   16   16   16  
[6,] 14   14   14   15   15   15   16   16   16  
[7,] 17   17   17   18   18   18   19   19   19  
[8,] 17   17   17   18   18   18   19   19   19  
[9,] 17   17   17   18   18   18   19   19   19  

> mean(D)
[1] 15

     > colSums(D)
     [1] 126 126 126 135 135 135 144 144 144
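
To sanity-check the values above without a cluster, the same arithmetic can be reproduced on ordinary matrices. This sketch uses base R only. A_local, B_local, and D_local are illustrative names for local copies, not distributed arrays.

```r
# Local stand-ins for the distributed arrays A and B from the examples.
A_local <- matrix(10, nrow = 9, ncol = 9)

# kronecker() expands each entry of the 3x3 index grid into a 3x3 block,
# giving the same layout as the output of getpartition(B).
B_local <- kronecker(matrix(1:9, nrow = 3, byrow = TRUE), matrix(1, 3, 3))

D_local <- A_local + B_local

print(mean(D_local))
print(colSums(D_local))
```

The local results should agree with mean(D) and colSums(D) shown in the session.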

Creating a distributed data frame

The syntax for distributed data frames is similar to that for distributed arrays. Unlike arrays, however, data frames can store non-numeric values.

Create a 9×9 data frame by specifying its size and how it is partitioned:

     > dF <- dframe(dim=c(9,9), blocks=c(3,3))

The data frame dF has nine partitions, each of size 3×3. Unlike distributed arrays, the data frame has no elements unless data is explicitly loaded:

> getpartition(dF)
data frame with 0 columns and 0 rows

To add data, use a foreach loop:

     > foreach(i, 1:npartitions(dF),
     + init<-function(df = splits(dF,i), index=i, n=3){
     + p <- matrix(index, nrow=n,ncol=n-1)
     + q <- rep("HP",n)
     + df<- data.frame(p,q)
     + update(df)
     + })

     [1] TRUE

Each partition now has a column that contains the string HP:

> getpartition(dF,1)

       X1 X2  q
     1  1  1 HP
     2  1  1 HP
     3  1  1 HP

Creating a distributed list

Create a distributed list by specifying the number of partitions.

     > dL <- dlist(partitions=3)

Initially, the list is empty.

     > getpartition(dL)
     list()

The list can be populated using the foreach loop.

     > foreach(i, 1:npartitions(dL), function(dl=splits(dL,i), idx=i){
     + dl<-list(c("HP", idx))
     + update(dl)
     + })
     [1] TRUE

Individual partitions or the whole list can be obtained by:

     > getpartition(dL,1)
     [[1]]
     [1] "HP" "1"
     > getpartition(dL)
     [[1]]
     [1] "HP" "1"
     [[2]]
     [1] "HP" "2"
     [[3]]
     [1] "HP" "3"
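
Note that the index appears as a string rather than a number: c() coerces its arguments to a common type, so combining "HP" with a number yields a character vector. The base R sketch below evaluates the same expression locally. The name local_parts is illustrative and the result is an ordinary list, not a distributed one.

```r
# Same expression as in the loop body, evaluated locally for idx = 1, 2, 3.
local_parts <- lapply(1:3, function(idx) c("HP", idx))

# c() promotes the integer index to character to match "HP".
print(class(local_parts[[1]]))
print(local_parts[[2]])
```

To keep the index numeric, one option is to store it in a nested list, for example list("HP", idx), instead of a vector.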

Load and save data from files

You can save or load data in parallel. Distributed R can run on top of databases, such as HP Vertica, and even file systems. You can load or save data to different stores as long as the right connector exists. This example demonstrates saving data to files. Use the foreach loop to write each partition of an array to a file:

     > fname <- paste(getwd(),"/Data",sep="")
     > foreach(i, 1:npartitions(D),
     + saves<-function(d = splits(D,i), index=i, name=fname){
     + write(d, paste(name,index,sep=""))
     + })

     [1] TRUE

The example above writes each partition in a different file. If Distributed R is running on a single computer, all the files are present on the same computer. You can load one of the partitions and check its contents:

     > scan(paste(fname,5,sep=""))
     [1] 15 15 15 15 15 15 15 15 15

Note that the above command may not work if Distributed R is running on a cluster, as the file may be on a remote computer. Instead, use a foreach loop to load data in parallel into a distributed array. Declare a new distributed array E, of the same size as D, and load data from the previously saved files. Since scan returns the values as a single vector, convert the data into a matrix of the correct size before calling update:

     > E <- darray(dim=c(9,9), blocks=c(3,3))
     > foreach(i, 1:npartitions(E),
     + loads<-function(e = splits(E,i), index=i, name=fname){
     + fn <- paste(name,index,sep="")
     + e <- matrix(scan(file=fn), nrow=nrow(e))
     + update(e)
     + })

     [1] TRUE

     > getpartition(E,5)

          [,1] [,2] [,3]
     [1,]   15   15   15
     [2,]   15   15   15
     [3,]   15   15   15
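
The round trip relies on write and scan both handling matrix elements in column order. Because every element of partition 5 is 15, the session above cannot show this, so the base R sketch below repeats the round trip locally with distinct values. The matrix m and the temporary file path are illustrative and do not involve a cluster.

```r
# A 3x3 matrix with distinct values, so element order is visible.
m <- matrix(1:9, nrow = 3)

path <- tempfile("Data")
write(m, path)                        # elements are written column by column

v <- scan(path, quiet = TRUE)         # scan returns a flat numeric vector
m_back <- matrix(v, nrow = nrow(m))   # matrix() fills column by column

print(all(m_back == m))
unlink(path)
```

If the file were instead read row by row, the reconstructed matrix would be the transpose of the original, so the two sides of the round trip need to agree on element order.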

Load and save data from HP Vertica

To load data from a database into a distributed array, use an ODBC connector such as HP Vertica RODBC (vRODBC) or RODBC. The following example shows how to load data using a foreach loop that makes concurrent ODBC connections to an HP Vertica database. Declare a 50×4 array in which each partition contains five rows. Within the loop, load each partition by querying the database for five rows at a time. Note that for this example to work, vRODBC must be installed and configured to connect to an HP Vertica database. To use this example with RODBC, replace the occurrences of vRODBC with RODBC.

     > X <- darray(dim=c(50, 4), blocks=c(5, 4), sparse=FALSE)
     > foreach(i, 1:npartitions(X), initArrays <- function(x = splits(X,i), index=i) {
     + library(vRODBC)
     + connect<-odbcConnect("Test")
     + size <- nrow(x)
     + start <- (index-1) * size
     + end <- index * size
     + qry <- paste("select A,B,C,D from T where id >=", start, "and id <", end, "order by id")
     + segment<-sqlQuery(connect, qry)
     + odbcClose(connect)
     + x<-cbind(segment$A, segment$B, segment$C, segment$D)
     + update(x)
     + })

progress: 100%
[1] TRUE

In this example, the programmer loads the ODBC package inside the loop using library(vRODBC). This step is necessary because the function inside the foreach loop executes on the worker. Packages must be explicitly loaded in the worker environment.

This example uses HP Vertica’s internal row identifiers to select rows. For example, the first five rows in the Vertica table T are assigned to the first partition of array X. HP Vertica has row identifiers to refer to individual rows.

To fetch the first partition and display data, use getpartition:

     > getpartition(X, 1)
          [,1]     [,2]     [,3]     [,4]
     [1,]    5 0.903815 0.522466 0.250464
     [2,]    1 0.994233 0.138644 0.139464
     [3,]    3 0.117651 0.285975 0.309341
     [4,]    4 0.280725 0.006694 0.684827
     [5,]    6 0.331704 0.835160 0.498040
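
Each task in the loop derives its own id range from the partition index. The base R sketch below builds the query strings for all ten partitions of X without connecting to a database. The helper build_query is hypothetical and only mirrors the expressions used in the loop body.

```r
# Hypothetical helper mirroring the loop body: query text for one partition.
build_query <- function(index, size = 5) {
  start <- (index - 1) * size
  end <- index * size
  paste("select A,B,C,D from T where id >=", start,
        "and id <", end, "order by id")
}

# A 50-row array with 5 rows per partition has 10 partitions.
queries <- vapply(1:10, build_query, character(1))
print(queries[c(1, 10)])
```

The ranges are half-open (id >= start and id < end), so adjacent partitions never request the same row.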

Parallel execution using existing packages

Sometimes you can achieve a goal by applying functions from existing packages in parallel. Take the example of finding the shortest distance from five source vertices to all other vertices in a graph. The distance calculation from each source vertex is independent of the others, enabling you to start five parallel tasks to calculate them. R already has a package called igraph that can calculate shortest distances. The example below details how to reuse igraph to solve the above problem. In this example, igraph must be installed on all nodes in your Distributed R cluster. You can manually download the software from CRAN or use install.packages("igraph").

First, create a sparse distributed array to store the graph. The array has only one partition equal to the total size of the graph.

     > G<-darray(dim=c(100,100), blocks=c(100,100), sparse=TRUE)

Use a foreach loop to generate a random graph and store it in the array. You must load the igraph library inside the loop function.

     > foreach(i, 1:1, initGraph<-function(g=splits(G)){
     + library(igraph)
     + rg<-erdos.renyi.game(nrow(g),0.1)
     + g<-get.adjacency(rg, sparse=TRUE)
     + update(g)
     + })

     [1] TRUE

Run parallel tasks to calculate shortest distances and store them in another array called paths. Partition the array paths to give each partition one row and 100 columns. In this design, each row corresponds to a source vertex, and each element in that row holds the distance from the source vertex to a given destination vertex.

     > paths<-darray(dim=c(5,100), blocks=c(1,100), sparse=FALSE)
     > foreach(i, 1:npartitions(paths),
     + calc<-function(g=splits(G), p=splits(paths,i), vertex=i){
     + library(igraph)
     + p<-shortest.paths(graph.adjacency(g), vertex)
     + update(p)
     + })
     [1] TRUE

Fetch all shortest distances from the first vertex and print the first ten values:

     > getpartition(paths, 1)[,1:10]
     [1] 0 2 2 2 2 2 2 2 2 2
     > distributedR_shutdown()
     [1] TRUE