Search code examples
nextflow

Workflow stops suddenly when creating channel


This is my code:

process descargaSRA {

publishDir("sra_files", mode: 'copy')

input:
    path sra_file

output:
    file "SRR*"

script:
    """
    ids_sra=\$(cat ${sra_file})
    for id in \${ids_sra}
    do
        prefetch \${id} -O .
        mv "\$id/\$id.sra" .
        rm -r "\$id"
    done
    """

}

process convert_to_fastq {

input:
    file sra_content 

output:
    path "fastq_files"

script:
    """
    fastq-dump --split-files ${sra_content} -O fastq_files
    """

}

process filter_reads {

publishDir("fastp_files", mode: 'copy')

input:
tuple val(id), val(fastq_files)
output:
tuple val(id), path("clean_*")

script:
"""
echo "Processing ${id} reads"

fastp -i ${fastq_files[0]} -I ${fastq_files[1]} -o clean_${id}_1.fastq -O clean_${id}_2.fastq --cut_tail --cut_window_size 5 --cut_mean_quality 30
"""

}

process indexing_reference {

publishDir("index_reference", mode: "copy")
input:
    path reference
    path fastp

output:
    path "genome*"

script:
    """
    bwa index ${reference} -p genome
    """

}

process mapping_to_reference{ publishDir("bam_files",mode:"copy")

input:
    path genome
    tuple val(id), val(clean_fastq)
output:
    path "*.sorted.bam"
script:
    """
    bwa mem genome ${clean_fastq[0]} ${clean_fastq[1]} > ${id}.sam
    samtools view ${id}.sam -o ${id}.bam
    samtools sort ${id}.bam -o ${id}_sorted.bam
    samtools index ${id}_sorted.bam
    """

}

workflow {

input_ch = Channel.fromPath(params.sra_file)
reference = Channel.fromPath(params.reference)


sra_ch = descargaSRA(input_ch)

fastq_ch = convert_to_fastq(sra_ch)

"This channel will group the fastq files by ID"
fastq_paths = Channel.fromFilePairs("${fastq_ch}/SRR*_{1,2}.fastq")

fastp_ch = filter_reads(fastq_paths)
genome = indexing_reference(reference,fastp_ch)
map_ch = mapping_to_reference(genome,fastp_ch)

}

I will give some context. The process descargaSRA gets a file with accession names and downloads the .sra files, while the process convert_to_fastq turns those .sra into .fastq files. However, for each .sra there are two fastq (SRR*_1.fastq and SRR*_2.fastq) and this is creating a problem because when correcting and mapping I need those two files together.

For this reason, I thought of using a new channel with the method .fromFilePairs so as to link the SRR_ID with both fastq files. Despite this, when running the pipeline the process stops as soon as convert_to_fastq ends, so I need to run the pipeline again to get the final results. I guess that fastq_paths is not directly connected to fastq_ch given that I'm only using the value and not the channel itself, but I couldn't come up with any other solution.

Thanks for your help, I am trying to learn this new tool.


Solution

  • Manipulate your convert_to_fastq output channel in such a way this it has a [sampleID, file.fastq] format. Then call groupTuple() on that channel to pair your fastq's per sample.