Tags: java, streaming, akka, akka-stream

Akka stream sliding window to control reduce emit to sink by SourceQueue


Update: I put my question in a test project to explain what I mean in detail.

=====================================================================

I have an Akka source that continuously reads from a database table, groups the rows by some key, and then reduces each group. However, after I apply the reduce function the data is never sent to the sink; it keeps reducing, since the upstream always has data coming.

I read some posts and tried groupedWithin and sliding, but they do not work as I expected: they only group the messages into larger batches, and never make the upstream pause and emit to the sink. The following is the code, using Akka Streams 2.5.2.

The Source reduce code:

source = source
  .groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS))
  .sliding(3, 1)
  .mapConcat(i -> i)
  .mapConcat(i -> i)
  .groupBy(2000000, i -> i.getEntityName())
  .map(i -> new Pair<>(i.getEntityName(), i))
  .reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;})
  .map(i -> i.second())
  .mergeSubstreams();

The Sink and run:

Sink<Object, CompletionStage<Done>> sink =
        Sink.foreach(i -> System.out.println(i));
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left());
run.run(materializer);

I have also tried .takeWhile(predicate), using a timer to switch the predicate value between true and false. However, it seems to honor only the first switch to false; when I switch back to true, the upstream does not restart.

Please help. Thanks in advance!

=================================================

Update: information about the type of elements

Adding what I want: I have a class called SystemCodeTracking that contains 2 attributes (id, entityName).

I will have a list of objects: (1, "table1"), (2, "table2"), (3, "table3"), (4, "table1"), (5, "table3")

I would like to groupBy entityName and then sum the ids, so the result I would like to see is the following:

("table1", 1+4), ("table3", 3+5), ("table2", 2)
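To make the target result concrete, here is a minimal plain-Java sketch of the same group-and-sum over a finite list. It uses `java.util.stream` rather than Akka, so it only illustrates the expected output, not the streaming behaviour. `TrackingRow` and `GroupSumSketch` are hypothetical names standing in for the real classes.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the SystemCodeTracking class described in the question.
class TrackingRow {
    final long id;
    final String entityName;

    TrackingRow(long id, String entityName) {
        this.id = id;
        this.entityName = entityName;
    }
}

class GroupSumSketch {
    // Groups rows by entityName and sums the ids within each group.
    // LinkedHashMap keeps the keys in first-seen order.
    static Map<String, Long> sumIdsByEntityName(List<TrackingRow> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                row -> row.entityName,
                LinkedHashMap::new,
                Collectors.summingLong(row -> row.id)));
    }

    public static void main(String[] args) {
        List<TrackingRow> rows = Arrays.asList(
                new TrackingRow(1, "table1"),
                new TrackingRow(2, "table2"),
                new TrackingRow(3, "table3"),
                new TrackingRow(4, "table1"),
                new TrackingRow(5, "table3"));
        System.out.println(sumIdsByEntityName(rows)); // prints {table1=5, table2=2, table3=8}
    }
}
```

On an unbounded source there is no "end of list", which is why the streaming version needs some kind of window before a sum can be emitted.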

The code I have now is the following:

source
.groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName())
.map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName(), systemCodeTracking.getId()))
.scan(....)

My question right now is more about how to build the scan: what should the initial state be?

scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId()))
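On the question of the initial state: a scan generally starts from a neutral seed (0 for a sum), and its step function has to return the new accumulator rather than call a void setter. The plain-Java sketch below imitates that behaviour on a finite list, including the fact that the seed itself is emitted first. The `scan` helper here is a hypothetical illustration, not Akka's operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ScanSketch {
    // Hypothetical helper that mirrors scan semantics on a finite list:
    // the seed is emitted first, then every intermediate accumulator value.
    static <T, A> List<A> scan(A zero, List<T> input, BiFunction<A, T, A> step) {
        List<A> emitted = new ArrayList<>();
        A acc = zero;
        emitted.add(acc);
        for (T element : input) {
            acc = step.apply(acc, element); // the step returns the new accumulator
            emitted.add(acc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Ids arriving on one entityName substream, e.g. "table1".
        List<Long> ids = Arrays.asList(1L, 4L);
        System.out.println(scan(0L, ids, (sum, id) -> sum + id)); // prints [0, 1, 5]
    }
}
```

Unlike reduce, a scan emits a running total for every incoming element, so downstream sees intermediate values instead of waiting for the substream to complete.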

Solution

  • So what you want, if I understand correctly, is:

    • first, group by entityName
    • then group by time window, and inside this time window, sum all the systemCodeTracking.getId()

    For the first part, you'll need groupBy. For the second part groupedWithin. However, they do not work the same: the first one will give you subflows, while the second one will give you a flow of lists.

    Therefore, we will have to handle them differently.

    First, let's write a reducer for your lists:

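    private SystemCodeTracking reduceList(List<SystemCodeTracking> list) {
        if (list.isEmpty()) {
            throw new IllegalArgumentException("cannot reduce an empty list");
        }
        // Sum first: `building` is the same object as list.get(0), so resetting
        // its id before the loop would drop the first element's id from the total.
        long sum = 0L;
        for (SystemCodeTracking next : list) {
            sum += next.getId();
        }
        SystemCodeTracking building = list.get(0);
        building.setId(sum);
        return building;
    }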
    

    So for each element in the list, we increment the building.id to get the value we want when the whole list has been traversed.
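One detail worth checking in a reducer like this: the accumulator is the list's first element, so its id has to be counted before it is overwritten. Below is a self-contained sketch that keeps that in mind. `SystemCodeTrackingStub` is a hypothetical minimal version of the real class, with the id assumed to be a `long`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical minimal version of SystemCodeTracking, just enough for the sketch.
class SystemCodeTrackingStub {
    private long id;
    private final String entityName;

    SystemCodeTrackingStub(long id, String entityName) {
        this.id = id;
        this.entityName = entityName;
    }

    long getId() { return id; }
    void setId(long id) { this.id = id; }
    String getEntityName() { return entityName; }
}

class ReduceListSketch {
    // Sums the ids of one window and stores the total in the first element.
    static SystemCodeTrackingStub reduceList(List<SystemCodeTrackingStub> list) {
        if (list.isEmpty()) {
            throw new IllegalArgumentException("cannot reduce an empty list");
        }
        long sum = 0L;
        for (SystemCodeTrackingStub next : list) {
            sum += next.getId(); // read every id before any of them is overwritten
        }
        SystemCodeTrackingStub building = list.get(0);
        building.setId(sum);
        return building;
    }

    public static void main(String[] args) {
        SystemCodeTrackingStub reduced = reduceList(Arrays.asList(
                new SystemCodeTrackingStub(1, "table1"),
                new SystemCodeTrackingStub(4, "table1")));
        System.out.println(reduced.getEntityName() + " -> " + reduced.getId()); // prints table1 -> 5
    }
}
```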

    Now you just need to do:

    Source<SystemCodeTracking, SourceQueueWithComplete<SystemCodeTracking>> loggedSource = source
        .groupBy(20000, SystemCodeTracking::getEntityName)                  // group by name
        .groupedWithin(100, FiniteDuration.create(10, TimeUnit.SECONDS))    // for a given name, group by time window (or by packs of 100)
        .filterNot(List::isEmpty)                                           // drop empty lists (no element during the window), to avoid the error in the reducer
        .map(this::reduceList)                                              // reduce each list to sum the ids
        .log("====== doing reducing ")                                      // log each passing element using the Akka logger, rather than `System.out.println`
        .mergeSubstreams();                                                 // merge back all elements with different names
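To see what this kind of pipeline emits, here is a plain-Java imitation of the "group by name, then cut each group into packs, then sum each pack" idea. It is only a sketch: it models the count limit of the window and ignores the timer entirely, and `WindowedSumSketch`, `windowedSums`, and `row` are hypothetical names. In the real stream, a window closes on whichever limit (count or time) is reached first.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WindowedSumSketch {
    // For each entityName, cuts its ids into consecutive packs of at most
    // maxPerWindow elements and emits one sum per pack (count-based only, no timer).
    static Map<String, List<Long>> windowedSums(List<Map.Entry<String, Long>> rows, int maxPerWindow) {
        Map<String, List<Long>> open = new LinkedHashMap<>();    // ids buffered per entityName
        Map<String, List<Long>> emitted = new LinkedHashMap<>(); // sums emitted per entityName
        for (Map.Entry<String, Long> row : rows) {
            List<Long> window = open.computeIfAbsent(row.getKey(), k -> new ArrayList<>());
            window.add(row.getValue());
            if (window.size() == maxPerWindow) {
                flush(row.getKey(), window, emitted);
            }
        }
        // End of input: flush the windows that are still partially filled.
        open.forEach((name, window) -> flush(name, window, emitted));
        return emitted;
    }

    private static void flush(String name, List<Long> window, Map<String, List<Long>> emitted) {
        if (window.isEmpty()) {
            return; // nothing buffered, so nothing to emit
        }
        long sum = 0L;
        for (long id : window) {
            sum += id;
        }
        emitted.computeIfAbsent(name, k -> new ArrayList<>()).add(sum);
        window.clear();
    }

    static Map.Entry<String, Long> row(String entityName, long id) {
        return new SimpleEntry<>(entityName, id);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> rows = Arrays.asList(
                row("table1", 1), row("table2", 2), row("table3", 3),
                row("table1", 4), row("table3", 5), row("table1", 6));
        System.out.println(windowedSums(rows, 2)); // prints {table1=[5, 6], table3=[8], table2=[2]}
    }
}
```

Each name can produce several sums over time, one per window, which is the behaviour the question was after: partial results reach the sink without waiting for the upstream to complete.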