parLapply in real life

Parallel Programming in R

Nabeel Imam

Data Scientist

Let's meet the workers

library(parallel)

cluster <- makeCluster(4)

# Every worker runs this block and reports its own process ID
clusterEvalQ(cluster, {
  id <- Sys.getpid()
  print(
    paste("Hello, my worker ID is", id)
  )
})
[[1]]
[1] "Hello, my worker ID is 425108"

[[2]]
[1] "Hello, my worker ID is 425129"

[[3]]
[1] "Hello, my worker ID is 425150"

[[4]]
[1] "Hello, my worker ID is 425171"
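Each worker is a separate R process, so its process ID differs from the main session's. A minimal sketch, assuming two workers are enough to make the point, that collects the worker IDs and compares them with `Sys.getpid()` in the main session:

```r
library(parallel)

# Start two PSOCK workers (the default cluster type)
cl <- makeCluster(2)

# Each worker returns its own process ID; clusterEvalQ gives back a list
worker_pids <- unlist(clusterEvalQ(cl, Sys.getpid()))

# Shut the workers down once finished
stopCluster(cl)

main_pid <- Sys.getpid()
print(worker_pids)
# The main session's ID should not appear among the workers' IDs
print(main_pid %in% worker_pids)
```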

Filtering data in parallel

print(file_list)
 [1] "./health/Afghanistan.csv"            
 [2] "./health/Albania.csv"            
 [3] "./health/Algeria.csv"            
 [4] "./health/American Samoa.csv"     
 [5] "./health/Andorra.csv"            
...
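The slide does not show how `file_list` was built. One plausible way is `list.files()` with `full.names = TRUE`, sketched here on a throwaway temporary folder holding two made-up CSVs (the folder and the file contents are hypothetical stand-ins for `./health`, not the course data):

```r
# Hypothetical stand-in for the ./health folder
health_dir <- file.path(tempdir(), "health")
dir.create(health_dir, showWarnings = FALSE)

# Two tiny made-up CSVs so the listing has something to find
write.csv(data.frame(Country = "Albania", health_exp_pc = 300, Year = 2001),
          file.path(health_dir, "Albania.csv"), row.names = FALSE)
write.csv(data.frame(Country = "Afghanistan", health_exp_pc = 80, Year = 2002),
          file.path(health_dir, "Afghanistan.csv"), row.names = FALSE)

# Full paths to every .csv file in the folder, sorted alphabetically
file_list <- list.files(health_dir, pattern = "\\.csv$", full.names = TRUE)
print(basename(file_list))
```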


Filtering data in parallel

filterCSV <- function (csv) {
  read.csv(csv) %>% 
    dplyr::filter(!is.na(health_exp_pc))
}


cl <- makeCluster(4)
ls_df <- parLapply(cl, file_list, filterCSV)
stopCluster(cl)
Error in checkForRemoteErrors(val) :
  first error: could not find function "%>%"
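The error arises because each PSOCK worker is a fresh R session: packages attached in the main session with `library()` are not attached on the workers, so `%>%` is unknown there. A minimal sketch of that behaviour, using the `tools` package that ships with R as a stand-in for dplyr (an assumption made so the example needs no extra installs):

```r
library(parallel)

# Attach tools in the main session only
library(tools)
cl <- makeCluster(2)

# Is tools on the search path here, and on each worker?
on_main <- "package:tools" %in% search()
on_workers <- unlist(clusterEvalQ(cl, "package:tools" %in% search()))

stopCluster(cl)
print(on_main)     # TRUE: attached in the main session
print(on_workers)  # FALSE FALSE: the workers never ran library(tools)
```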

clusterEvalQ to the rescue

Load a package on the cluster

cl <- makeCluster(4)
clusterEvalQ(cl, library(dplyr))


ls_df <- parLapply(cl, file_list, filterCSV)
stopCluster(cl)

Load multiple packages on the cluster

clusterEvalQ(cl, {
  library(dplyr)
  library(stringr)
})
print(ls_df)
[[1]]
       Country health_exp_pc Year
1  Afghanistan      81.27103 2002
2  Afghanistan      82.45785 2003
3  Afghanistan      89.47005 2004
...

[[2]]
   Country health_exp_pc Year
1  Albania      300.2757 2001
2  Albania      314.3254 2002
3  Albania      343.9442 2003
...
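`clusterEvalQ()` runs the same expression on every worker and returns one result per worker in a list. A sketch of loading several packages at once and then checking that they are attached, again substituting the bundled `tools` and `splines` packages for dplyr and stringr so it runs without extra installs:

```r
library(parallel)

cl <- makeCluster(2)

# Attach both packages on every worker; NULL keeps the returned list short
invisible(clusterEvalQ(cl, {
  library(tools)
  library(splines)
  NULL
}))

# Check that both packages are now on each worker's search path
attached <- clusterEvalQ(cl, c("package:tools", "package:splines") %in% search())

# A function from an attached package can now be called without pkg::
titles <- parLapply(cl, c("hello world", "parallel programming"),
                    function(s) toTitleCase(s))

stopCluster(cl)
print(attached)
print(unlist(titles))
```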

Filtering with conditions

# Function with an argument for starting year
filterCSV <- function (csv, min_year) { 

  read.csv(csv) %>% 
    dplyr::filter(!is.na(health_exp_pc),

                  # Filter data for min_year and onwards
                  Year >= min_year) 
}


selected_year <- 2010 # Value to be supplied to min_year
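Before sending a function to the workers, it can help to test it serially on a single file. A sketch of the same filtering logic written in base R with `subset()` instead of dplyr (a swapped-in technique, so no packages are needed), run on a small made-up CSV; the name `filterCSV_base` is hypothetical:

```r
# Hypothetical base-R counterpart of filterCSV(): drop missing values,
# then keep rows from min_year onwards
filterCSV_base <- function(csv, min_year) {
  df <- read.csv(csv)
  subset(df, !is.na(health_exp_pc) & Year >= min_year)
}

# Made-up sample data written to a temporary file
tmp_csv <- tempfile(fileext = ".csv")
write.csv(data.frame(Country = "Testland",
                     health_exp_pc = c(100, NA, 120, 130),
                     Year = 2008:2011),
          tmp_csv, row.names = FALSE)

selected_year <- 2010
result <- filterCSV_base(tmp_csv, min_year = selected_year)
print(result)
```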

Filtering with conditions

cl <- makeCluster(4)
clusterEvalQ(cl, library(dplyr))

clusterExport(cl, "selected_year",
              envir = environment())
ls_df <- parLapply(cl, file_list, filterCSV,
                   min_year = selected_year)
stopCluster(cl)

  • clusterExport(cl, "selected_year", ...): export selected_year to the cluster
  • envir = environment(): export from the current environment
  • min_year = selected_year: supply selected_year to min_year

Filtering with conditions

[[1]]
       Country health_exp_pc Year
1  Afghanistan      143.6695 2010
2  Afghanistan      143.0915 2011
3  Afghanistan      151.9180 2012
...

[[2]]
  Country health_exp_pc Year
1 Albania      451.8820 2010
2 Albania      485.5835 2011
3 Albania      529.6322 2012
...
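Extra named arguments given to `parLapply()` are evaluated in the main session and passed to every call of the function, while `clusterExport()` copies a variable into each worker's global environment so code running on the workers can refer to it by name. A sketch contrasting the two routes with a toy function (`add_year_offset` is a hypothetical name):

```r
library(parallel)

cl <- makeCluster(2)
selected_year <- 2010

# Route 1: pass the value through parLapply's extra named arguments
add_year_offset <- function(x, min_year) min_year + x
via_argument <- parLapply(cl, 0:3, add_year_offset, min_year = selected_year)

# Route 2: export the variable, then refer to it by name on the workers
clusterExport(cl, "selected_year", envir = environment())
via_export <- clusterEvalQ(cl, selected_year)

stopCluster(cl)
print(unlist(via_argument))  # 2010 2011 2012 2013
print(unlist(via_export))    # 2010 2010
```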

Cluster hygiene checklist

  • Determine the number of cores
  • Create appropriate cluster
    • PSOCK for compatibility on all systems
    • FORK for Linux or Mac (and speed!)
  • Load any libraries needed
  • Export any variables needed
  • Supply exported variable to the named argument
  • Stop the cluster once done
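
library(parallel)

# Leave two cores free, but never ask for fewer than one worker
n_cores <- max(1, detectCores() - 2)

# PSOCK by default; on Linux or macOS, type = "FORK" is also available
cluster <- makeCluster(n_cores)
clusterEvalQ(cluster, library(crucial_package))  # placeholder package name
clusterExport(cluster, "variable_we_need")        # placeholder variable
parLapply(cluster, ls_inputs, our_function, named_argument = variable_we_need)
stopCluster(cluster)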
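One way to make sure the last checklist item always happens, even when a worker throws an error, is to wrap the steps in a function and register `stopCluster()` with `on.exit()`. A sketch under that assumption; `run_in_parallel` and its arguments are hypothetical names, not part of the parallel package:

```r
library(parallel)

# Hypothetical helper that bundles the checklist steps
run_in_parallel <- function(inputs, fun,
                            n_cores = max(1, detectCores() - 2, na.rm = TRUE),
                            packages = character(0), ...) {
  # PSOCK cluster (the default type), so it also works on Windows
  cluster <- makeCluster(n_cores)
  # Registered cleanup: runs on normal exit and on error
  on.exit(stopCluster(cluster), add = TRUE)

  # Attach each required package on every worker
  for (pkg in packages) {
    clusterCall(cluster, library, pkg, character.only = TRUE)
  }

  # Extra named arguments (...) are passed on to every call of fun
  parLapply(cluster, inputs, fun, ...)
}

squares <- run_in_parallel(1:4, function(x, power) x^power,
                           n_cores = 2, power = 2)
print(unlist(squares))  # 1 4 9 16
```

Putting the cleanup in `on.exit()` means a failing input cannot leave orphaned worker processes behind.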

Let's practice!
