I have the following example Nextflow pipeline, which I am using to A) preprocess a matrix and B) do some calculation with that matrix and a second matrix. The preprocessing is done with several replicates: if I create 5 replicates, I expect 5 output files from process A, which are then fed into process B to produce 5 result files. However, I only get a single output file from process B, and it seems to be a random replicate each time. Here is the example command-line output of the scripts below (note how process B runs only a single time, not 5 times as I would expect):
Launching `test.nf` [hungry_brenner] DSL2 - revision: 51e005477c
executor > local (6)
[c4/ef0279] process > A (3) [100%] 5 of 5 ✔
[67/32ce50] process > B (1) [100%] 1 of 1 ✔
[dataset_1, dataset_2, 4, method_X]
Can someone see what I am missing here?
nextflow script:
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
params.replicates = 5
process A {

    input:
    tuple path(mat), val(ds)
    each replicate
    path preProcess_dir

    output:
    tuple path("${preProcess_dir}/${ds}_${replicate}_subset.rds"),
        val(ds),
        val(replicate)

    shell:
    '''
    scriptA.R '!{mat}' '!{ds}' '!{replicate}' '!{preProcess_dir}'
    '''
}
process B {

    input:
    tuple path(A_matrix), val(ds), val(replicate)
    tuple path(mat2), val(ds2)
    each method
    path results_dir

    output:
    tuple val(ds),
        val(ds2),
        val(replicate),
        val(method)

    shell:
    '''
    scriptB.R '!{A_matrix}' '!{ds}' '!{mat2}' '!{ds2}' '!{replicate}' '!{method}' '!{results_dir}'
    '''
}
workflow {
    inputA = Channel.of( ['/path/to/matrix_A.rds', 'dataset_1'] )
    replicates = (1..params.replicates).toList()
    out_A = A( inputA, replicates, '/path/to/preprocess_results/' )

    input_B = Channel.of( ['/path/to/matrix_B.rds', 'dataset_2'] )
    out_B = B( out_A, input_B, 'method_X', '/path/to/final_results/' )
    out_B.view()
}
scriptA.R:
#!/usr/bin/Rscript
library(docopt)
"Usage:
scriptA.R <mat> <ds> <replicate> <outdir>
Options:
<mat> path to matrix
<ds> name of dataset
<replicate> value of replicate number
<outdir> output directory" -> doc
args <- docopt::docopt(doc)
mat <- readRDS(args$mat)
subset_mat <- mat[sample(1:nrow(mat), 20),] # sample 20 random rows
out_path <- paste0(args$outdir,'/',args$ds, '_', args$replicate)
saveRDS(subset_mat, paste0(out_path,'_subset.rds'))
scriptB.R:
#!/usr/bin/Rscript
library(docopt)
"Usage:
scriptB.R <mat> <ds> <mat2> <ds2> <replicate> <method> <outdir>
Options:
<mat> path to matrix
<ds> name of dataset
<mat2> path to matrix2
<ds2> name of dataset2
<replicate> value of replicate number
<method> function to be applied to matrix 1 and 2
<outdir> output directory" -> doc
args <- docopt::docopt(doc)
mat <- readRDS(args$mat)
mat2 <- readRDS(args$mat2)
if(args$method == 'method_X'){
res <- sum(mat) + sum(mat2) # do something with the 2 matrices
}
out_path <- paste0(args$outdir,'/',args$method, '_', args$ds, '_', args$ds2, '_', args$replicate)
saveRDS(res, paste0(out_path,'_output.rds'))
This usually occurs when a process erroneously receives two or more queue channels. When you require multiple input channels, what you usually want is one queue channel and one or more value channels.
In this case, both out_A and input_B are queue channels. Note that input_B is a queue channel that emits a single list item. You can instead use the value factory method, or simply pass in a list object. The latter works because a value channel is implicitly created by a process when it is invoked with a simple value.
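Applied to the workflow above, either option lets process B pair every output of process A with the second matrix. A minimal sketch, keeping the paths from the question (wrapped in file() so the path input qualifier receives an actual file object):

```nextflow
workflow {
    inputA = Channel.of( [file('/path/to/matrix_A.rds'), 'dataset_1'] )
    replicates = (1..params.replicates).toList()
    out_A = A( inputA, replicates, '/path/to/preprocess_results/' )

    // Option 1: an explicit value channel, which can be read any number of times
    input_B = Channel.value( [file('/path/to/matrix_B.rds'), 'dataset_2'] )
    out_B = B( out_A, input_B, 'method_X', '/path/to/final_results/' )

    // Option 2: pass the list directly; a value channel is created implicitly
    // out_B = B( out_A, [file('/path/to/matrix_B.rds'), 'dataset_2'], 'method_X', '/path/to/final_results/' )

    out_B.view()
}
```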
Also, be sure to avoid absolute paths in your workflow script. For your workflow to be portable, processes should only access files in the working directory (i.e. ./work). Perhaps something like:
params.datasets_csv = './datasets.csv'
params.replicates = 5
params.outdir = './results'
process proc_A {

    publishDir "${params.outdir}/preprocessing", mode: 'copy'

    input:
    tuple val(ds), path(matrix)
    each replicate

    output:
    tuple val(replicate), val(ds), path("${ds}_${replicate}_subset.rds")

    script:
    """
    scriptA.R \\
        "${matrix}" \\
        "${ds}" \\
        "${replicate}" \\
        ./
    """
}
process proc_B {

    publishDir "${params.outdir}/final", mode: 'copy'

    input:
    tuple val(replicate), val(ds1), path(A_matrix), val(ds2), path(B_matrix)
    each method

    output:
    path "${method}_${ds1}_${ds2}_${replicate}_output.rds"

    script:
    """
    scriptB.R \\
        "${A_matrix}" \\
        "${ds1}" \\
        "${B_matrix}" \\
        "${ds2}" \\
        "${replicate}" \\
        "${method}" \\
        ./
    """
}
workflow {

    replicates = (1..params.replicates).toList()
    methods = [ 'method_X', 'method_Y', 'method_Z' ]

    Channel
        .fromPath( params.datasets_csv )
        .splitCsv( header: true )
        .branch { row ->
            def (id, num) = row.dataset.split('\\.', 2)
            input_A: id == "dataset_1"
                return tuple( row.dataset, file(row.matrix) )
            input_B: id == "dataset_2"
                return tuple( row.dataset, file(row.matrix) )
        }
        .set { datasets }

    out_A = proc_A( datasets.input_A, replicates )
    out_B = proc_B( out_A.combine(datasets.input_B), methods )
}
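The combine operator used above emits the Cartesian product of the two channels, concatenating each pair of tuples, which is what fans every proc_A result out over every dataset_2 entry. A minimal sketch of its behavior, using hypothetical values:

```nextflow
workflow {
    Channel
        .of( [1, 'dataset_1.1'], [2, 'dataset_1.2'] )
        .combine( Channel.of( ['dataset_2.1', 'B1.rds'], ['dataset_2.2', 'B2.rds'] ) )
        .view()
    // emits 2 x 2 = 4 items, e.g.:
    // [1, dataset_1.1, dataset_2.1, B1.rds]
    // [1, dataset_1.1, dataset_2.2, B2.rds]
    // ...
}
```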
Example contents of datasets.csv:
dataset,matrix
dataset_1.1,/path/to/matrix_A1.rds
dataset_1.2,/path/to/matrix_A2.rds
dataset_2.1,/path/to/matrix_B1.rds
dataset_2.2,/path/to/matrix_B2.rds
dataset_2.3,/path/to/matrix_B3.rds
Results (2 dataset_1 entries × 5 replicates = 10 proc_A tasks; 10 proc_A outputs × 3 dataset_2 entries × 3 methods = 90 proc_B tasks):
$ nextflow run main.nf
N E X T F L O W ~ version 22.04.0
Launching `main.nf` [drunk_venter] DSL2 - revision: e7d33025b0
executor > local (100)
[ed/f85b29] process > proc_A (10) [100%] 10 of 10 ✔
[10/645eb4] process > proc_B (90) [100%] 90 of 90 ✔