Search code examples
rfor-loopmpiseurat

Why is my for loop to assign a task to a comm.rank in pbdMPI failing?


I am running a R script parallelized with pbdMPI in which 10 comm.ranks load 1 file each, then comm.rank 0 gathers all the files and is supposed to merge them. I wrote a for loop to specify that only comm.rank 0 should perform merging, however the loop is failing and all ranks attempt the merge causing the script to stall when items to merge run out.

The script starts by loading 10 gene matrices (1 per rank) and normalizes them. Then rank 0 gathers each Seurat object into a list called x. I assigned the first item in the list 'x' as Seuratdata, then ask rank 0 to merge the remaining objects in x.

x<-gather(Obj, rank.dest=0)
rm(Obj)
comm.print(x, all.rank=TRUE)
invisible(gc())
barrier()

comm.print("starting merging")
#comm.print(x, all.rank=TRUE)

comm.print(paste("length of x is", length(x)), all.rank=TRUE)
if(comm.rank()==0){
        x<-x
    Seuratdata<-x[[1]]

} else {
x<-NULL
}

if(comm.rank()==0){
        for(i in 2:length(x)){
        comm.print(paste("merging object", i),all.rank=TRUE)
        comm.print(i)
        comm.print(x[[i]])
    Seuratdata<-merge(Seuratdata, x[[i]], add.cell.ids=NULL, merge.data=TRUE)
    x[[i]]<-NA
        comm.print(paste("done merging", i))
}
}

The script runs until partially then stops. How can I make sure that only rank 0 is merging, and the other ranks remain uninvolved? This is the output for comm.print(x, all.rank=TRUE)

COMM.RANK = 0
[[1]]
An object of class Seurat 
114 features across 4034 samples within 1 assay 
Active assay: RNA (114 features, 0 variable features)

[[2]]
An object of class Seurat 
114 features across 1690 samples within 1 assay 
Active assay: RNA (114 features, 0 variable features)
..........

[[9]]
An object of class Seurat 
114 features across 3137 samples within 1 assay 
Active assay: RNA (114 features, 0 variable features)

[[10]]
An object of class Seurat 
114 features across 3601 samples within 1 assay 
Active assay: RNA (114 features, 0 variable features)

COMM.RANK = 1
NULL
COMM.RANK = 2
NULL
COMM.RANK = 3
NULL
COMM.RANK = 4
NULL
COMM.RANK = 5
NULL
COMM.RANK = 6
NULL
COMM.RANK = 7
NULL
COMM.RANK = 8
NULL
COMM.RANK = 9
NULL

I tried comm.print on comm.print(i, all.rank=TRUE) and I can see that all ranks are trying to merge at the same time despite specifying if(comm.rank()==0){...}

COMM.RANK = 0
[1] 2
COMM.RANK = 1
[1] 2
COMM.RANK = 2
[1] 2
COMM.RANK = 3
[1] 2
COMM.RANK = 4
[1] 2
COMM.RANK = 5
[1] 2
COMM.RANK = 6
[1] 2
COMM.RANK = 7
[1] 2
COMM.RANK = 8
[1] 2
COMM.RANK = 9
[1] 2
An object of class Seurat 
114 features across 1690 samples within 1 assay 
Active assay: RNA (114 features, 0 variable features)
COMM.RANK = 0
[1] 3
COMM.RANK = 1
[1] 1
COMM.RANK = 2
[1] 1
COMM.RANK = 3
[1] 1
COMM.RANK = 4
[1] 1
COMM.RANK = 5
[1] 1
COMM.RANK = 6
[1] 1
COMM.RANK = 7
[1] 1
COMM.RANK = 8
[1] 1
COMM.RANK = 9
[1] 1
An object of class Seurat 
114 features across 3989 samples within 1 assay 
Active assay: RNA (114 features, 0 variable features)
COMM.RANK = 0
[1] 4
COMM.RANK = 1
[1] 0
COMM.RANK = 2
[1] 0
COMM.RANK = 3
[1] 0
COMM.RANK = 4
[1] 0
COMM.RANK = 5
[1] 0
COMM.RANK = 6
[1] 0
COMM.RANK = 7
[1] 0
COMM.RANK = 8
[1] 0
COMM.RANK = 9
[1] 0
An object of class Seurat 
114 features across 2421 samples within 1 assay 
Active assay: RNA (114 features, 0 variable features)
COMM.RANK = 0
[1] 5
COMM.RANK = 1
[1] 2
COMM.RANK = 2
[1] 2
COMM.RANK = 3
[1] 2
COMM.RANK = 4
[1] 2
COMM.RANK = 5
[1] 2
COMM.RANK = 6
[1] 2
COMM.RANK = 7
[1] 2
COMM.RANK = 8
[1] 2
COMM.RANK = 9
[1] 2
An object of class Seurat 
114 features across 4402 samples within 1 assay 
Active assay: RNA (114 features, 0 variable features)
COMM.RANK = 0
[1] 6
COMM.RANK = 1
[1] 1
COMM.RANK = 2
[1] 1
COMM.RANK = 3
[1] 1
COMM.RANK = 4
[1] 1
COMM.RANK = 5
[1] 1
COMM.RANK = 6
[1] 1
COMM.RANK = 7
[1] 1
COMM.RANK = 8
[1] 1
COMM.RANK = 9
[1] 1
An object of class Seurat 
114 features across 3480 samples within 1 assay 
Active assay: RNA (114 features, 0 variable features)
COMM.RANK = 0
[1] 7
COMM.RANK = 1
[1] 0
COMM.RANK = 2
[1] 0
COMM.RANK = 3
[1] 0
COMM.RANK = 4
[1] 0
COMM.RANK = 5
[1] 0
COMM.RANK = 6
[1] 0
COMM.RANK = 7
[1] 0
COMM.RANK = 8
[1] 0
COMM.RANK = 9
[1] 0


Solution

  • Collective operations need all ranks to be involved. Inside a conditional section that only rank 0 executes, you should use plain print() rather than comm.print(), especially if you request all.rank = TRUE.

    What happens is that rank 0 waits for all other ranks to reach an implicit barrier that is in comm.print(..., all.rank = TRUE). The code of rank 0 simply hangs waiting for the other ranks to do what they never do.

    Below is a modified version of your code section.

    x <- gather(Obj, rank.dest = 0)
    rm(Obj)
    comm.print(x, all.rank = TRUE)
    invisible(gc())
    barrier()
    
    comm.print("starting merging")
    comm.print(paste("length of x is", length(x)), all.rank=TRUE)
    
    if(comm.rank()==0){
      x <- x
      Seuratdata<-x[[1]]
    } else {
      x <- NULL
    }
    
    if(comm.rank()==0){
        for(i in 2:length(x)){
        print(paste("merging object", i))
        print(i)
        print(x[[i]])
        Seuratdata <- merge(Seuratdata, x[[i]], add.cell.ids = NULL, merge.data = TRUE)
        x[[i]] <- NA
        print(paste("done merging", i))
      }
    }
    

    The main reason for comm.print() is to control the amount of output as well as garbled overprinting when many ranks print to the same place at the same time.

    Note that your reading may be parallel but the merging is effectively serial because only rank 0 is doing it.