Tags: input, merge, samtools, nextflow, bam

Multiple outputs to single list input - merging BAM files in Nextflow


I am trying to merge x BAM files, produced by running multiple alignments at once (each on a batch of y FASTQ files), into a single BAM file in Nextflow.

So far I have the following when performing the alignment and sorting/indexing the resulting bam file:

//Run minimap2 on concatenated fastqs
process miniMap2Bam {
        publishDir "$params.bamDir"
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        val dirString from dirStr
        val runString from stringRun
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        val runString into stringRun1
        file("${batchFastq}.bam") into bamFiles
        val dirString into dirStrSam

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

Here ${batchFastq}.bam is the BAM file holding the alignments for one batch of y FASTQ files.

This pipeline completes without errors. However, when I run samtools merge on these BAM files in another process (samToolsMerge), that process runs once per alignment (in this case, 4 times) instead of once for all the collected BAM files:

//Run samtools merge
process samToolsMerge {
        echo true
        publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
        cache 'deep'
        errorStrategy 'retry'
        maxRetries 3
        maxForks 10
        memory { 14.GB * task.attempt }

        input:
        val runString from stringRun1
        file bamFile from bamFiles.collect()
        val dirString from dirStrSam

        output:
        file("**")

        script:
        """
        samtools merge ${runString}.bam ${bamFile} 
        """
}

With the output being:

executor >  lsf (9)
[49/182ec0] process > catFastqs (1)     [100%] 1 of 1 ✔
[-        ] process > nanoPlotSummary   -
[0e/609a7a] process > miniMap2Bam (1)   [100%] 4 of 4 ✔
[42/72469d] process > samToolsMerge (2) [100%] 4 of 4 ✔




Completed at: 04-Mar-2021 14:54:21
Duration    : 5m 41s
CPU hours   : 0.2
Succeeded   : 9

How can I take just the resulting bam files from miniMap2Bam and run them through samToolsMerge a single time, instead of the process running multiple times?

Thanks in advance!

EDIT: Thanks to Pallie in the comments below. The issue was passing the runString and dirString values from a prior process through miniMap2Bam and on to samToolsMerge. miniMap2Bam emitted each value once per alignment task, so samToolsMerge ran once for every value it received, even though bamFiles.collect() only produced a single list.
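
A minimal sketch of why this happens, assuming the same DSL1 syntax as above. The process name demo and the channel names and values are hypothetical stand-ins, not part of my pipeline. A channel that emits one value per upstream task drives the number of runs, while the collected list is a single value that gets reused each time:

```groovy
// Hypothetical stand-in for stringRun1: one value emitted per upstream task
perTask = Channel.from('run1', 'run1', 'run1', 'run1')

// Hypothetical stand-in for bamFiles.collect(): a single list, emitted as a value channel
bams = Channel.from('a.bam', 'b.bam', 'c.bam', 'd.bam').collect()

process demo {
        echo true

        input:
        val runString from perTask   // four items, so four tasks
        val bamList from bams        // the same full list is reused for every task

        script:
        """
        echo "would merge into ${runString}.bam: ${bamList.join(' ')}"
        """
}
```

With four items in perTask, demo should run four times, each time receiving the full list. Dropping the perTask input leaves only the collected list, so the process runs once.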

The solution was as simple as removing the vals from miniMap2Bam (as follows):

//Run minimap2 on concatenated fastqs
process miniMap2Bam {
        errorStrategy 'retry'
        cache 'deep'
        maxRetries 3
        maxForks 10
        memory { 16.GB * task.attempt }

        input:
        each file(batchFastq) from fastqBatch.flatMap()

        output:
        file("${batchFastq}.bam") into bamFiles

        script:
        """
        minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
        samtools index ${batchFastq}.bam
        """
}

Solution

  • The simplest fix would probably be to stop passing the static dirString and runString around via channels:

    // Instead of a hardcoded path, use a parameter passed via the CLI, as you did with bamDir
    dirString = file("/path/to/fastqs/")
    runString = file("/path/to/fastqs/").getParent()
    fastqBatch = Channel.from("/path/to/fastqs/")
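    // A possible parameterised form of the three lines above. Assumption: params.fastqDir is a
    // hypothetical parameter name (not from the original), supplied as --fastqDir on the CLI.
    // dirString = file(params.fastqDir)
    // runString = file(params.fastqDir).getParent()
    // fastqBatch = Channel.from(params.fastqDir)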
    
    //Run minimap2 on concatenated fastqs
    process miniMap2Bam {
            publishDir "$params.bamDir"
            errorStrategy 'retry'
            cache 'deep'
            maxRetries 3
            maxForks 10
            memory { 16.GB * task.attempt }
    
            input:
            each file(batchFastq) from fastqBatch.flatMap()
    
            output:
            file("${batchFastq}.bam") into bamFiles
    
            script:
            """
            minimap2 --secondary=no --MD -2 -t 10 -a $params.genome ${batchFastq} | samtools sort -o ${batchFastq}.bam
            samtools index ${batchFastq}.bam
            """
    }
    
    //Run samtools merge
    process samToolsMerge {
            echo true
            publishDir "$dirString/aligned_minimap/", mode: 'copy', overwrite: 'false'
            cache 'deep'
            errorStrategy 'retry'
            maxRetries 3
            maxForks 10
            memory { 14.GB * task.attempt }
    
            input:
            file bamFile from bamFiles.collect()
    
            output:
            file("**")
    
            script:
            """
            samtools merge ${runString}.bam ${bamFile} 
            """