Nextflow: transforming input of fromFilePairs into a tuple of (map, list_pair_1, list_pair_2)


In my Nextflow workflow, I need to process files similar to the below example.

a.vcf.gz
a.vcf.gz.tbi
b.vcf.gz
b.vcf.gz.tbi
c.vcf.gz
c.vcf.gz.tbi

In particular, I need to create a channel which will output them with this structure:

[
    ["id": "test"], 
    ["a.vcf.gz", "b.vcf.gz", "c.vcf.gz"], 
    ["a.vcf.gz.tbi", "b.vcf.gz.tbi", "c.vcf.gz.tbi"]
]

This means a tuple of a single map, one list of *.vcf.gz files, and one list of *.vcf.gz.tbi files.

My problem is that, from my reading of the documentation, it's not evident how to create it from a channel that emits items sequentially in groups of three.

For simplicity, I collect the files as pairs using Channel.fromFilePairs:

ch_input = Channel
    .fromFilePairs("*{.vcf.gz,.vcf.gz.tbi}")

This is where I got stuck. The closest I've got was by scrapping fromFilePairs and using groupTuple:

ch_input = Channel
    .fromPath("*.vcf.gz*")
    .map { file ->
        // Label each file by its last extension: "gz" is a VCF, anything else is treated as an index
        def value = file.extension == "gz" ? "vcf" : "tbi"
        [value, file]
    }
    .groupTuple()

ch_input.view()

Which gives:

[tbi, [/Users/einar/Coding/a.vcf.gz.tbi, /Users/einar/Coding/c.vcf.gz.tbi, /Users/einar/Coding/einar/b.vcf.gz.tbi]]
[vcf, [/Users/einar/Coding/b.vcf.gz, /Users/einar/Coding/a.vcf.gz, /Users/einar/Coding/c.vcf.gz]]

This is still far from what I'd like, and it is more fragile because it relies on file extensions.

The multiMap operator comes close to what I want, but it produces multiple channels, whereas I need a single channel.

How can this be done properly?

EDIT:

This is another attempt. It gets what I want, but it looks hacky and fragile to me:

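ch_input = Channel
    .fromPath("*.vcf*")
    // Key each file by its last extension ("gz" or "tbi")
    .map { file -> [file.extension, file] }
    .groupTuple()
    // Replace the extension key with a shared meta map
    .map { it ->
        def fmeta = ["id": "test"]
        [fmeta, it[1].flatten()]
    }
    // Both groups now share the same key, so they merge into one item
    .groupTuple()
    .map { it -> [it[0], it[1][0], it[1][1]] }

ch_input.view()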

Solution

  • To get what you want, you need the collect operator, which returns a value channel. By default collect flattens list items, so the closure wraps each pair in a list to keep it intact:

    Channel
        .fromFilePairs( '/path/to/files/*.vcf.gz{,.tbi}' )
        .collect { sample, indexed_vcf -> [ indexed_vcf ] }
        .map { 
            def fmeta = [ "id": "test" ]
    
            [ fmeta, it*.first(), it*.last() ] 
        } 
        .view()
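
To make the final map step easier to follow, here is a minimal plain-Groovy sketch of what it does to the collected value. It runs without Nextflow. The `pairs` list is a hypothetical stand-in for the output of collect, using plain strings instead of real paths:

```groovy
// Hypothetical stand-in for the value emitted by
// collect { sample, indexed_vcf -> [ indexed_vcf ] }:
// one list holding a [vcf, index] pair per sample.
def pairs = [
    ['a.vcf.gz', 'a.vcf.gz.tbi'],
    ['b.vcf.gz', 'b.vcf.gz.tbi'],
    ['c.vcf.gz', 'c.vcf.gz.tbi']
]

def fmeta = [id: 'test']

// The spread-dot operator (*.) calls the method on every element
// and gathers the results into a new list.
def result = [fmeta, pairs*.first(), pairs*.last()]

println result
```

In a real run the order of the samples is not guaranteed, but both lists are built from the same collected pairs, so each VCF stays at the same position as its index.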
    

    It's difficult to say without the details, but usually you don't need to separate out the index files from the actual VCF files. If this channel is to be used directly as process input, my preference would be to alter the input declaration so that I could use something like this instead:

    Channel
        .fromPath( '/path/to/files/*.vcf.gz{,.tbi}' )
        .collect()
        .map { 
            def fmeta = ["id": "test"]
    
            [ fmeta, it ]
        } 
        .view()
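
For illustration, a process consuming that second channel could declare its input roughly as below. This is only a sketch under assumptions: the process name `LIST_VCFS`, the input name `indexed_vcfs`, and the `ls` command are hypothetical placeholders, not part of the original workflow.

```groovy
// Hypothetical process: takes the meta map plus all VCF and index files in one path input
process LIST_VCFS {
    input:
    tuple val(meta), path(indexed_vcfs)

    output:
    stdout

    script:
    """
    echo "${meta.id}"
    ls -1 *.vcf.gz
    """
}

workflow {
    ch_input = Channel
        .fromPath( '/path/to/files/*.vcf.gz{,.tbi}' )
        .collect()
        .map { files -> [ [ id: 'test' ], files ] }

    LIST_VCFS( ch_input )
    LIST_VCFS.out.view()
}
```

Because every file is staged into the same task directory, each index sits next to its VCF, and tools that look for `<file>.tbi` alongside the VCF should find it without the two lists being passed separately.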