Tags: python-3.x, feedback, nextflow

Is it possible to have an output feed back into an input in a single Nextflow process?


I am trying to make a simple feedback loop in my Nextflow script, but I get an error message that I do not know how to debug. My attempt is modeled after the Nextflow design pattern described here. A Python 3 script that operates on an image calculates a value, and I need that value passed on to subsequent executions of the same script. For now I just want to get the structure right by adding numbers, but I cannot get even that to work.

My script:


feedback_ch = Channel.create()
input_ch = Channel.from(feedback_ch)

process test {

    echo true 

    input:
        val chan_a from Channel.from(1,2,3)
        val feedback_val from input_ch
 
    output:
        stdout output_val into feedback_ch

    shell:
    '''
    #!/usr/bin/python3

    new_val = !{chan_a} + !{feedback_val}
    print(new_val)

    
    '''
}

The error message I get:

executor >  local (1)
[cd/67768e] process > test (1) [100%] 1 of 1, failed: 1 ✘
Error executing process > 'test (1)'

Caused by:
  Process `test (1)` terminated with an error exit status (1)

Command executed:

  #!/usr/bin/python3
  
  new_val = 1 + DataflowQueue(queue=[])
  print(new_val)

Command exit status:
  1

Command output:
  (empty)

Command error:
  Traceback (most recent call last):
    File ".command.sh", line 3, in <module>
      new_val = 1 + DataflowQueue(queue=[])
  NameError: name 'DataflowQueue' is not defined

Work dir:
  /home/cv_proj/work/cd/67768e706f50d7675ae93645a0ce6e

Tip: you can replicate the issue by changing to the process work dir and entering the command `bash .command.run`

Does anyone have any ideas?
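A minimal Python sketch of why the rendered script fails, assuming only that the substitution is purely textual: whatever string form a variable has is pasted into the Python source before it runs. `TEMPLATE`, `render` and `run_rendered` are hypothetical helpers that mimic that step; they are not part of Nextflow.

```python
import contextlib
import io

# Hypothetical stand-in for the rendered .command.sh: the *string form* of
# each variable is pasted into the Python source before the script runs.
TEMPLATE = "new_val = {a} + {b}\nprint(new_val)\n"


def render(a, b) -> str:
    """Textually substitute a and b into the template (like !{chan_a})."""
    return TEMPLATE.format(a=a, b=b)


def run_rendered(source: str) -> str:
    """Compile and run the rendered source; return its stdout or the error."""
    buf = io.StringIO()
    try:
        code = compile(source, ".command.sh", "exec")
        with contextlib.redirect_stdout(buf):
            exec(code, {})  # fresh namespace: only builtins are defined
    except (SyntaxError, NameError) as err:  # IndentationError is a SyntaxError
        return f"{type(err).__name__}: {err}"
    return buf.getvalue()


if __name__ == "__main__":
    # A channel object's string form is pasted in: NameError at run time.
    print(run_rendered(render(1, "DataflowQueue(queue=[])")))
    # An untrimmed stdout value "2\n" splits the expression across lines.
    print(run_rendered(render("2\n", "2\n")))
    # Plain numbers render to valid code.
    print(run_rendered(render(2, 2)), end="")
```

The first case reproduces the `NameError` from the question; the second shows why a value read back from `stdout` has to be trimmed before it is interpolated.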


Solution

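  • The error tells you that you are passing an empty DataflowQueue object through input_ch. Channel.from(feedback_ch) creates a channel whose only item is the feedback_ch channel object itself, so feedback_val is bound to that object. When Nextflow renders the script, it substitutes the variables' string values into your Python code, resulting in: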

      #!/usr/bin/python3
      
      new_val = 1 + DataflowQueue(queue=[])
      print(new_val)
    

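    That makes no sense as Python: you want a number there instead of DataflowQueue(queue=[]), which Python reads as a call to an undefined name.
    The second problem is that you don't mix the channels, which is the key to this pattern: the process must read from a single input channel that combines the initial values with the values fed back from its own output. I fixed both to get a working proof of concept: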

    condition = { it.trim().toInteger() > 10 }  // The output is stdout, so trim() strips the trailing newline; then cast to Integer to compare.
    feedback_ch = Channel.create()
    input_ch = Channel.from(1,2,3).mix( feedback_ch.until(condition) )  // Mix the initial values with the fed-back outputs
    
    process test {
    
        input:
        val chan_a from input_ch
    
        output:
        stdout output_val into feedback_ch
    
        script:
        def output_val_trimmed = chan_a.toString().trim()
        // Double quotes let Groovy interpolate the variable defined above into the script.
        """
        #!/usr/bin/python3
    
        new_val = ${output_val_trimmed} + ${output_val_trimmed}
        print(new_val)
        """
    
    }
    

    I hope that at least sets you on the right track :)
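To see how values circulate in the answer's pattern, here is a sequential Python approximation of `Channel.from(1,2,3).mix( feedback_ch.until(condition) )`. It assumes first-in, first-out processing; Nextflow actually runs tasks concurrently, so the real order (and which in-flight values get dropped once `until` fires) can differ. `run_step`, `stop` and `simulate_feedback` are hypothetical names used only for this sketch.

```python
from collections import deque


def run_step(value) -> str:
    """Hypothetical stand-in for the `test` process in the answer.

    Mirrors chan_a.toString().trim(), computes x + x like the Python script,
    and returns stdout including the newline that print() adds.
    """
    x = int(str(value).strip())
    return f"{x + x}\n"


def stop(output: str) -> bool:
    """Same test as the answer's `condition` closure."""
    return int(output.strip()) > 10


def simulate_feedback(initial):
    """Sequential FIFO approximation of initial.mix(feedback_ch.until(condition)).

    until() forwards items while the condition is false; the first item that
    satisfies it is not forwarded and the feedback channel completes.
    """
    queue = deque(initial)        # items waiting on input_ch
    outputs = []                  # every value the process prints
    feedback_open = True
    while queue:
        out = run_step(queue.popleft())
        outputs.append(int(out))  # int() ignores the trailing newline
        if feedback_open:
            if stop(out):
                feedback_open = False  # until() closes the feedback channel
            else:
                queue.append(out)      # output loops back into input_ch
    return outputs


if __name__ == "__main__":
    print(simulate_feedback([1, 2, 3]))
```

Under this model the loop stops feeding back as soon as `condition` first holds; values already queued still run, but their outputs are no longer looped back into the input.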