Speed up your R Work

By John Mount



Introduction

In this note we will show how to speed up work in R by partitioning data and using process-level parallelization. We will show the technique with three different R packages: rqdatatable, data.table, and dplyr. The methods shown will also work with base-R and other packages.

For each of the above packages we speed up work by using wrapr::execute_parallel, which in turn uses wrapr::partition_tables to partition unrelated data.frame rows and then distributes them to different processors to be executed. rqdatatable::ex_data_table_parallel conveniently bundles all of these steps together when working with rquery pipelines.

The partitioning is specified by the user preparing a grouping column that tells the system which sets of rows must be kept together for a correct calculation. We will demonstrate everything with simple code examples and minimal discussion.
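
To make the pattern concrete, here is a minimal hand-rolled sketch of the partition-and-distribute idea using only base R. The names d, cl_demo, and x2 are invented for illustration; wrapr::partition_tables and wrapr::execute_parallel perform the equivalent steps for you (including handling lists of related tables and re-assembling the results).

# sketch only: split rows by the grouping column, send pieces to workers,
# run a (here trivial) per-piece calculation, and bind the results back together
library("parallel")
d <- data.frame(key_group = rep(c("a", "b"), each = 3),
                x = 1:6,
                stringsAsFactors = FALSE)
pieces <- split(d, d$key_group)   # rows sharing a key_group stay together
cl_demo <- makeCluster(2)
res <- parLapply(cl_demo, pieces, function(piece) {
  piece$x2 <- 2 * piece$x         # stand-in for an expensive per-piece step
  piece
})
res <- do.call(rbind, res)        # re-assemble the results
stopCluster(cl_demo)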

Keep in mind: unless the pipeline steps have non-trivial cost, the overhead of partitioning and distributing the work may overwhelm any parallel speedup. Also, data.table itself already seems to exploit some thread-level parallelism (notice user time is greater than elapsed time when timing data.table operations). That being said, in this note we will demonstrate a synthetic example where computation is expensive due to a blow-up in an intermediate join step.
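
If you want to inspect or limit that thread-level parallelism (for example, so that each worker process stays single-threaded when combined with process-level parallelism), data.table exposes thread helpers. A small sketch, assuming a data.table version that exports these functions:

library("data.table")
getDTthreads()    # number of threads data.table will use internally
setDTthreads(1)   # e.g. restrict data.table to one thread per process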

Our example

First we set up our execution environment and example (some details: OSX 10.13.4 on a 2.8 GHz Intel Core i5 Mac Mini (Late 2015 model) with 8GB RAM and hybrid disk drive).

library("rqdatatable")
## Loading required package: rquery
library("microbenchmark")
library("ggplot2")
library("WVPlots")
suppressPackageStartupMessages(library("dplyr"))
## Warning: package 'dplyr' was built under R version 3.5.1
base::date()
## [1] "Sun Jul  8 09:05:25 2018"
R.version.string
## [1] "R version 3.5.0 (2018-04-23)"
parallel::detectCores()
## [1] 4
packageVersion("parallel")
## [1] '3.5.0'
packageVersion("rqdatatable")
## [1] '0.1.2'
packageVersion("rquery")
## [1] '0.5.1'
packageVersion("dplyr")
## [1] '0.7.6'
ncore <- parallel::detectCores()
print(ncore)
## [1] 4
cl <- parallel::makeCluster(ncore)
print(cl)
## socket cluster with 4 nodes on host 'localhost'
set.seed(2362)
mk_example <- function(nkey, nrep, ngroup = 20) {
  keys <- paste0("key_", seq_len(nkey))
  key_group <- sample(as.character(seq_len(ngroup)), 
                      length(keys), replace = TRUE)
  names(key_group) <- keys
  key_table <- data.frame(
    key = rep(keys, nrep),
    stringsAsFactors = FALSE)
  key_table$data <- runif(nrow(key_table))
  instance_table <- data.frame(
    key = rep(keys, nrep),
    stringsAsFactors = FALSE)
  instance_table$id <- seq_len(nrow(instance_table))
  instance_table$info <- runif(nrow(instance_table))
  # groups should be no finer than keys
  key_table$key_group <- key_group[key_table$key]
  instance_table$key_group <- key_group[instance_table$key]
  list(key_table = key_table,
       instance_table = instance_table)
}

dlist <- mk_example(10, 10)
data <- dlist$instance_table
annotation <- dlist$key_table

rquery / rqdatatable

rquery and rqdatatable can implement a non-trivial calculation as follows.

# possible data lookup: find rows that
# have lookup data <= info
optree <- local_td(data) %.>%
  natural_join(., 
               local_td(annotation), 
               jointype = "INNER", 
               by = "key") %.>%
  select_rows_nse(., data <= info) %.>%
  pick_top_k(., 
             k = 1,
             partitionby = "id",
             orderby = "data",
             reverse = "data",
             keep_order_column = FALSE) %.>%
  orderby(., "id")
cat(format(optree))
## table('data'; 
##   key,
##   id,
##   info,
##   key_group) %.>%
##  natural_join(.,
##   table('annotation'; 
##     key,
##     data,
##     key_group),
##   j= INNER, by= key) %.>%
##  select_rows(.,
##    data <= info) %.>%
##  extend(.,
##   row_number := row_number(),
##   p= id,
##   o= "data" DESC) %.>%
##  select_rows(.,
##    row_number <= 1) %.>%
##  drop_columns(.,
##    row_number) %.>%
##  orderby(., id)
res1 <- ex_data_table(optree)
head(res1)
##         data id      info   key key_group
## 1: 0.9152014  1 0.9860654 key_1        20
## 2: 0.5599810  2 0.5857570 key_2         8
## 3: 0.3011882  3 0.3334490 key_3        10
## 4: 0.3650987  4 0.3960980 key_4         5
## 5: 0.1469254  5 0.1753649 key_5        14
## 6: 0.2567631  6 0.3510280 key_6         7
nrow(res1)
## [1] 94

And we can execute the operations in parallel.

parallel::clusterEvalQ(cl, 
                       library("rqdatatable"))
## [[1]]
## [1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"  
## [6] "utils"       "datasets"    "methods"     "base"       
## 
## [[2]]
## [1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"  
## [6] "utils"       "datasets"    "methods"     "base"       
## 
## [[3]]
## [1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"  
## [6] "utils"       "datasets"    "methods"     "base"       
## 
## [[4]]
## [1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"  
## [6] "utils"       "datasets"    "methods"     "base"
res2 <- ex_data_table_parallel(optree, 
                               "key_group", 
                               cl)
head(res2)
##         data id      info   key key_group
## 1: 0.9152014  1 0.9860654 key_1        20
## 2: 0.5599810  2 0.5857570 key_2         8
## 3: 0.3011882  3 0.3334490 key_3        10
## 4: 0.3650987  4 0.3960980 key_4         5
## 5: 0.1469254  5 0.1753649 key_5        14
## 6: 0.2567631  6 0.3510280 key_6         7
nrow(res2)
## [1] 94

data.table

data.table can implement the same calculation.

library("data.table")
## 
## Attaching package: 'data.table'

## The following objects are masked from 'package:dplyr':
## 
##     between, first, last
packageVersion("data.table")
## [1] '1.11.4'
data_table_f <- function(data, annotation) {
  data <- data.table::as.data.table(data)
  annotation <- data.table::as.data.table(annotation)
  joined <- merge(data, annotation, 
                  by = "key", 
                  all=FALSE, 
                  allow.cartesian=TRUE)
  joined <- joined[joined$data <= joined$info, ]
  data.table::setorderv(joined, cols = "data")
  joined <- joined[, .SD[.N], id]
  data.table::setorderv(joined, cols = "id")
}
resdt <- data_table_f(data, annotation)
head(resdt)
##    id   key      info key_group.x      data key_group.y
## 1:  1 key_1 0.9860654          20 0.9152014          20
## 2:  2 key_2 0.5857570           8 0.5599810           8
## 3:  3 key_3 0.3334490          10 0.3011882          10
## 4:  4 key_4 0.3960980           5 0.3650987           5
## 5:  5 key_5 0.1753649          14 0.1469254          14
## 6:  6 key_6 0.3510280           7 0.2567631           7
nrow(resdt)
## [1] 94

We can also run data.table in parallel using wrapr::execute_parallel.

parallel::clusterEvalQ(cl, library("data.table"))
## [[1]]
##  [1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"   
##  [6] "grDevices"   "utils"       "datasets"    "methods"     "base"       
## 
## [[2]]
##  [1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"   
##  [6] "grDevices"   "utils"       "datasets"    "methods"     "base"       
## 
## [[3]]
##  [1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"   
##  [6] "grDevices"   "utils"       "datasets"    "methods"     "base"       
## 
## [[4]]
##  [1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"   
##  [6] "grDevices"   "utils"       "datasets"    "methods"     "base"
parallel::clusterExport(cl, "data_table_f")

dt_f <- function(tables_list) {
  data <- tables_list$data
  annotation <- tables_list$annotation
  data_table_f(data, annotation)
}

data_table_parallel_f <- function(data, annotation) {
  respdt <- wrapr::execute_parallel(
    tables = list(data = data, 
                  annotation = annotation),
    f = dt_f,
    partition_column = "key_group",
    cl = cl) %.>%
    data.table::rbindlist(.)
  data.table::setorderv(respdt, cols = "id")
  respdt
}
respdt <- data_table_parallel_f(data, annotation)
head(respdt)
##    id   key      info key_group.x      data key_group.y
## 1:  1 key_1 0.9860654          20 0.9152014          20
## 2:  2 key_2 0.5857570           8 0.5599810           8
## 3:  3 key_3 0.3334490          10 0.3011882          10
## 4:  4 key_4 0.3960980           5 0.3650987           5
## 5:  5 key_5 0.1753649          14 0.1469254          14
## 6:  6 key_6 0.3510280           7 0.2567631           7
nrow(respdt)
## [1] 94

dplyr

dplyr can also implement the example.

dplyr_pipeline <- function(data, annotation) {
  res <- data %>%
    inner_join(annotation, by = "key") %>%
    filter(data <= info) %>%
    group_by(id) %>%
    arrange(-data) %>%
    mutate(rownum = row_number()) %>%
    ungroup() %>%
    filter(rownum == 1) %>%
    arrange(id)
  res
}

resd <- dplyr_pipeline(data, annotation)
head(resd)
## # A tibble: 6 x 7
##   key      id  info key_group.x  data key_group.y rownum
##   <chr> <int> <dbl> <chr>       <dbl> <chr>        <int>
## 1 key_1     1 0.986 20          0.915 20               1
## 2 key_2     2 0.586 8           0.560 8                1
## 3 key_3     3 0.333 10          0.301 10               1
## 4 key_4     4 0.396 5           0.365 5                1
## 5 key_5     5 0.175 14          0.147 14               1
## 6 key_6     6 0.351 7           0.257 7                1
nrow(resd)
## [1] 94

And we can use wrapr::execute_parallel to parallelize the dplyr solution.

parallel::clusterEvalQ(cl, library("dplyr"))
## [[1]]
##  [1] "dplyr"       "data.table"  "rqdatatable" "rquery"      "stats"      
##  [6] "graphics"    "grDevices"   "utils"       "datasets"    "methods"    
## [11] "base"       
## 
## [[2]]
##  [1] "dplyr"       "data.table"  "rqdatatable" "rquery"      "stats"      
##  [6] "graphics"    "grDevices"   "utils"       "datasets"    "methods"    
## [11] "base"       
## 
## [[3]]
##  [1] "dplyr"       "data.table"  "rqdatatable" "rquery"      "stats"      
##  [6] "graphics"    "grDevices"   "utils"       "datasets"    "methods"    
## [11] "base"       
## 
## [[4]]
##  [1] "dplyr"       "data.table"  "rqdatatable" "rquery"      "stats"      
##  [6] "graphics"    "grDevices"   "utils"       "datasets"    "methods"    
## [11] "base"
parallel::clusterExport(cl, "dplyr_pipeline")

dplyr_f <- function(tables_list) {
  data <- tables_list$data
  annotation <- tables_list$annotation
  dplyr_pipeline(data, annotation)
}

dplyr_parallel_f <- function(data, annotation) {
  respdt <- wrapr::execute_parallel(
    tables = list(data = data, 
                  annotation = annotation),
    f = dplyr_f,
    partition_column = "key_group",
    cl = cl) %>%
    dplyr::bind_rows() %>%
    arrange(id)
}
respdplyr <- dplyr_parallel_f(data, annotation)
head(respdplyr)
## # A tibble: 6 x 7
##   key      id  info key_group.x  data key_group.y rownum
##   <chr> <int> <dbl> <chr>       <dbl> <chr>        <int>
## 1 key_1     1 0.986 20          0.915 20               1
## 2 key_2     2 0.586 8           0.560 8                1
## 3 key_3     3 0.333 10          0.301 10               1
## 4 key_4     4 0.396 5           0.365 5                1
## 5 key_5     5 0.175 14          0.147 14               1
## 6 key_6     6 0.351 7           0.257 7                1
nrow(respdplyr)
## [1] 94

Benchmark

We can benchmark the various realizations.

dlist <- mk_example(300, 300)
data <- dlist$instance_table
annotation <- dlist$key_table

timings <- microbenchmark(
  data_table_parallel = 
    nrow(data_table_parallel_f(data, annotation)),
  data_table = nrow(data_table_f(data, annotation)),
  rqdatatable_parallel = 
    nrow(ex_data_table_parallel(optree, "key_group", cl)),
  rqdatatable = nrow(ex_data_table(optree)),
  dplyr_parallel = 
    nrow(dplyr_parallel_f(data, annotation)),
  dplyr = nrow(dplyr_pipeline(data, annotation)),
  times = 10L)

saveRDS(timings, "Parallel_rqdatatable_timings.RDS")

Conclusion

print(timings)
## Unit: seconds
##                  expr       min        lq      mean    median        uq
##   data_table_parallel  5.274560  5.457105  5.609827  5.546554  5.686829
##            data_table  9.401677  9.496280  9.701807  9.541218  9.748159
##  rqdatatable_parallel  7.165216  7.497561  7.587663  7.563883  7.761987
##           rqdatatable 12.490469 12.700474 13.320480 12.898154 14.229233
##        dplyr_parallel  6.492262  6.572062  6.784865  6.787277  6.875076
##                 dplyr 20.056555 20.450064 20.647073 20.564529 20.800350
##        max neval
##   6.265888    10
##  10.419316    10
##   7.949404    10
##  14.282269    10
##   7.328223    10
##  21.332103    10
# autoplot(timings)

timings <- as.data.frame(timings)
timings$seconds <- timings$time/1e+9

ScatterBoxPlotH(timings, 
                xvar = "seconds", yvar = "expr", 
                title="task duration by method")

The benchmark timings show parallelized data.table is the fastest, followed by parallelized dplyr, and then parallelized rqdatatable. In the non-parallelized case data.table is the fastest, followed by rqdatatable, and then dplyr.

One reason dplyr sees a greater speedup relative to its own non-parallel implementation (yet still does not beat data.table) is that data.table starts out already multi-threaded: it exploits some parallelism even before we add process-level parallelism, and so has less room for additional speedup, though it remains the fastest overall.

rquery pipelines exhibit superior performance on big data systems (Spark, PostgreSQL, Amazon Redshift, and hopefully soon Google BigQuery), and rqdatatable supplies a very good in-memory implementation of the rquery system based on data.table. rquery also speeds up solution development by supplying higher-order operators and early debugging features.

In this note we have demonstrated simple procedures to reliably parallelize any of rqdatatable, data.table, or dplyr.
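
One housekeeping step not shown in the timing code above: when the parallel work is finished, the cluster of worker processes can be released.

# shut down the worker processes created earlier with makeCluster()
parallel::stopCluster(cl)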

Note: we did not include alternatives such as multidplyr or dtplyr in the timings, as they did not appear to work on this example.

Materials

The original rendering of this article can be found here, source code here, and raw timings here.
