Combine outputs of mutually exclusive processes in a Nextflow (DSL2) pipeline


I have a DSL2 workflow in Nextflow set up like this:


nextflow.enable.dsl=2

// process 1, mutually exclusive with process 2 below
process bcl {

    tag "bcl2fastq"
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/**fastq.gz'
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/Stats/*'
    publishDir params.outdir, mode: 'copy', pattern: 'InterOp/*'
    publishDir params.outdir, mode: 'copy', pattern: 'Run*.xml'
    beforeScript 'export PATH=/opt/tools/bcl2fastq/bin:$PATH'

    input:
        path runfolder
        path samplesheet

    output:
        path 'fastq/Stats/', emit: bcl_ch
        path 'fastq/**fastq.gz', emit: fastqc_ch
        path 'InterOp/*', emit: interop_ch
        path 'Run*.xml'
    script: 
        // processing omitted
    }

// Process 2, note the slightly different outputs
process bcl_convert {
    tag "bcl-convert"
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/**fastq.gz'
    publishDir params.outdir, mode: 'copy', pattern: 'fastq/Reports/*'
    publishDir params.outdir, mode: 'copy', pattern: 'InterOp/*'
    publishDir params.outdir, mode: 'copy', pattern: 'Run*.xml'
    beforeScript 'export PATH=/opt/tools/bcl-convert/:$PATH'

    input:
        path runfolder
        path samplesheet

    output:
        path 'fastq/Reports/', emit: bcl_ch
        path 'fastq/**fastq.gz', emit: fastqc_ch
        path 'InterOp/', emit: interop_ch
        path 'Run*.xml'

    script:
        // processing omitted
}

// Downstream process: needs the fastq output of whichever of the two processes above ran
process fastqc {
    cpus 12

    publishDir "${params.outdir}/", mode: "copy"

    module 'conda//anaconda3'
    conda '/opt/anaconda3/envs/tools/'

    input:
        path fastq_input
    output:
        path "fastqc", emit: fastqc_output

    script:
    """
    mkdir -p fastqc
    fastqc -t ${task.cpus} $fastq_input -o fastqc
    """

}

I have a parameter, params.bcl_convert, that switches from one process to the other, and I set up the workflow like this:

workflow {
    runfolder_repaired = "${params.runfolder}".replaceFirst(/$/, "/")

    runfolder = Channel.fromPath(runfolder_repaired, type: 'dir')
    sample_data = Channel.fromPath(params.samplesheet, type: 'file')

    if (!params.bcl_convert) {
       bcl(runfolder, sample_data)
    } else {
        bcl_convert(runfolder, sample_data)
    }

    fastqc(bcl.out.mix(bcl_convert.out)) // Problematic line
}

The problem is the line marked above: I'm not sure how (or whether it is possible) to have fastqc take the output of either bcl or bcl_convert (only fastqc_ch, not the other outputs), regardless of which process generated it.

Some of the things I've tried include the following (inspired by https://github.com/nextflow-io/nextflow/issues/1646, although that one uses the output of a process):

    if (!params.bcl_convert) {
       def bcl_out = bcl(runfolder, sample_data).out
    } else {
        def bcl_out = bcl_convert(runfolder, sample_data).out
    }

    fastqc(bcl_out.fastqc_ch)

But compilation then fails with Variable "runfolder" already defined in the process scope, even when I follow the approach from that post more closely:

def result_bcl2fastq = !params.bcl_convert ? bcl(runfolder, sample_data) : Channel.empty()
def result_bclconvert = params.bcl_convert ? bcl_convert(runfolder, sample_data) : Channel.empty()

I thought about using conditionals in a single script; however, the outputs of the two processes differ, so that is not really possible. The only way I got it to work is by duplicating the downstream call in each branch, like this:

if (!params.bcl_convert) {
   bcl(runfolder, sample_data)
   fastqc(bcl.out.fastqc_ch)
} else {
   bcl_convert(runfolder, sample_data)
   fastqc(bcl_convert.out.fastqc_ch)
}

However this looks to me like unnecessary complication. Is what I want to do actually possible?


Solution

  • I was able to figure this out, with a lot of trial and error.

    Assigning the return value of a process call to a variable gives you an object that acts like the .out property of that process. So I assigned the result of whichever process runs to the same variable, gave both processes the same emit names (as seen in the question), and then accessed the named outputs directly on that variable, without using .out:

    workflow {
    
        runfolder_repaired = "${params.runfolder}".replaceFirst(/$/, "/")
    
        runfolder = Channel.fromPath(
            runfolder_repaired, type: 'dir')
    
        sample_data = Channel.fromPath(
            params.samplesheet, type: 'file')
    
        if (!params.bcl_convert) {
           bcl_out = bcl(runfolder, sample_data)
        } else {
           bcl_out = bcl_convert(runfolder, sample_data)
        }
        fastqc(bcl_out.fastqc_ch)
    }
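
The same idea can be written more compactly with a ternary expression. This is only a sketch: it assumes the bcl, bcl_convert and fastqc processes are defined exactly as in the question, and that both bcl and bcl_convert declare an output with emit: fastqc_ch. Only the selected branch of the ternary is evaluated, so only one of the two processes is invoked.

```groovy
workflow {
    // Append a trailing slash to the run folder path, as in the question
    runfolder_repaired = "${params.runfolder}".replaceFirst(/$/, "/")

    runfolder = Channel.fromPath(runfolder_repaired, type: 'dir')
    sample_data = Channel.fromPath(params.samplesheet, type: 'file')

    // Assumption: both processes expose the same emit name (fastqc_ch),
    // so the returned object can be used the same way for either one.
    bcl_out = params.bcl_convert
        ? bcl_convert(runfolder, sample_data)
        : bcl(runfolder, sample_data)

    // Pass only the fastq files downstream, not the other outputs
    fastqc(bcl_out.fastqc_ch)
}
```

Because both processes share their emit names, further downstream steps can also read bcl_out.bcl_ch or bcl_out.interop_ch without knowing which tool produced them.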