Tutorial: R functions edited for parallel processing
Kevin Blighe wrote:

LAST TESTED: October 2nd, 2018

-------------------------------------------

As R becomes ever more widely used in bioinformatics, the need for parallel processing grows with the sheer amount of data being produced. I thought it would be useful to have a thread where people can post R functions that they have enabled for parallel processing.

load doParallel and associated packages

require(doParallel)

set cores / threads

cores <- 12

Note: you can automatically detect the number of CPU cores with cores <- parallel::detectCores()
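As a common safeguard, leave one core free for the OS; a minimal sketch:

library(parallel)
# use all available cores but one, never dropping below 1
cores <- max(1, detectCores() - 1)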

detect the system and then register the cores appropriately

system <- Sys.info()['sysname']

# If the user is on Windows, cores / threads for certain functions have
# to be registered differently, i.e., as a cluster object that implements
# SNOW functionality.
# On Mac / Linux, they are registered via 'mc.cores' and as a number of
# cores passed to registerDoParallel()
cl <- NULL
if (system == 'Windows') {
    cl <- makeCluster(getOption('cl.cores', cores))
    registerDoParallel(cl)
    # remember to release the workers when finished:
    # stopCluster(cl)
} else {
    options('mc.cores' = cores)
    registerDoParallel(cores)
}
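To confirm that the backend registered correctly, a quick smoke test with foreach (registerDoParallel makes the %dopar% operator from the foreach package use the registered workers):

library(foreach)
# should report the number of registered workers
getDoParWorkers()
# trivial parallel loop
foreach(i = 1:4, .combine = c) %dopar% sqrt(i)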

load the airway test data

require(airway)
data("airway")

-------------------------------------------

1, regression analysis (any type of lm, glm, clogit, et cetera [here, glm])

I originally wrote this for work in which I had to run a genome-wide conditional logistic regression model with family ID (FID) as the matching stratum, testing each SNP independently and adjusting for age + gender + PC1 + PC2.

I broke the SNPs into blocks of 5000, split them across multiple CPU cores and chromosomes, and analysed them that way over a series of days using >30 CPU cores.

This code will most likely have to be adapted if you wish to use it for your own applications.

For this example, we will reduce the number of cores and the blocksize:

cpucores <- 4
options("mc.cores"=cpucores)
registerDoParallel(cpucores)

blocksize <- 150

NB - use a small blocksize if RAM is low. Most current desktops should support a blocksize of 100 - 500.

In this code, all variants / data-points are stored in the data frame called modelling. First, normalise the airway expression data using DESeq2 and transform to rlog counts:

library(airway)
data("airway")
library("DESeq2")
dds <- DESeqDataSet(airway, design = ~ cell + dex)
dds <- DESeq(dds, betaPrior = FALSE)
rlogcounts <- rlog(dds, blind=TRUE) # or vstcounts <- vst(dds, blind=TRUE)

For this example, we'll greatly reduce the number of datapoints to 500:

rlogcounts <- assay(rlogcounts)[1:500,]

Merge the metadata (colData) and logged data together:

modelling <- data.frame(colData(airway), t(rlogcounts))
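As a quick sanity check, the merged data frame should have the 8 airway samples as rows, with the metadata columns followed by the 500 genes:

dim(modelling)
head(colnames(modelling))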

Now, we can start to specify the models that will be tested:

# create an empty list to hold the formulae
formula.list <- list()

# store each possible formula in the list
startcolIndex <- ncol(colData(airway)) + 1
for (j in startcolIndex:ncol(modelling))
{
    formula.list[[j - (startcolIndex - 1)]] <- as.formula(paste("dex ~ cell + ", colnames(modelling)[j], sep=""))
}

head(formula.list)
[[1]]
dex ~ cell + ENSG00000000003

[[2]]
dex ~ cell + ENSG00000000005

[[3]]
dex ~ cell + ENSG00000000419

[[4]]
dex ~ cell + ENSG00000000457

[[5]]
dex ~ cell + ENSG00000000460

[[6]]
dex ~ cell + ENSG00000000938
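Equivalently, the same list can be built without the explicit loop; a minimal sketch assuming the modelling data frame from above:

genecols <- colnames(modelling)[(ncol(colData(airway)) + 1):ncol(modelling)]
formula.list <- lapply(genecols, function(g) as.formula(paste0("dex ~ cell + ", g)))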

Run a base model without the gene term - this will replace any later models that fail:

formula <- as.formula("dex ~ cell")
basemodel <- glm(formula, modelling, family=binomial(link='logit'))

Obtain a vector of all gene names and determine number of blocks:

varnames <- colnames(modelling)[startcolIndex:ncol(modelling)]
blocks <- ceiling(length(varnames) / blocksize) # ceiling avoids an empty final block when the length is an exact multiple of blocksize
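To preview exactly which indices each batch will cover, here is a small hypothetical helper (blockIndices is not part of the original code):

# tabulate the from / to index of every block
blockIndices <- function(n, blocksize) {
    b <- ceiling(n / blocksize)
    data.frame(
        block = seq_len(b),
        from = (seq_len(b) - 1) * blocksize + 1,
        to = pmin(seq_len(b) * blocksize, n))
}
blockIndices(length(varnames), blocksize)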

Create output file and write header:

write.table(c("Gene\tBetaEstimate\tStdError\tzvalue\tpvalue"), "out.tsv", sep="\t", quote=FALSE, col.names=FALSE, row.names=FALSE, append=FALSE)

Now loop over the blocks and process the formulae in each block in parallel:

for (l in 1:blocks)
{
    # run the logistic regression using mclapply, i.e., multiple cores
    # first block - will be executed just once
    if (l==1)
    {
        print(paste("Processing ", blocksize, " formulae, batch ", l, " of ", blocks, sep=""))
        print(paste("Index1: 1; ", "Index2: ", (blocksize*l), sep=""))
        models <- mclapply(formula.list[1:(blocksize*l)], function(x) glm(x, modelling, family=binomial(link='logit')))
        names(models) <- varnames[1:(blocksize*l)]
    }

    # final block - will be executed just once
    if (l==blocks)
    {
        print(paste("Processing final batch ", l, " of ", blocks, sep=""))
        print(paste("Index1: ", (1+(blocksize*(l-1))), "; ", "Index2: ", length(formula.list), sep=""))
        models <- mclapply(formula.list[(1+(blocksize*(l-1))):length(formula.list)], function(x) glm(x, modelling, family=binomial(link='logit')))
        names(models) <- varnames[(1+(blocksize*(l-1))):length(formula.list)]
    }

    # any other blocks - executed any number of times between first and final block
    if (l>1 && l<blocks)
    {
        print(paste("Processing ", blocksize, " formulae, batch ", l, " of ", blocks, sep=""))
        print(paste("Index1: ", (1+(blocksize*(l-1))), "; ", "Index2: ", (blocksize*l), sep=""))
        models <- mclapply(formula.list[(1+(blocksize*(l-1))):(blocksize*l)], function(x) glm(x, modelling, family=binomial(link='logit')))
        names(models) <- varnames[(1+(blocksize*(l-1))):(blocksize*l)]
    }

    # if any models failed (mclapply returns 'try-error' objects, whose names() is NULL),
    # detect them and replace them with the base model
    wObjects <- mclapply(models, function(x) if (is.null(names(x))) { x <- basemodel } else {x} )

    #Extract information from the model and store in a new list
    wObjects <- mclapply(wObjects, function(x) data.frame(rownames(summary(x)$coefficients), summary(x)$coefficients))

    # remove intercept and covariates from final output (OPTIONAL)
    wObjects <- mclapply(wObjects, function(x) x[grep("Intercept|^cell", rownames(x), invert=TRUE),])

    #Convert the list into a data frame for writing
    wObject <- do.call(rbind, lapply(wObjects, data.frame, stringsAsFactors=FALSE))
    wObject <- data.frame(gsub("\\.[a-zA-Z0-9_():]*", "", rownames(wObject)), wObject)
    wObject[,2] <- gsub("factor", "", wObject[,2])

    #Output the results to disk
    write.table(wObject[,-1], "out.tsv", sep="\t", quote=FALSE, col.names=FALSE, row.names=FALSE, append=TRUE)
}

The loop prints progress as it runs:

[1] "Processing 150 formulae, batch 1 of 4"
[1] "Index1: 1; Index2: 150"
[1] "Processing 150 formulae, batch 2 of 4"
[1] "Index1: 151; Index2: 300"
[1] "Processing 150 formulae, batch 3 of 4"
[1] "Index1: 301; Index2: 450"
[1] "Processing final batch 4 of 4"
[1] "Index1: 451; Index2: 500"

In BASH:

head out.tsv
Gene            BetaEstimate    StdError    zvalue    pvalue
ENSG00000000003 250.73          398128.52   0.00062   0.99
ENSG00000000419 -1460.52        2565102.39  -0.00056  0.99
ENSG00000000457 -14.58          18.77       -0.77     0.43
ENSG00000000460 3.29            6.51        0.5       0.6131
ENSG00000000938 207.69          223.39      0.92      0.35
ENSG00000000971 -254.13         569394.2    -0.00044  0.99
ENSG00000001036 442.00          1024534.80  0.00043   0.99
ENSG00000001084 1.92            5.92        0.32      0.74
ENSG00000001167 169.00          299365.37   0.00056   0.99

NB - if on Windows, use parLapply(cl, <code>) in place of mclapply(<code>) (unless using Microsoft R).
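For reference, a hedged sketch of what the first batch would look like on Windows, assuming the cluster object cl from the setup section (SNOW workers cannot see the master's environment, so objects must be exported first):

# export the objects that the workers need
clusterExport(cl, c('modelling'))
models <- parLapply(cl, formula.list[1:blocksize],
    function(x) glm(x, modelling, family=binomial(link='logit')))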

2, clusGapKB (Linux / Mac)

This is an edit of the original clusGap function from the cluster package, which implements the Gap Statistic of Tibshirani and colleagues (https://statweb.stanford.edu/~gwalther/gap). I edited it at 3 points (indicated by the #KB tag) to enable parallel processing, which is crucial when computing the gap statistic for large matrices.

clusGapKB <- function (x, FUNcluster, K.max, B=100, d.power=1, verbose=interactive(), ...)
{
    stopifnot(is.function(FUNcluster), length(dim(x))==2, K.max>=2, (n<-nrow(x))>=1, ncol(x)>=1)

    if (B != (B. <- as.integer(B)) || (B <- B.) <= 0)
    {
        stop("'B' has to be a positive integer")
    }

    if (is.data.frame(x))
    {
        x <- as.matrix(x)
    }

    ii <- seq_len(n)

    #KB
    ############
    #Create a function that will accept a parallel job using mclapply
    funParallel <- parallel::mclapply
    ############

    W.k <- function(X, kk)
    {
        clus <- if (kk > 1)
        {
            FUNcluster(X, kk, ...)$cluster
        }
        else
        {
            rep.int(1L, nrow(X))
        }

        0.5 * sum(vapply(split(ii, clus), function(I)
        {
            xs <- X[I, , drop = FALSE]
            sum(dist(xs)^d.power/nrow(xs))
        }, 0))
    }

    logW <- E.logW <- SE.sim <- numeric(K.max)

    if (verbose)
    {
        cat("Clustering k = 1,2,..., K.max (= ", K.max, "): .. ", sep = "")
    }

    #KB
    ############
    #Original code
    #for (k in 1:K.max) logW[k] <- log(W.k(x, k))

    #New code optimised for parallelisation
    logW <- unlist(funParallel(1:K.max, function(k) log(W.k(x, k))))
    ############

    if (verbose)
    {
        cat("done\n")
    }

    xs <- scale(x, center = TRUE, scale = FALSE)

    m.x <- rep(attr(xs, "scaled:center"), each = n)

    V.sx <- svd(xs, nu = 0)$v

    rng.x1 <- apply(xs %*% V.sx, 2, range)

    logWks <- matrix(0, B, K.max)

    if (verbose)
    {
        cat("Bootstrapping, b = 1,2,..., B (= ", B, ")  [one \".\" per sample]:\n", sep = "")
    }

    for (b in 1:B)
    {
        z1 <- apply(rng.x1, 2, function(M, nn) runif(nn, min=M[1], max=M[2]), nn=n)

        z <- tcrossprod(z1, V.sx) + m.x

        #KB
        ############
        #Original code
        #for (k in 1:K.max)
        #{
        #   logWks[b, k] <- log(W.k(z, k))
        #}

        #Modified code for parallelisation
        tmplogWks <- unlist(funParallel(1:K.max, function(k) log(W.k(z, k))))
        logWks[b,1:K.max] <- tmplogWks
        ############

        if (verbose)
        {
            cat(".", if (b%%50 == 0) paste(b, "\n"))
        }
    }

    if (verbose && (B%%50 != 0))
    {
        cat("", B, "\n")
    }

    E.logW <- colMeans(logWks)

    SE.sim <- sqrt((1 + 1/B) * apply(logWks, 2, var))

    structure(class="clusGap", list(Tab=cbind(logW, E.logW, gap=E.logW-logW, SE.sim), n=n, B=B, FUNcluster=FUNcluster))
}

For PAM, create a custom function that performs the clustering and retains ONLY the cluster assignments for each k (via cluster.only=TRUE):

CustomPAM <- function(x,k) list(cluster=pam(x, k, diss=FALSE, metric="euclidean", medoids=NULL, stand=FALSE, cluster.only=TRUE, do.swap=TRUE, keep.diss=FALSE, keep.data=FALSE, pamonce=TRUE, trace.lev=0))

Execute function with:

require(cluster)
gap <- clusGapKB(assay(airway), FUNcluster=CustomPAM, K.max=20, B=250)

You can also just do kmeans clustering:

gap <- clusGapKB(assay(airway), FUNcluster=kmeans, K.max=20, B=250)
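The returned object has class 'clusGap', so the optimal k can be chosen with cluster::maxSE in the usual way; a minimal sketch assuming the gap object from above:

# pick k using the Tibshirani et al. (2001) '1 standard error' rule
k <- maxSE(f = gap$Tab[, "gap"], SE.f = gap$Tab[, "SE.sim"], method = "Tibs2001SEmax")
k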


For further ideas, see Step 6a from cytofNet.

-------------------------------------------

Comments:

RamRS wrote:

I learnt the parallel + mclapply trick a few months ago; it is really useful in a few niche cases, IMO.

krushnach80 wrote:

@Kevin, I'm using your enhanced function; it works pretty well.

Kevin Blighe wrote:

Good work, Krushnach! Which function in particular?

krushnach80 wrote:

Both clusGap and the correlation matrix calculation.

Kevin Blighe wrote:

Great. I have removed the correlation matrix function, though. It is 'on the shelf' for improvement.