Nextflow - splitCsv - each element - error: If you need to reuse the same component

For a Nextflow pipeline I'd like to read in a CSV file with five columns (sample name, normal R1, normal R2, tumor R1, tumor R2):


I read in the file and create a LinkedHashMap. For each element I'd like to run a few processes. The processes have been working fine without the CSV iteration, when they were provided with a channel of tumor files and a channel of normal files.

When I change the code to read from the CSV, I get the following error:

Process 'FASTP' has been already used -- If you need to reuse the same component, include it with a different name or include it in a different workflow context

Below is the code:

include { FASTP } from './'
include { bwa_index } from './'
include { align_bwa_mem } from './'
include { gatk_markduplicates } from './'
include { setupnmdtags } from './'
include { recalibrate_bam } from './'
include { applybqsr } from './'
include { mutect2 } from './'
include { lancet } from './'
include { manta } from './'
include { strelka } from './'
include { gatk_merge_vcfs } from './'

workflow {
    def csvFile = file("input_nextflow_files.csv")
    def csvLines = csvFile.text.readLines()

    def sampleMap = csvLines.collectEntries { line ->
        def lineCols = line.split(',')
        if (lineCols.size() >= 5) {
            def sampleName = lineCols[0]
            def normalR1 = file(lineCols[1])
            def normalR2 = file(lineCols[2])
            def tumorR1 = file(lineCols[3])
            def tumorR2 = file(lineCols[4])

            [(sampleName): [tuple(normalR1, normalR2), tuple(tumorR1, tumorR2)]]
        } else {
            return [:]
        }
    }

    sampleMap.each { sampleName, pairList ->
        def normalPair = pairList[0]
        def tumorPair = pairList[1]
        align_bwa_mem(FASTP.out.reads_tumor, FASTP.out.reads_normal) //already_created index
    }
}

I believe the problem is related to the inputs of the FASTP process below:

process FASTP {
    maxForks 3
    debug true

    input:
    path(reads_tumor)  //val outdir  //doesn't work with path (outdir) // we pass multiple reads - for tumor and normal
    path(reads_normal)  //val outdir  //doesn't work with path (outdir)
    val (sample_name)

    output:
    tuple val(sample_name), path("${sample_id_tumor}_trim_{1,2}.fq.gz"), emit: reads_tumor
    path("${sample_id_tumor}.fastp.json"), emit: json_tumor
    path("${sample_id_tumor}.fastp.html"), emit: html_tumor
    tuple val(sample_id_normal), path("${sample_id_normal}_trim_{1,2}.fq.gz"), emit: reads_normal
    path("${sample_id_normal}.fastp.json"), emit: json_normal
    path("${sample_id_normal}.fastp.html"), emit: html_normal

    script:
    def (r1_normal, r2_normal) = reads_normal
    def (r1_tumor, r2_tumor) = reads_tumor
    """
    ml fastp

    fastp  --in1 "${r1_normal}" --in2 "${r2_normal}" -q 20  -u 20 -l 40 --detect_adapter_for_pe --out1 "${sample_id_normal}_trim_1.fq.gz" --out2 "${sample_id_normal}_trim_2.fq.gz" --json "${sample_id_normal}.fastp.json" --html "${sample_id_normal}.fastp.html" --thread 12
    fastp  --in1 "${r1_tumor}" --in2 "${r2_tumor}" -q 20  -u 20 -l 40 --detect_adapter_for_pe --out1 "${sample_id_tumor}_trim_1.fq.gz" --out2 "${sample_id_tumor}_trim_2.fq.gz" --json "${sample_id_tumor}.fastp.json" --html "${sample_id_tumor}.fastp.html" --thread 12

    echo "Exiting fastp"
    """
}

I do not know how to fix this error. I checked that I am not including the FASTP process multiple times. I also tried removing the include and the FASTP call, but that didn't work either, so I cannot figure out what's going on.


  • When you iterate through your sample map using each, you are effectively trying to reuse the FASTP and align_bwa_mem processes on every iteration. A process can only be invoked once per workflow context, so Nextflow is telling you that if a process needs to be reused, it must be included under a different name (i.e. using a module alias) or called from a different workflow context (i.e. using a sub-workflow). A better way to achieve what you want is to use channels and the splitCsv operator: each row of the CSV becomes a channel element, and a process runs once per element, so no explicit loop is needed. For example:

    params.samples_csv = 'input_nextflow_files.csv'

    include { FASTP } from './'

    workflow {

        def header = ['sampleName', 'normalR1', 'normalR2', 'tumorR1', 'tumorR2']

        Channel
            .fromPath( params.samples_csv )
            .splitCsv( header: header )
            .multiMap { row ->
                def tumor_reads = tuple( file(row.tumorR1), file(row.tumorR2) )
                def normal_reads = tuple( file(row.normalR1), file(row.normalR2) )

                tumor:
                    tuple( row.sampleName, tumor_reads )
                normal:
                    tuple( row.sampleName, normal_reads )
            }
            .set { samples }

        FASTP( samples.tumor.mix( samples.normal ) )
    }

    Or, if you wanted more flexibility, another way would be to include FASTP twice using module aliases:

    params.samples_csv = 'input_nextflow_files.csv'

    include { FASTP as FASTP_TUMOR } from './'
    include { FASTP as FASTP_NORMAL } from './'

    workflow {
        // build the 'samples' channel with splitCsv and multiMap, as in the first example
        FASTP_TUMOR( samples.tumor )
        FASTP_NORMAL( samples.normal )
    }
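
    The remaining route, a different workflow context, can be sketched with two named sub-workflows. This is a minimal, self-contained example under stated assumptions: TRIM is a hypothetical placeholder for FASTP (it only writes a text file), and TUMOR_QC and NORMAL_QC are illustrative names. Because each named workflow is its own context, the same process can be invoked once in each of them without an alias:

    ```nextflow
    // Hypothetical placeholder process standing in for FASTP,
    // so this sketch runs without the real module.
    process TRIM {
        tag { sample_id }

        input:
        tuple val(sample_id), val(label)

        output:
        tuple val(sample_id), path("${sample_id}_${label}.txt"), emit: reads

        script:
        """
        echo "trimmed ${label} reads for ${sample_id}" > ${sample_id}_${label}.txt
        """
    }

    // Each named workflow is a separate context, so TRIM may be invoked
    // once in TUMOR_QC and once in NORMAL_QC.
    workflow TUMOR_QC {
        take:
        ch_reads

        main:
        TRIM( ch_reads )

        emit:
        trimmed = TRIM.out.reads
    }

    workflow NORMAL_QC {
        take:
        ch_reads

        main:
        TRIM( ch_reads )

        emit:
        trimmed = TRIM.out.reads
    }

    workflow {
        // Hypothetical inputs: one sample, labelled tumor and normal
        TUMOR_QC( Channel.of( ['S1', 'tumor'] ) )
        NORMAL_QC( Channel.of( ['S1', 'normal'] ) )

        TUMOR_QC.out.trimmed.view()
        NORMAL_QC.out.trimmed.view()
    }
    ```

    Nextflow reports these tasks under their fully qualified names (TUMOR_QC:TRIM and NORMAL_QC:TRIM), which is why the two calls do not clash.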

    Contents of ./

    process FASTP {

        tag { sample_id }

        input:
        tuple val(sample_id), path(reads, stageAs: 'reads/*')

        output:
        tuple val(sample_id), path("${sample_id}_trim_{1,2}.fq.gz"), emit: reads
        path "${sample_id}.fastp.json", emit: json
        path "${sample_id}.fastp.html", emit: html

        script:
        def (r1, r2) = reads

        """
        fastp \\
            --in1 "${r1}" \\
            --in2 "${r2}" \\
            -q 20 \\
            -u 20 \\
            -l 40 \\
            --detect_adapter_for_pe \\
            --out1 "${sample_id}_trim_1.fq.gz" \\
            --out2 "${sample_id}_trim_2.fq.gz" \\
            --json "${sample_id}.fastp.json" \\
            --html "${sample_id}.fastp.html" \\
            --thread ${task.cpus}
        """
    }
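
    With the alias approach, the tumor and normal outputs stay in separate channels, so they can be paired back up per sample before a tumor/normal step such as align_bwa_mem or mutect2. A minimal sketch using the join operator, assuming each reads channel emits [ sample_id, [ R1, R2 ] ] as the FASTP module above does; the channel contents are hypothetical placeholders so the snippet runs on its own:

    ```nextflow
    workflow {
        // Hypothetical stand-ins for FASTP_TUMOR.out.reads and FASTP_NORMAL.out.reads:
        // each element is [ sample_id, [ trimmed_R1, trimmed_R2 ] ]
        tumor_reads = Channel.of(
            ['S1', ['S1_tumor_trim_1.fq.gz', 'S1_tumor_trim_2.fq.gz']],
            ['S2', ['S2_tumor_trim_1.fq.gz', 'S2_tumor_trim_2.fq.gz']]
        )
        normal_reads = Channel.of(
            ['S2', ['S2_normal_trim_1.fq.gz', 'S2_normal_trim_2.fq.gz']],
            ['S1', ['S1_normal_trim_1.fq.gz', 'S1_normal_trim_2.fq.gz']]
        )

        // join matches elements on their first item (the sample ID), regardless
        // of arrival order, and emits [ sample_id, tumor_pair, normal_pair ]
        tumor_reads
            .join( normal_reads )
            .view { sample_id, tumor, normal ->
                "${sample_id}: tumor=${tumor} normal=${normal}"
            }
    }
    ```

    In the pipeline itself this would be FASTP_TUMOR.out.reads.join( FASTP_NORMAL.out.reads ), and the joined channel could feed a process whose input is declared as tuple val(sample_id), path(tumor_reads), path(normal_reads). That input declaration is an assumption, since the definition of align_bwa_mem isn't shown.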