最新版本的R已经内置parallel包,parallel包是从snow包和multicore包继承而来,包含了很多非常好用的函数。parallel包可以通过PVM(rpvm包)、MPI(Rmpi包)、NetWorkSpaces(nws包)和raw sockets(如果以上3种都不能使用)平台进行分布计算,支持cluster和多核个人/服务器计算机。在Linux系统上,通常使用openMPI。
因为使用openMPI,所以parallel包需要Rmpi包来设定节点,所以首先需要在计算机上安装openMPI。
# 安装openmpi环境
# yum install openmpi openmpi-devel
# 配置环境(安装时执行,可能之后运行也要执行)
# ldconfig /usr/lib64/openmpi/lib/
在~/.bashrc
下写入
export LD_LIBRARY_PATH="${LD_LIBRARY_PATH}${LD_LIBRARY_PATH:+:}/usr/lib64/openmpi/lib/"
载入~/.bashrc
$ source ~/.bashrc
在启动的R窗口中输入:
install.packages("Rmpi",
configure.args =
c("--with-Rmpi-include=/usr/include/openmpi-x86_64/",
"--with-Rmpi-libpath=/usr/lib64/openmpi/lib/",
"--with-Rmpi-type=OPENMPI"))
首先,需要设定cluster的节点(nodes)数目
cl <- makeCluster(2, type = "MPI")
这里对“节点数”设定做一些解释,如果使用cluster,可以直接设定cluster数据即可;如果是在小型服务器或者个人电脑上使用,最大节点数可以设定为“线程数(processor)-1”。比如一个双核四线程计算机,节点数目最大可以设定为3。这是因为snow包(parallel包的主要依赖包)在设计时,总是要保留一个**“主线程”**来处理和整合数据。
在linux系统下,线程数可以通过 $ nproc
查看。
使用parallel包中的内置并行运算函数
比如使用parApply()
、parCapply()
、parRapply()
、parLapply()
和parSapply()
(如果返回矩阵,使用
cbind()
)等函数。其中文档中指出,parApply()
函数对于二维矩阵的每一个单元进行操作,因此要慢一些。如果可能,使用parCapply()
和parRapply()
对列和行进行操作,以加快运行速度。
stopCluster(cl)
在并行计算过程中,不可避免地会用到其他包辅助。这里涉及到snow包的一个设计原理:并行运算多个R进程,只有一个主进程载入完整的依赖包环境。这就意味着其他并行的R进程中也要载入依赖的包环境。
有两个思路,第一个思路是修改Rprofile.site
文件,让任意R进程在启动时都载入依赖的包。但是,不推荐这种做法,因为这样会增加R载入的速度;并且如果不同的代码用了不同的依赖包,就要不停地修改Rprofile.site
文件。
第二个思路是在新开的R进程中“动态”加载需要的包。所谓**“动态”**,没有什么高深的意思,就是“需要的时候加载即可”。根据需要,可以选择以下两种方法。
这种方法非常直观,推荐。
# 以下代码摘抄自Parallel R,其中packages
# 是一个要选择加载的package列表,
# 比如c('bigmemory', 'foreach', 'doMC')
worker.init <- function(packages) {
for (p in packages) {
library(p, character.only=TRUE)
}
NULL
}
clusterCall(cl, worker.init, c('bigmemory', 'foreach', 'doMC'))
这种方法不推荐,因为我们将看到这种方法是“投机”了parallel包的并行apply
家族函数。原理是:parallel包中最主要的就是apply
家族函数,比如parApply(cl = NULL, X, MARGIN, FUN, ...)
函数,是base包中apply()
的并行版本。其中会用到一个FUN
函数,我们可以在这个函数中加载包,比如写入require('bigmemory')
等。这样,并行的R进程就会载入需要的包。举例如下:
Getft <- function(i, arg1, arg2){
require(packages)
...
}
adft <- parSapply(cl, 1:10, Getft, argInput1, argInput2)
parallel包可以很好地与bigmemory包结合,进而进一步提升R操作大数据的能力。
但是,有一个问题是parApply()
、parCapply()
、parRapply()
函数是不能直接调用bigmemory包的big.memory
这种S4对象。当然也可以使用mat[, ]
之类语句引用big.matrix对象。但是这会把矩阵全部载入内存,也就失去了big.matrix
对象的意义,只有在内存允许的情况下这样操作。
解决办法:
将big.matrix
对象的操作放在一个函数中,函数传入的是big.matrix
的description file
(描述文件),而不是big.matrix
对象本身。
把这个函数作为parLapply()
和parSapply()
的FUN
,达到分布计算,而又不直接引用big.matrix
对象的目的。
这个思路的前提是:创建的big.matrix
对象必须是“内存共享”的,否则不能将其分布到不同的节点计算。
举一个例子,完整版本见补充材料:Final version,这个例子中首先创建一个Getft()
函数,接受adAllRowColDesc
和adMatDesc
两个变量是big.matrix
对象的描述文件。在这个函数中,attach.big.matrix()
通过描述文件引用big.matrix
对象,并完成相关操作。
Getft <- function(i, adAllRowColDesc, adMatDesc){
adAllRowColData <- attach.big.matrix(adAllRowColDesc)
adMatData <- attach.big.matrix(adMatDesc)
rowIndex <- adAllRowColData[i, 1]
colIndex <- adAllRowColData[i, 2]
linkData <- c(rowNames[rowIndex], rowNames[colIndex], adMatData[rowIndex, colIndex])
return(linkData)
}
之后,使用parSapply()
函数调用Getft()
函数,使用1:nrow(adAllRowCol)
作为“计数器”。
adft <- parSapply(cl, 1:nrow(adAllRowCol), Getft, adAllRowColDescFile, adMatDescFile)
如果需要处理的big.matrix
对象不大,也可以直接使用parSapply()
函数,详细参考补充材料:Bigmatrix direct version。
另外一个支持并行计算的包是foreach包,它天生与big.matrix
对象匹配。所以,我也提供了使用foreach
版本,详细参考补充材料:Foreach version。
通过测试可以发现,在数据量较少时(1000行左右),foreach
版本和parSapply()
版本速度基本持平。但是,数据量增大时(百万行),foreach
版本速度明显减慢。原因是在使用foreach
并行计算时,计算开始时候需要建立索引。这个过程在循环数变大时,会变得非常缓慢。
因此,我们可以看到,如果使用foreach包,会减少代码量,而且程序逻辑也非常清晰,但是遇到超大循环数,速度明显减慢。同时,如果使用parallel包,那么需要一些“技巧”才能与big.matrix
对象有效融合。所以,我们的结论是原生态的R(包括提供的一些包)不适合做并行大数据计算。
State of the Art in Parallel Computing with R,这篇文章详细介绍了很多R并行计算的平台
adj2ftBig <- function(adMat, adAllRowCol, n = 2){
# INPUT: 'adMat' should be a bigmatrix. 'adAllRowCol' is the row and column combination, also is a bigmatrix
require(bigmemory)
require(parallel)
cl <- makeCluster(n, type = "MPI")
adMatDescFile <- describe(adMat)
adAllRowColDescFile <- describe(adAllRowCol)
rowNames <- rownames(adMat)
colNames <- colnames(adMat)
ignore <- clusterEvalQ(cl, {library(bigmemory); NULL})
Getft <- function(i, adAllRowColDesc, adMatDesc){
adAllRowColData <- attach.big.matrix(adAllRowColDesc)
adMatData <- attach.big.matrix(adMatDesc)
rowIndex <- adAllRowColData[i, 1]
colIndex <- adAllRowColData[i, 2]
linkData <- c(rowNames[rowIndex], rowNames[colIndex], adMatData[rowIndex, colIndex])
return(linkData)
}
adft <- parSapply(cl, 1:nrow(adAllRowCol), Getft, adAllRowColDescFile, adMatDescFile)
stopCluster(cl)
return(adft)
}
adj2ftBig3 <- function(adMat, adAllRowCol, n = 2){
# INPUT: 'adMat' is a matrix. 'adAllRowCol' is the row and column combination, also a matrix.
library(parallel)
cl <- makeCluster(n, type = "MPI")
rowNames <- rownames(adMat)
colNames <- colnames(adMat)
adft <- parRapply(cl = cl, adAllRowCol, function(x) {
linkData <- c(rowNames[x[1]], colNames[x[2]], adMat[x[1], x[2]])
return(linkData)
})
stopCluster(cl)
return(adft)
}
adj2ftBig2 <- function(adMat, adAllRowCol, n = 4){
# INPUT: 'adMat' should be a bigmatrix. 'adAllRowCol' is the row and column combination, also a bigmatrix.
library(bigmemory)
library(foreach)
library(doMC)
registerDoMC(n)
rowNames <- rownames(adMat)
colNames <- colnames(adMat)
adft <- foreach (i = 1:nrow(adAllRowCol), .combine = rbind, .inorder=TRUE) %dopar% {
print(paste('It is running ', i, ' in total of ', nrow(adAllRowCol), '.', sep = ''))
linkData <- c(rowNames[adAllRowCol[i, 1]], colNames[adAllRowCol[i, 2]], adMat[adAllRowCol[i, 1], adAllRowCol[i, 2]])
return(linkData)
}
return(adft)
}
2015年12月30日