I have a Nextflow process that takes an input tuple from a channel and uses it to submit n jobs to my computing cluster. This works fine on CPUs: they are slow but plentiful, so I can parallelise across many CPUs at once.
However, the process runs in a fraction of the time on GPUs. It is so much quicker that the rate-limiting step becomes waiting for another GPU to become available, because each job finishes almost immediately and I am pushed to the back of the queue. It therefore makes more sense to convert this previously parallelised process into one that uses a for loop: each task takes much longer to finish, but it produces all my required output in one go.
How do I convert this process to one that iterates, ideally with no change to the structure of the input or output, so that switching between the two modes is a simple parameter and has no effect on the end user?
I got as far as the below, but I don't think it will work. Any suggestions?
process PROCESS_CPU {

    label 'process_medium'
    tag { condition }

    publishDir path: "${params.outdir}/OUTPUT/${condition}",
               overwrite: "true"

    input:
    tuple val(condition), path(file), path(another_file)

    output:
    path("output1.tsv"), emit: out1
    path("output2.tsv"), emit: out2
    path('outdir3')

    script:
    """
    myscript.py --file1 ${file} --file2 ${another_file} -param ${params.myparam} -o './'
    """
}
process PROCESS_GPU {

    label 'gpu'

    input:
    input_list

    output:
    path("*/output1.tsv"), emit: out1
    path("*/output2.tsv"), emit: out2
    path('*/outdir3')

    script:
    """
    for input_tuple in ${input_list}
    do
        myscript.py --file1 ${input_tuple[1]} --file2 ${input_tuple[2]} -param ${params.myparam} -o ${input_tuple[0]}
    done
    """
}
workflow {

    input_list = [
        ['condition1', 'path/to/file1.1', 'path/to/file2.1'],
        ['condition2', 'path/to/file1.2', 'path/to/file2.2'],
        // add more input tuples as needed
    ]

    results = PROCESS_GPU(input_list).collect()
}
One way would be to collect, transpose, and collect the inputs required to run the task(s). In this configuration the inputs are collections, but the val qualifier can accept any data type, and path inputs can also accept a collection of files. (Groovy's transpose() behaves like a zip: [['a', 'b'], [1, 2]].transpose() yields [['a', 1], ['b', 2]].) Inside the process we can then transpose these again and embed them into a here-doc that writes to a file or, as in the example below, is read directly by a while loop:
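Stripped of the Nextflow templating, the pattern inside the script block is just a while-read loop consuming comma-separated records from a here-doc. A minimal standalone sketch of that shell pattern, with echo standing in for myscript.py:

```shell
# Read one CSV record per line from the here-doc and run a command
# for each; IFS=',' splits each line on commas into three fields.
while IFS=',' read -r condition file1 file2
do
    echo "run: --file1 ${file1} --file2 ${file2} -o ${condition}.txt"
done << __EOF__
condition1,a.tsv,b.tsv
condition2,c.tsv,d.tsv
__EOF__
```

This prints one "run:" line per record, which is exactly what the embedded records drive in the real process, except there myscript.py is invoked instead of echo.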
params.myparam = 'foo'

process DO_WORK {

    input:
    tuple(
        val(condition),
        path(file1, stageAs: 'file1_dir/*'),
        path(file2, stageAs: 'file2_dir/*')
    )

    output:
    tuple val(condition), path("{${condition.join(',')}}.txt")

    script:
    def file1_files = file1 instanceof Path ? [file1] : file1
    def file2_files = file2 instanceof Path ? [file2] : file2
    def input_records = [condition, file1_files, file2_files]
        .transpose()
        .collect { it.join(',') }

    """
    |while IFS=',' read -r condition file1 file2
    |do
    |    myscript.py \\
    |        --file1 "\${file1}" \\
    |        --file2 "\${file2}" \\
    |        --param "${params.myparam}" \\
    |        -o "\${condition}.txt"
    |
    |done << __EOF__
    |${input_records.join('\n' + '|')}
    |__EOF__
    """.stripMargin()
}
workflow {

    input_list = [
        ['condition1', file('./path/to/file1.1'), file('./path/to/file2.1')],
        ['condition2', file('./path/to/file1.2'), file('./path/to/file2.2')],
        ['condition3', file('./path/to/file1.3'), file('./path/to/file2.3')],
    ]

    Channel
        .fromList( input_list )
        .collect(flat: false)
        .transpose()
        .collect(flat: false)
        .set { input_ch }

    results = DO_WORK( input_ch )

    results.transpose().view()
}
Results:
$ nextflow run main.nf
N E X T F L O W ~ version 23.04.0
Launching `main.nf` [mad_easley] DSL2 - revision: a3d7bd5cc2
executor > local (1)
[52/c8da98] process > DO_WORK [100%] 1 of 1 ✔
[condition1, /path/to/work/52/c8da98b7e747e71b7e5dd88e1b7648/condition1.txt]
[condition2, /path/to/work/52/c8da98b7e747e71b7e5dd88e1b7648/condition2.txt]
[condition3, /path/to/work/52/c8da98b7e747e71b7e5dd88e1b7648/condition3.txt]
To ensure each line is properly embedded in the here-doc, each line must carry the same indentation as the rest of the script block. To avoid hard-coding that amount of indentation (four spaces in the example above), one way around this is to call stripMargin(), which strips the leading whitespace up to and including a pipe character from every line in the script block. I think this leads to the most readable/maintainable solution, but please comment below if you find a better approach.
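For intuition, what stripMargin() does to each line of the script block can be reproduced in shell with sed: delete leading whitespace up to and including the first pipe character. (This is only an illustration of the transformation, not part of the pipeline.)

```shell
# Emulate Groovy's stripMargin(): remove leading whitespace and the
# '|' margin character from the start of every line. Note the inner
# loop body keeps its own indentation after the pipe.
printf '    |while read -r line\n    |do\n    |    echo "$line"\n    |done\n' \
    | sed 's/^[[:space:]]*|//'
```

The output is the de-indented script that actually runs in the task's .command.sh: the margin indentation disappears, while indentation written after the pipe is preserved.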