Collecting multiple files as an output of a nextflow process and making a new queue


I have a Nextflow process that looks like this:

process TileImages {

    input:
        val run_id
        val group_id
        val sample_id
        path image
        path masks

    output:
        val run_id, emit: run_id
        val group_id, emit: group_id
        val sample_id, emit: sample_id
    
    script:
        """
        python ${projectDir}/bin/run-tile-images.py \
            --image ${image} \
            --masks ${masks} 
        """

}

This calls a Python script that ends like this:

i = 0
for wsi_patch, mask_patch in zip(wsi_patch_extractor, mask_patch_extractor):
    np.save(f"{args.prefix}_wsi_{i}.npy", wsi_patch)
    np.save(f"{args.prefix}_mask_{i}.npy", mask_patch)
    i += 1

Rather than saving and passing on one file, this saves roughly 40 file pairs (not the same number each time). How can I write the Nextflow process such that each of these pairs is a new entry in the output queue?

In other words, could a single task emit many separate outputs, each containing the run_id, group_id and sample_id plus one of the saved wsi/mask pairs?


Solution

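  • One approach is to capture all of the files with a single glob pattern, group them by their trailing index (the last underscore-separated field of the base name), and then pick the mask and wsi files out of each group. The flatMap operator then emits each pair as a separate item, rather than emitting one list per task. For example: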

    process test {
    
        input:
        val sample_id
    
        output:
        tuple val(sample_id), path("${sample_id}_{wsi,mask}_*.npy")
    
        """
        for i in {1..3} ; do
            touch "${sample_id}_wsi_\${i}.npy"
            touch "${sample_id}_mask_\${i}.npy"
        done
        """
    }
    
    workflow {
    
        samples = Channel.of( 'foo', 'bar', 'baz' )
    
        test( samples )
    
        test.out.flatMap { sample, files ->
          files
            .groupBy { it.baseName.split('_')[-1] }
            .collect { key, values ->
                def mask = values.find { it.baseName.endsWith("_mask_${key}") }
                def wsi = values.find { it.baseName.endsWith("_wsi_${key}") }
    
                tuple( sample, mask, wsi )
            }
        }.view()
    }
    

    Results:

    $ nextflow run main.nf 
    N E X T F L O W  ~  version 23.04.1
    Launching `main.nf` [scruffy_feynman] DSL2 - revision: 8d78366a9a
    executor >  local (3)
    [d4/0acae5] process > test (3) [100%] 3 of 3 ✔
    [bar, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_mask_1.npy, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_wsi_1.npy]
    [bar, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_mask_2.npy, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_wsi_2.npy]
    [bar, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_mask_3.npy, /path/to/work/c5/228fa4aaf6e65675b93158397d4657/bar_wsi_3.npy]
    [foo, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_mask_1.npy, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_wsi_1.npy]
    [foo, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_mask_2.npy, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_wsi_2.npy]
    [foo, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_mask_3.npy, /path/to/work/89/d26cde15264689c2194027b59042c3/foo_wsi_3.npy]
    [baz, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_mask_1.npy, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_wsi_1.npy]
    [baz, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_mask_2.npy, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_wsi_2.npy]
    [baz, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_mask_3.npy, /path/to/work/d4/0acae55ebb3ef87ab49d12ae988041/baz_wsi_3.npy]
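
The pairing logic inside the flatMap closure can also be checked outside Nextflow. The following is a minimal Python sketch of the same idea, assuming the `<prefix>_<wsi|mask>_<index>.npy` naming used in the question. The function name `pair_patches` is hypothetical and not part of the pipeline above. Unlike the closure, it sorts the pairs numerically by index and skips any file that has no partner.

```python
from collections import defaultdict
from pathlib import Path


def pair_patches(files):
    """Group patch files by trailing index and return (index, mask, wsi) tuples."""
    groups = defaultdict(dict)
    for f in map(Path, files):
        # "foo_wsi_3.npy" -> stem "foo_wsi_3" -> kind "wsi", index "3"
        _, kind, index = f.stem.rsplit("_", 2)
        groups[index][kind] = f
    # Sort numerically so that index 10 comes after index 9
    return [
        (index, kinds["mask"], kinds["wsi"])
        for index, kinds in sorted(groups.items(), key=lambda kv: int(kv[0]))
        if "mask" in kinds and "wsi" in kinds  # skip unpaired files
    ]


files = ["foo_wsi_1.npy", "foo_mask_1.npy", "foo_mask_2.npy", "foo_wsi_2.npy"]
for index, mask, wsi in pair_patches(files):
    print(index, mask.name, wsi.name)
```

Splitting from the right with `rsplit("_", 2)` keeps prefixes that themselves contain underscores intact, which matters if the sample or run identifiers are used as the file prefix.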