Search code examples
bashgroovynextflownextflow-tower

Nextflow Pipeline: Efficiently process genomic regions stored in multiple files and merge the results afterward while keeping track of files


Currently, I have 22 files named chr1.txt through chr22.txt, each containing genomic regions. I need to run a specific process on these regions and then merge the processed data into merged_chr1.txt, merged_chr2.txt, and so on for each chromosome. However, I'm facing difficulties implementing this in Nextflow.

E.g. for chr10.txt

0       chr10   chr10:1-16637292
1       chr10   chr10:15435012-33506537
2       chr10   chr10:30788681-57663231
3       chr10   chr10:54643594-75924707
4       chr10   chr10:71749908-93488832
5       chr10   chr10:91114596-112850006

I was able to successfully read the files using fromPath, and modify the channel using map to create an appropriate input channel for the next step. However, when I execute the process, it only shows one output despite running processes for multiple regions across different files. How can I modify my script to print the process for each region separately and track progress by the file (e.g., chr1.txt) to know when all regions are completed and ready for merging?

Here's my current Nextflow script for reference:

params.chunks_dir = "/workspace/gitpod/testing/chr*.txt"
params.results    = "/workspace/gitpod/testing/results"

workflow {
    ch_chunk_files = Channel.fromPath(params.chunks_dir)
                    .splitCsv(header: false, sep: "\t")
                    .map{ chunk_no, chr, regions -> tuple(chr, [chunk_no, regions])}
                    .groupTuple()
                    .set{ regions }
    regions.view()
    // return

    run_process1(regions)

}

process run_process1 {
    // tag "${buff_regions}"

    // publishDir "${params.results}"

    input:
    each some_region

    output:
    path "UKB_${chr}.chunk_${coords}.shapeit5_common.txt"

    script:
    """
    chr=${some_region[0]}
    idx=${some_region[1]}
    buff_regions=${some_region[2]}

    if [ ! -f "ABCD_${chr}.chunk_${idx}.txt" ]; then
            touch "ABCD_${chr}.chunk_${idx}.test.txt"
    fi
        
        echo ${buff_regions} > ABCD_${chr}.chunk_${idx}.test.txt
    """
}

Here's the output I get for regions.view():

[chr10, [[0, chr10:1-16637292], [1, chr10:15435012-33506537], [2, chr10:30788681-57663231], [3, chr10:54643594-75924707], [4, chr10:71749908-93488832], [5, chr10:91114596-112850006]]
[chr22, [[0, chr22:1234567-33613738], [1, chr22:32523221-67891234]]]

But overall it's an error. What I would really like to do is, read each of these files (chr1.txt, ..., chr22.txt), all of which have genetic regions, and run process1 independently for each of these regions. Once that's completed, I want to track its completion for all regions of the file (probably using collect(), and then I would like to run process2 to merge these files.

I have been trying to do what they did at Run nextflow pipeline multiple time depending on length of a list, but unsuccessfully.


Solution

  • I'm surprised you are not getting an input cardinality error from your input declaration of run_process1.

    Overall, I think the easiest solution would be to remove your groupTuple from the ch_chunk_files channel. Then you have object with the following structure for each process [chr, [idx, coords]] rather than dealing with tuples with flexible lengths. This also means processing times will be more uniform, rather than everything waiting for chrx which could have 20 more regions than the others, for example.

    workflow {
        ch_chunk_files = Channel.fromPath(params.chunks_dir)
                        .splitCsv(header: false, sep: "\t")
                        .map{ chunk_no, chr, regions -> tuple(chr, [chunk_no, regions])}
                        .set{ regions }
    
        run_process1(regions)
    }
    process run_process1 {
        tag { chr }
    
        publishDir "${params.results}"
    
        input:
        tuple chr, regions
    
        output:
        path "UKB_${chr}.chunk_${coords}.shapeit5_common.txt"
    
        script:
        def (idx, coords) =  regions
        """
        if [ ! -f "ABCD_${chr}.chunk_${idx}.txt" ]; then
                touch "ABCD_${chr}.chunk_${idx}.test.txt"
        fi
            
        echo ${coords} > ABCD_${chr}.chunk_${idx}.test.txt
        """
    }