Tags: r, groovy, nextflow

Nextflow does not create the number of expected output files


I have the following example Nextflow pipeline, which I use to A) preprocess a matrix and B) perform a calculation with that matrix and a second matrix. The preprocessing is done with several replicates. With 5 replicates, I would expect to get 5 output files from process A, feed them into process B, and then get 5 result files from process B as well. However, I only get a single output file from process B, and it seems to be a random replicate each time. Here is example command-line output from the scripts below (note how process B runs only a single time, not 5 times as I would expect):

Launching `test.nf` [hungry_brenner] DSL2 - revision: 51e005477c
executor >  local (6)
[c4/ef0279] process > A (3) [100%] 5 of 5 ✔
[67/32ce50] process > B (1) [100%] 1 of 1 ✔
[dataset_1, dataset_2, 4, method_X]

Can someone see what I am missing here?

nextflow script:

#!/usr/bin/env nextflow
nextflow.enable.dsl=2

params.replicates = 5

process A {
    input: 
    tuple path(mat), val(ds)
    each replicate
    path preProcess_dir
    
    output:
    tuple path("${preProcess_dir}/${ds}_${replicate}_subset.rds"),
          val(ds),
          val(replicate)
    
    shell:
    '''
    scriptA.R '!{mat}' '!{ds}' '!{replicate}' '!{preProcess_dir}'
    '''
}


process B {
  input:
  tuple path(A_matrix), val(ds), val(replicate)
  tuple path(mat2), val(ds2) 
  each method
  path results_dir
  
  output:
  tuple val(ds),
        val(ds2),
        val(replicate),
        val(method)
  
  
  shell:
  '''
  scriptB.R '!{A_matrix}' '!{ds}' '!{mat2}' '!{ds2}' '!{replicate}' '!{method}' '!{results_dir}'
  '''
  
}


workflow{
  inputA = Channel.of(['/path/to/matrix_A.rds','dataset_1'])
  replicates = (1..params.replicates).toList()
  out_A = A(inputA, replicates, '/path/to/preprocess_results/')
  
  input_B = Channel.of(['/path/to/matrix_B.rds','dataset_2'])
  out_B = B(out_A, input_B, 'method_X', '/path/to/final_results/')
  
  out_B.view()
}

scriptA.R:

#!/usr/bin/Rscript

library(docopt)

"Usage:
  scriptA.R <mat> <ds> <replicate> <outdir> 
Options:
<mat> path to  matrix
<ds> name of dataset
<replicate> value of replicate number
<outdir> output directory"  -> doc

args <- docopt::docopt(doc)

mat <- readRDS(args$mat)
subset_mat <- mat[sample(1:nrow(mat), 20),] # sample 20 random rows

out_path <- paste0(args$outdir,'/',args$ds, '_', args$replicate)
saveRDS(subset_mat, paste0(out_path,'_subset.rds'))

scriptB.R

#!/usr/bin/Rscript

library(docopt)

"Usage:
  scriptB.R <mat> <ds> <mat2> <ds2> <replicate> <method> <outdir> 
Options:
<mat> path to  matrix
<ds> name of dataset
<mat2> path to  matrix2
<ds2> name of dataset2
<replicate> value of replicate number
<method> function to be applied to matrix 1 and 2
<outdir> output directory"  -> doc

args <- docopt::docopt(doc)

mat <- readRDS(args$mat)
mat2 <- readRDS(args$mat2)

if(args$method == 'method_X'){
  res <- sum(mat) + sum(mat2)  # do something with the 2 matrices
}

out_path <- paste0(args$outdir,'/',args$method, '_', args$ds, '_', args$ds2, '_', args$replicate)
saveRDS(res, paste0(out_path,'_output.rds'))

Solution

  • This usually happens when a process erroneously receives two or more queue channels. When a process requires multiple input channels, what you want most of the time is one queue channel and one or more value channels.

    In this case, both out_A and input_B are queue channels. Note that input_B is a queue channel that emits just a single item, so process B stops after consuming it, regardless of how many items out_A emits. You can instead use the value factory method, or simply pass in a plain list object. The latter works because a value channel is implicitly created by a process when it is invoked with a simple value.
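
    For example, a minimal sketch (using the same placeholder paths as above) of the two ways to make input_B a value channel:

    ```groovy
    // Option 1: create the value channel explicitly
    input_B = Channel.value( ['/path/to/matrix_B.rds', 'dataset_2'] )
    out_B = B(out_A, input_B, 'method_X', '/path/to/final_results/')

    // Option 2: pass a plain list; Nextflow implicitly wraps a simple
    // value in a value channel when the process is invoked with it
    out_B = B(out_A, ['/path/to/matrix_B.rds', 'dataset_2'], 'method_X', '/path/to/final_results/')
    ```

    Either way, the value channel is read repeatedly, so process B now runs once per item emitted by out_A.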

    Also, be sure to avoid absolute paths in your workflow script. For your workflow to be portable, processes should only access files in the working directory (i.e. ./work). Perhaps something like:

    params.datasets_csv = './datasets.csv'
    params.replicates = 5
    
    params.outdir = './results'
    
    process proc_A {
    
        publishDir "${params.outdir}/preprocessing", mode: 'copy'
    
        input:
        tuple val(ds), path(matrix)
        each replicate
    
        output:
        tuple val(replicate), val(ds), path("${ds}_${replicate}_subset.rds")
    
        script:
        """
        scriptA.R \\
            "${matrix}" \\
            "${ds}" \\
            "${replicate}" \\
            ./
        """
    }
    
    process proc_B {
    
        publishDir "${params.outdir}/final", mode: 'copy'
    
        input:
        tuple val(replicate), val(ds1), path(A_matrix), val(ds2), path(B_matrix)
        each method
    
        output:
        path "${method}_${ds1}_${ds2}_${replicate}_output.rds"
    
        script:
        """
        scriptB.R \\
            "${A_matrix}" \\
            "${ds1}" \\
            "${B_matrix}" \\
            "${ds2}" \\
            "${replicate}" \\
            "${method}" \\
            ./
        """
    }
    
    workflow {
    
        replicates = (1..params.replicates).toList()
        methods = [ 'method_X', 'method_Y', 'method_Z' ]
    
        Channel
            .fromPath( params.datasets_csv )
            .splitCsv( header: true )
            .branch { row ->
                def (id, num) = row.dataset.split('\\.', 2)
    
                input_A: id == "dataset_1"
                    return tuple( row.dataset, file(row.matrix) )
                input_B: id == "dataset_2"
                    return tuple( row.dataset, file(row.matrix) )
            }
            .set { datasets }
    
        out_A = proc_A( datasets.input_A, replicates )
        out_B = proc_B( out_A.combine(datasets.input_B), methods )
    }
    Example contents of datasets.csv:

    dataset,matrix
    dataset_1.1,/path/to/matrix_A1.rds
    dataset_1.2,/path/to/matrix_A2.rds
    dataset_2.1,/path/to/matrix_B1.rds
    dataset_2.2,/path/to/matrix_B2.rds
    dataset_2.3,/path/to/matrix_B3.rds
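
    Note that out_A.combine(datasets.input_B) emits the Cartesian product of the two channels: with the CSV above, proc_A runs 2 × 5 = 10 times (two dataset_1 matrices times five replicates), combine pairs each of those 10 tuples with each of the 3 dataset_2 entries, and the each qualifier multiplies that by the 3 methods, giving the 90 proc_B tasks in the results below. A minimal sketch of combine's cardinality, using toy values rather than the pipeline's data:

    ```groovy
    // 2 items x 3 items = 6 combined tuples, e.g. [1, 'a', 'x']
    Channel
        .of( [1, 'a'], [2, 'b'] )
        .combine( Channel.of( 'x', 'y', 'z' ) )
        .view()
    ```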
    

    Results:

    $ nextflow run main.nf 
    N E X T F L O W  ~  version 22.04.0
    Launching `main.nf` [drunk_venter] DSL2 - revision: e7d33025b0
    executor >  local (100)
    [ed/f85b29] process > proc_A (10) [100%] 10 of 10 ✔
    [10/645eb4] process > proc_B (90) [100%] 90 of 90 ✔