Efficient loops in TERR via parallel processing

Hi,

I've started looking into running things on multiple cores in order to speed up some for loops that get really slow as the data volume grows. The purpose of this thread is two-fold:

  1. I'd like to post what I did to convert my code from an ordinary for loop into multi-core code, so that other beginners can reuse it and experts can give hints on what could still be improved.
  2. I'd like to get a discussion going on some fundamental issues arising from using (my) multi-core code in TERR on a Windows machine and see if there are workarounds.

So let's start with #1. My code is simple, and you can probably tell from it that I'm by no means an R expert. I'll simplify it to show just the essential bits: basically it is a for loop that takes a data set with about 100 X, Y, Z values per job, for many jobs, and does a surface fit of Z on the X/Y coordinates, returning roughly 23,000 rows of X, Y, Zfit per job. So the memory load increases substantially while the code runs. This is what the original for loop looks like:

 

# There is a matrix with one row per job and Param1 is the job number
# whilst Col2Fit is the Z value

# Define standard grid for fit results
x1 <- seq(-76000,76000,1000)
no <- round((152000/1000),0)+1
xy1 <- cbind(rep(x1,no),rep(x1,each=no))

library(MBA) # load required package

# create a subset of the complete data set dat (one job only) and do a surface fit job by job
out[1,3]<-memory.size() # record memory load before starting the loop
out[1,1] <- as.character.Date(Sys.time()) # record time before starting the loop
for(i in 1:dim(MatrixMBA)[1]){
  datin <- dat[dat[,Param1]==MatrixMBA[i,Param1], ]
  datin2 <- cbind.data.frame(datin[,X],datin[,Y],datin[,Col2Fit],stringsAsFactors=FALSE)
  colnames(datin2) <- c(Param1,Param2,Col2Fit)
  datin2 <- subset(datin2,!is.na(datin2[3]))
  dat.Mba <- mba.points(datin2,xy1,h=h,extend=FALSE,verbose=FALSE)$xyz.est
  # here I'm doing some lengthy stuff to add the job number and other things back
  # in as additional columns (this is where Mba_tmp is created), but that's irrelevant for this topic
  MbaStdGrid <- rbind(Mba_tmp, MbaStdGrid)
}
out[1,4]<-memory.size() # record memory load after computation is done
out[1,2] <- as.character.Date(Sys.time()) # record time after computation is done
gc() # clean up the memory
out[1,7] <- memory.size()  # record memory load after clean up
out[1,6] <- 0 # record "number of cores" for comparison to the multi-core approach below
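
One side note on the loop itself before going parallel: growing MbaStdGrid with rbind() inside the loop forces R to copy the whole result on every iteration, which by itself gets slow for many jobs (at least in open-source R; I'm not sure how much TERR's memory management mitigates this). A sketch of the same loop collecting the per-job fits in a list and binding them once at the end (same objects as above, res is just a temporary list):

library(MBA)

res <- vector("list", dim(MatrixMBA)[1]) # pre-allocate one slot per job
for(i in 1:dim(MatrixMBA)[1]){
  datin <- dat[dat[,Param1]==MatrixMBA[i,Param1], ]
  datin2 <- cbind.data.frame(datin[,X],datin[,Y],datin[,Col2Fit],stringsAsFactors=FALSE)
  colnames(datin2) <- c(Param1,Param2,Col2Fit)
  datin2 <- subset(datin2,!is.na(datin2[3]))
  # the meta data columns would be added here as before
  res[[i]] <- mba.points(datin2,xy1,h=h,extend=FALSE,verbose=FALSE)$xyz.est
}
MbaStdGrid <- do.call(rbind, res) # single rbind at the end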

In order to use multiple cores I went for the parallel package, since it seems to be the recommended one for TERR and e.g. doParallel does not seem to be compatible with TERR. Anyway, here is what I came up with. I believe this is what's called distributed-memory parallelism, i.e. you have to create a copy of your data for each worker process:

library(parallel) # load required package

# definition of a function which computes the MBA fit
CDMBA <- function(i){
   datin <- dat[dat[,Param1]==MatrixMBA[i,Param1],]
   datin2 <- cbind.data.frame(datin[,X],datin[,Y],datin[,Col2Fit],stringsAsFactors=FALSE)
   colnames(datin2) <- c(Param1,Param2,Col2Fit)
   datin2 <- subset(datin2,!is.na(datin2[3]))
   dat.Mba <- mba.points(datin2,xy1,h=h,extend=FALSE,verbose=FALSE)$xyz.est
   # again some code here to add the meta data as additional columns (creating Mba_tmp)
   MbaStdGrid <- rbind(Mba_tmp, MbaStdGrid)
   colnames(MbaStdGrid) <- c(Param1,Param2,Param3,Param4,Param5,Param6,'X','Y','MBA_clear_norm')
   return(MbaStdGrid)
}

# start a cluster with i worker processes and export all data to them
out[1,3] <- memory.size() # record memory load before starting the cluster
out[1,1] <- as.character.Date(Sys.time()) # record time before starting the cluster
out[1,6] <- i # record number of cores/workers
cl <- makeCluster(i) # start a cluster with i worker processes
clusterExport(cl, ls()) # export the whole workspace to every worker, i.e. duplicate the data i times!
clusterEvalQ(cl, {library(MBA)}) # load the MBA package on every worker

# run the function on multiple cores and stop the cluster even in case of errors
tryCatch(result <- clusterApply(cl, 1:dim(MatrixMBA)[1], CDMBA), finally=stopCluster(cl))
MbaStdGrid <- do.call('rbind', result)
out[1,4]<-memory.size() # record memory load after computation is done
out[1,2] <- as.character.Date(Sys.time()) # record time after computation is done
gc() # clean up the memory
out[1,7] <- memory.size() # record memory load after clean up
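
A smaller tweak I could already make here (just a sketch, not tested in TERR): clusterExport(cl, ls()) ships everything in the workspace to every worker, including objects the workers never touch. Exporting only the objects CDMBA actually uses doesn't avoid duplicating dat itself, but at least everything else stays local:

cl <- makeCluster(i) # as above, i worker processes
# export only what CDMBA needs (add whatever the elided meta data code uses,
# plus MbaStdGrid if the rbind() inside CDMBA is kept)
clusterExport(cl, c("dat","MatrixMBA","xy1","h","X","Y","Col2Fit",
                    "Param1","Param2","Param3","Param4","Param5","Param6"))
clusterEvalQ(cl, {library(MBA)}) # load MBA on every worker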

Soon I realized that duplicating all the data for every worker can quickly get you into a situation where the CPUs start idling because there isn't enough memory. So I ran a couple of tests with both versions using TERR 4.3.0 within RStudio, varying the number of jobs and (for the multi-core code) the number of cores. I will upload two screenshots showing the run time and the memory load at the start, at the end and after gc() as a function of the number of cores (where 0 means I used the original for loop) and the number of jobs/MBAs computed. Note: I think memory.size() probably returns the memory load per worker process and not the total load (at least in the run with 64 jobs and 16 cores I saw more than 6 GB of memory load in the Windows task manager, while memory.size() returned about 380 MB, and 380*16/1024 = 5.9 GB).
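
To double-check that per-worker assumption, one thing I could do while the cluster is still up (i.e. before stopCluster) is ask every worker for its own memory.size() and add the master's number on top, roughly like this:

worker_mem <- unlist(clusterEvalQ(cl, memory.size())) # one value per worker, in MB
sum(worker_mem) + memory.size() # approximate total to compare with the task manager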

A few observations, where #2 and #3 caught me by surprise:

  1. For small data volumes / numbers of jobs (1 and 8), multi-core processing is of no benefit.
  2. For somewhat bigger data volumes (I don't consider 64 jobs a large data set), adding more cores quite quickly stops helping (BTW: 16 is the number of cores detected by detectCores() on my machine). This obviously has to do with the high memory load.

    Again: I assume the output of memory.size() is per worker process, so that e.g. the 380 MB I see for the 64-job data set after computation really means about 6 GB with 16 cores, which is too much for my 8 GB machine.
  3. The memory load remaining after cleaning up with gc() is significantly higher with the multi-core approach than with the original for loop, which is why I'm nervous about using this at all.

So overall my multi-core approach is only helpful for a narrow range of data volumes: if the data set is too small, the simple for loop is faster than setting up all the workers, but if it is too large, the memory load explodes due to the distributed-memory approach and slows things down dramatically. I understand forking would use shared memory and avoid these kinds of issues, but that is not supported by TERR and apparently is not possible when running R on a Windows machine anyway (see page 2). Another option could be a shared-memory approach via mclapply, but again TERR lets you down: mclapply is available, but unlike in open-source R it just does the same as lapply and is not a multi-core version of it.
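
The only other workaround I can think of so far is to stop exporting the full data set altogether: split dat into one chunk per job up front and let parLapply hand each worker only its own share of those chunks. This is just a rough sketch (fit_one is a made-up helper doing what CDMBA does, and I haven't tested it in TERR), but it should keep each worker's copy of the raw data at roughly 1/i of dat plus its fit results:

library(parallel)

# one data frame per job, so parLapply distributes the chunks instead of
# every worker holding a full copy of dat
# (assuming all jobs in dat are to be fitted; otherwise filter by MatrixMBA[,Param1] first)
job_list <- split(dat, dat[,Param1])

fit_one <- function(datin){
   datin2 <- cbind.data.frame(datin[,X],datin[,Y],datin[,Col2Fit],stringsAsFactors=FALSE)
   colnames(datin2) <- c(Param1,Param2,Col2Fit)
   datin2 <- subset(datin2,!is.na(datin2[3]))
   fit <- mba.points(datin2,xy1,h=h,extend=FALSE,verbose=FALSE)$xyz.est
   # the meta data columns would be added back here, as in CDMBA above
   fit
}

cl <- makeCluster(i) # i worker processes as above
# only the small helper objects get exported, not dat itself
clusterExport(cl, c("xy1","h","X","Y","Col2Fit","Param1","Param2"))
clusterEvalQ(cl, {library(MBA)})
tryCatch(result <- parLapply(cl, job_list, fit_one), finally=stopCluster(cl))
MbaStdGrid <- do.call('rbind', result)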

I've got the vague feeling this can't be it. TIBCO claims TERR is faster than open-source R due to its better memory management, so the conclusion can't be that I need shared memory via either forking or mclapply, neither of which is supported by TERR. I'm sure I'm missing something... any hints on how to make this more memory efficient would be highly appreciated.

Cheers,

Mark

Attachments

runtime.jpg (163.1 KB)
memory.jpg (183.05 KB)
