Search code examples
javajavafxreactfx

How to reset the last accumulation value in EventStream?


I have a merged EventStream from N observable values. From this values I want the smallest one. For example:

    Var<Integer> a = Var.newSimpleVar(2);
    Var<Integer> b = Var.newSimpleVar(3);
    Var<Integer> c = Var.newSimpleVar(4);
    ...

    EventStream<Integer> m = EventStreams.merge(a.values(), b.values(), c.values(), ...);

    m = m.filter(integer -> integer > 1).accumulate((i1, i2) -> Math.min(i1, i2));
    m.subscribe(integer -> System.out.println(integer));


    a.setValue(0); // At this point the Input is filtered and not emitted also I dont want the last value "2" in the accumulation. 
    b.setValue(5);
    c.setValue(3);
    //Output is always "2".

My problem is that I want after the first filtered value also a new init value for accumulate. In this case for example something like "Integer.MAX_VALUE".

So that the next compare in accumulate isn't:
"Math.min(2,5)" -> "Math.min(2,3)"
but
"Math.min(MAX_VALUE,5)" -> "Math.min(5,3)".

So the Output shouldn't be:
2, 2, 2, 2, 2
but

a -> 2 : Output minimum 2
b -> 3 : Output minimum 2
c -> 4 : Output minimum 2

a -> 0 : OK condition (value < 1) is true. Now reset or better repeat the stream (without holding the last value 2 in accumulation)

b -> 5 : Output minimum 5
c -> 3 : Output minimum 3
a -> 4 : Output minimum 3
...


Solution

  • OK I found a solution. Ignoring the zeros was a really good hint. Thanks Tomas :)

        Var<Integer> a = Var.newSimpleVar(2);
        Var<Integer> b = Var.newSimpleVar(3);
        Var<Integer> c = Var.newSimpleVar(4);
        ...
    
        EventStream<Integer> m = EventStreams.merge(a.values(), b.values(), c.values(), ...);
        m = m.accumulate((i1, i2) -> i2 < 1 ? Integer.MAX_VALUE : Math.min(i1, i2)).filter(integer -> integer != Integer.MAX_VALUE);
        m.subscribe(integer -> System.out.println(integer));
    
        a.setValue(0);
        b.setValue(5);
        c.setValue(3);
        a.setValue(4);
    

    Output is:
    a -> 2 : Output minimum 2
    b -> 3 : Output minimum 2
    c -> 4 : Output minimum 2

    a -> 0 : No Output

    b -> 5 : Output minimum 5
    c -> 3 : Output minimum 3
    a -> 4 : Output minimum 3

    So the problem was that I can not filter before the accumulation executes (in this case). There are also some issues for example if the first value in this stream is a zero (a fix would look like ... (i1, i2) -> i1 < 1 ? i2 : i2 < 1 ? Integer.MAX_VALUE ...). But anyway in my case or similar ones this kind of solution work or should work ;)