Tags: groovy, gpars, dataflow, task

DataflowQueue processing in pipelines only seems to be triggered on a df.val request


I'm looking at GPars dataflows/pipelines, but there is something I don't understand.

Look at the example below (I've done this with operators, pipelines, and chainWith, and hit the same problem).

In this example I've used tasks, but it could just as easily be done without them and the same problem manifests. I set up two DataflowQueues, one for initial conditions and one for the results of evaluating against a predicate. I then lay out a pipeline that evaluates the inputs against the predicate (an is-even test) and stores the results in the output results queue.

Having set up the pipeline and posted some entries into the first queue, I believed the entries would be processed as data became available (this didn't work for the operator version either). As you can see, I test the size of the resultQ and it's zero (if I remove the task, that's still true) after I write the entries to the sessionQ. So writing data doesn't 'trigger' the processing.

The first task saves a number of entries to a queue:

import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.Promise

/**
 * Created by will on 13/01/2017.
 */

def iValues = [1,2,3,4,5]

DataflowQueue sessionQ = new DataflowQueue()
DataflowQueue resultQ = new DataflowQueue()

Dataflow.task {
    println "setup task: set initial conditions list for rule predicate "
    iValues.each {sessionQ << it}
}

Closure evenPredicate = { it % 2 == 0 }

//layout pipeline 
sessionQ | evenPredicate   | resultQ

assert resultQ.iterator().size() == 0

Promise ans =  Dataflow.task {
    println "result task : get three values from result q "
    def outlist = []
    3.times {
        def res = resultQ.val
        println "got result $res"
        outlist << res
    }
    assert sessionQ.iterator().size() == 0
    assert resultQ.iterator().size() == 2
    outlist
}

println "ans list is $ans.val"
assert resultQ.iterator().size() == 2

It's only in the second task/chainWith etc., where you invoke a .val (or get()) on the second queue, that the engine starts to run and ALL the entries are processed from the first queue, with the results bound to the resultQ.

You can see this from the assertions: once the first trigger (.val) sync call is made, the engine runs and processes ALL the bound entries in the starting sessionQ.

This is a problem, as until you make that first .val call - if you do a poll(), or resultQ.iterator().size() for example - it is empty and unbound, size() == 0. So you can't write

for (dfRes in resultQ) { /* do something with dfRes */ }

as it's always empty until you consume the first item from the sessionQ. I don't understand why. After entries are bound to the first DataflowQueue, I thought the items would be consumed AS they became available (were bound), but they are not.

This is now tricky, as you can't get the entries through, check the size of the results, or do a poll() on resultQ; all of it fails until that first DF from sessionQ has been read.

I've ended up having to use the size of the initial values array (which tells me how many entries were saved to the queue) as the ONLY means of reading the same number back off the resultQ to empty it. In the above I have only consumed 3 records from the resultQ, and the assertion shows that there are 2 records still left in it (but only after that first .val call is made; if you comment that line out, all the assertions start to fail).
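The workaround described above can be sketched as a complete script. This is a minimal illustration, not the exact original code; it assumes GPars is on the classpath, and it uses the input list's size as the drain count:

```groovy
import groovyx.gpars.dataflow.DataflowQueue

def iValues = [1, 2, 3, 4, 5]

DataflowQueue sessionQ = new DataflowQueue()
DataflowQueue resultQ  = new DataflowQueue()

// lay out the pipeline: each input is transformed by the predicate closure
sessionQ | { it % 2 == 0 } | resultQ

iValues.each { sessionQ << it }

// Drain exactly as many results as inputs were written. Each .val blocks
// until a value is bound, which also drives the pipeline along.
def results = (1..iValues.size()).collect { resultQ.val }
println results
```

Each `.val` blocks until the corresponding result is bound, so this loop both consumes the output and keeps the pipeline moving; the downside, as noted, is that you must know the input count up front.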

I tried this with Dataflow.operator, Pipeline, etc. and got the same problem. Why doesn't the work get processed as each input is bound to the sessionQ?

Lastly, in the case of Pipeline there's a .complete() method. A pipeline that processes a closure {} stays open (!complete()), but when you call a method like .binaryChoice() it marks the Pipeline as complete and no further actions can be added. Why is this done?

I don't understand what that state is saying ('no more processing will occur'?), and of course an exception will be thrown if you try to tack another step on after such a method.

Either way, I tried a pipeline like this:

Pipeline pipeLine = new Pipeline(Q)
pipeLine.tap(log).binaryChoice(evenPathQ, oddPathQ) {println "$it %2 is ${it%2 ==0}"; (it%2 == 0) }

However, when you bind values to Q nothing happens - until you consume an output like

oddPathQ.val

When all of a sudden the Pipeline 'runs' and processes ALL the DF items stored in Q.

Nothing I've tried kickstarts the scheduling of the work, other than the first .val consumption.

Can anyone explain why this is? I must be missing the point here, but this 'do nothing until the first entry is read' behaviour is NOT what I was expecting, and it invalidates any size assessment (.iterator().size(), poll() etc.) on a DataflowWriteChannel target.

I'd appreciate any help on this - I have struggled with it for two days now and got nowhere. I looked at all the GPars tests as well, and they just call .val the same number of times as inputs are bound, so they don't show the problem I've described.


Solution

  • A small modification (adding a delay) just before you assert that the size is 0 will reveal that the computation is in fact triggered by the written data: with the sleep in place, the pipeline's operator has time to run asynchronously, resultQ is no longer empty, and the assertion now fails:

    //layout pipeline
    sessionQ | evenPredicate   | resultQ
    sleep 5000
    assert resultQ.iterator().size() == 0
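Extending that idea into a full script shows that all five inputs are consumed and all five results bound without any .val call. This is a sketch under the assumption that GPars is on the classpath; the sleep length is arbitrary and simply gives the operator's thread time to run:

```groovy
import groovyx.gpars.dataflow.DataflowQueue

def iValues = [1, 2, 3, 4, 5]

DataflowQueue sessionQ = new DataflowQueue()
DataflowQueue resultQ  = new DataflowQueue()

sessionQ | { it % 2 == 0 } | resultQ
iValues.each { sessionQ << it }

// No .val anywhere: just wait for the asynchronous operator to catch up.
sleep 2000

// The writes alone drove the pipeline: sessionQ has been drained and
// five results are already bound in resultQ.
assert sessionQ.iterator().size() == 0
assert resultQ.iterator().size() == 5
println "results bound: ${resultQ.iterator().size()}"
```

In other words, the pipeline is data-driven after all; it just runs on its own threads, so an immediate size check races against the operator rather than proving it never started.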