Using NextFlow (DSL=2), I would like to use each line of a file as a streaming input of my workflow.
nextflow.enable.dsl=2
process bar {
input: val data
output: val result
exec:
result = data.toUpperCase()
}
workflow {
myFile = file('input_test.txt')
myReader = myFile.newReader()
myFile.withInputStream {
String line
while( line = myReader.readLine() ) {
channel.from(line) | bar | view
}
}
}
I face the problem that I can only use the "bar" process once:
Process 'bar' has been already used -- If you need to reuse the same component include it with a different name or include in a different workflow context
I have also tried to create a subworkflow that takes the channel from line and call bar.
Is there a way to use streamed data as an input using Nextflow?
Note: my final goal is not just to apply an upper case function. I would like to link several complex processes on the data stream.
Thank you!
Your example code looks like an anti-pattern - it will try to create a new channel for each line in your input file. Instead, have a look at the splitting operators, especially the splitText operator:
workflow {
Channel.fromPath('input_test.txt')
| splitText { it.trim() } \
| bar \
| view()
}
If the above doesn't help, please describe exactly what you want to do with each line in your input file.