Nextflow Process `MULTIMETRICS` declares 3 input channels but 5 were specified


I am creating a Nextflow pipeline for analysing genomics data. The pipeline runs every step up to creating BAM files and marking duplicates. However, I am unable to pass the BAM files from my MARK_DUPLICATES step into the GATK CollectMultipleMetrics tool: Nextflow reports that the MULTIMETRICS process declares 3 input channels but 5 were specified.

Process `MULTIMETRICS` declares 3 input channels but 5 were specified

The MARK_DUPLICATES process:

process MARK_DUPLICATES {
    cpus 10
    publishDir params.outdir, mode:'move'
    container 'broadinstitute/gatk:latest'

    input:
    tuple val(sample_id), path(reads)

    output:
    path "${sample_id}_MarkedDup.bam"
    path "${sample_id}_MarkedDuplicates.txt"
    path "${sample_id}_MarkedDup.bai"

    script:
    """
    gatk MarkDuplicates I=${reads[0]} O=${sample_id}_MarkedDup.bam M=${sample_id}_MarkedDuplicates.txt CREATE_INDEX=true
    """
}

The MULTIMETRICS process:

process MULTIMETRICS {
    container 'broadinstitute/gatk:latest'

    input:
    path "${sample_id}_MarkedDup.bam"
    path(genome)
    val genomeid

    output:
    tuple val(sample_id), path("${sample_id}_multimetrics")

    script:
    """
    gatk CollectMultipleMetrics I=${reads} O=${sample_id}_multimetrics R=${genome}/$genomeid
    """
}

The workflow:

    picard_ch=MARK_DUPLICATES(addreadgroups_ch)

//    picard_ch.view()

    multimetrics_ch=MULTIMETRICS(picard_ch, params.genomefile, params.genomeid)

Solution

  • I see what's happening.

    MARK_DUPLICATES declares three separate outputs, so it produces three output channels. Since you've not specified which one to send to MULTIMETRICS, picard_ch carries all three. Nextflow does not run MULTIMETRICS once per file; it passes each channel as a separate input, and together with params.genomefile and params.genomeid that adds up to the 5 inputs in the error message.
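
    To make the channel count concrete, here is a minimal sketch with a toy process standing in for the GATK step. The process name THREE_OUTPUTS, the file names and the sample ID are all hypothetical; the only point is that each output line becomes its own channel, reachable by position through `.out`:

    ```nextflow
    // Hypothetical stand-in for MARK_DUPLICATES: three separate output
    // declarations give three separate output channels.
    process THREE_OUTPUTS {
        input:
        val sample_id

        output:
        path "${sample_id}.bam"
        path "${sample_id}.txt"
        path "${sample_id}.bai"

        script:
        """
        touch ${sample_id}.bam ${sample_id}.txt ${sample_id}.bai
        """
    }

    workflow {
        THREE_OUTPUTS(Channel.of('sampleA'))

        // Each declared output is its own channel, indexed in declaration order.
        THREE_OUTPUTS.out[0].view { f -> "bam channel: ${f.name}" }
        THREE_OUTPUTS.out[1].view { f -> "txt channel: ${f.name}" }
        THREE_OUTPUTS.out[2].view { f -> "bai channel: ${f.name}" }
    }
    ```

    Passing the whole result to another process hands over all three channels at once, which is why a downstream process with three declared inputs ends up being given five.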

    This can be solved with more explicit output declarations. Here is one solution channeling all MARK_DUPLICATES outputs into a single channel:

    process MARK_DUPLICATES {
        cpus 10
        publishDir params.outdir, mode:'copy' // The docs suggest not using 'move' unless this is the final process
        container 'broadinstitute/gatk:latest' 
    
        input:
        tuple val(sample_id), path(reads)
    
        output:
        tuple val(sample_id), path("${sample_id}_MarkedDup.bam"), path("${sample_id}_MarkedDup.bai"), path("${sample_id}_MarkedDuplicates.txt"), emit: picard_ch
    
        script:
        """
        gatk MarkDuplicates I=${reads[0]} O=${sample_id}_MarkedDup.bam M=${sample_id}_MarkedDuplicates.txt CREATE_INDEX=true 
        """
    }
    process MULTIMETRICS {
        container 'broadinstitute/gatk:latest'
    
        input:
        tuple val(sample_id), path(bam), path(bai), path(txt)
        path(genome)
        val genomeid
    
    
        output:
        tuple val(sample_id), path("${sample_id}_multimetrics"), emit: mm_ch
    
        script:
        """
        gatk CollectMultipleMetrics I=${bam} O=${sample_id}_multimetrics R=${genome}/${genomeid}
        """
    }
    
    workflow {
        ...
        MARK_DUPLICATES(addreadgroups_ch)
        MULTIMETRICS(MARK_DUPLICATES.out.picard_ch, params.genomefile, params.genomeid)
    }
    
    

    Alternatively, you could just emit the bam from MARK_DUPLICATES like this:

    process MARK_DUPLICATES {
        ...
    
        output:
        tuple val(sample_id), path("${sample_id}_MarkedDup.bam"), emit: picard_bam
        path("${sample_id}_MarkedDup.bai"), emit: picard_bai
        path("${sample_id}_MarkedDuplicates.txt"), emit: picard_txt
        ...
    }
    

    The MULTIMETRICS input declaration would need to change to:

    process MULTIMETRICS {
        ...
        input:
        tuple val(sample_id), path(bam)
        ...
    }
    

    And the workflow:

    workflow {
        ...
        MARK_DUPLICATES(addreadgroups_ch)
        MULTIMETRICS(MARK_DUPLICATES.out.picard_bam, params.genomefile, params.genomeid)
    }
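
    With the narrower input, the script block also has to refer to `bam`, since `reads` is no longer declared anywhere in MULTIMETRICS. A sketch of what the full process might look like in that case, assuming the rest of the process stays as in the question. The `*` glob on the output is my own assumption: as far as I know, CollectMultipleMetrics treats `O=` as a base name and appends a suffix per metrics file, so an exact-name match may find nothing.

    ```nextflow
    process MULTIMETRICS {
        container 'broadinstitute/gatk:latest'

        input:
        tuple val(sample_id), path(bam)   // matches MARK_DUPLICATES.out.picard_bam
        path genome
        val genomeid

        output:
        // Assumption: O= is a base name, so collect the metrics files by prefix.
        tuple val(sample_id), path("${sample_id}_multimetrics*"), emit: mm_ch

        script:
        """
        gatk CollectMultipleMetrics I=${bam} O=${sample_id}_multimetrics R=${genome}/${genomeid}
        """
    }
    ```

    The picard_bai and picard_txt outputs stay available as MARK_DUPLICATES.out.picard_bai and MARK_DUPLICATES.out.picard_txt if a later step needs them.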