Update: I put my question into a test project to explain in detail what I mean.
=====================================================================
I have an Akka source that continuously reads from a database table, groups by some key and then reduces. However, after I apply the reduce function, the data is never sent to the sink: it keeps reducing, because the upstream always has more data coming.
I read some posts and tried groupedWithin and sliding, but they do not work as I expected. They only group the messages into larger chunks, and never make the upstream pause and emit to the sink. Following is the code, using Akka Streams 2.5.2.
The Source reduce code:
source = source
.groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS))
.sliding(3, 1)
.mapConcat(i -> i)
.mapConcat(i -> i)
.groupBy(2000000, i -> i.getEntityName())
.map(i -> new Pair<>(i.getEntityName(), i))
.reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;})
.map(i -> i.second())
.mergeSubstreams();
The Sink and run:
Sink<Object, CompletionStage<Done>> sink =
Sink.foreach(i -> System.out.println(i));
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left());
run.run(materializer);
I have also tried .takeWhile(predicate). I used a timer to switch the predicate value between true and false, but only the first switch to false takes effect: when I switch it back to true, the upstream does not restart.
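A likely explanation: in Akka Streams, `takeWhile` completes the stream the first time its predicate returns `false`, and a completed stream cannot be restarted, so switching the flag back to `true` has no effect. `java.util.stream.Stream.takeWhile` (Java 9+) stops in the same way, so the sketch below uses it as a stand-in rather than Akka itself. `TakeWhileDemo` and the `gateAtTick` array (a hypothetical recording of the timer-controlled flag at each tick) are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class TakeWhileDemo {
    // gateAtTick[i] is the (hypothetical) value of the timer-controlled flag when tick i arrives.
    static List<Integer> taken(boolean[] gateAtTick) {
        return IntStream.range(0, gateAtTick.length)
                .boxed()
                // Stops at the first tick whose flag is false; later ticks are never taken,
                // even if the flag has been switched back to true by then.
                .takeWhile(tick -> gateAtTick[tick])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        boolean[] gate = {true, true, false, true, true};
        System.out.println(taken(gate)); // [0, 1]
    }
}
```

Ticks 3 and 4 are lost even though the flag is `true` again; a gate that should open and close repeatedly needs a different operator than `takeWhile`.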
Please help, thanks in advance!
=================================================
Update: information about the type of elements
What I want:
I have a class called SystemCodeTracking that contains 2 attributes (id, entityName).
I will have a list of objects: (1, "table1"), (2, "table2"), (3, "table3"), (4, "table1"), (5, "table3")
I would like to group by entityName and then sum the ids, so the result I would like to see is the following:
("table1", 1+4), ("table3", 3+5), ("table2", 2)
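On a finite, in-memory list the desired result is an ordinary group-and-sum; the difficulty in the stream case is only that the source never completes. As a reference point, here is a plain-Java sketch (no Akka) using `Collectors.groupingBy` with `Collectors.summingLong`. `GroupSumDemo` and its minimal nested `SystemCodeTracking` stand-in (assumed to expose `getId()` and `getEntityName()`) are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupSumDemo {
    // Hypothetical minimal stand-in for the question's SystemCodeTracking class.
    static class SystemCodeTracking {
        private final long id;
        private final String entityName;

        SystemCodeTracking(long id, String entityName) {
            this.id = id;
            this.entityName = entityName;
        }

        long getId() { return id; }
        String getEntityName() { return entityName; }
    }

    // Group by entityName and sum the ids; TreeMap keeps the output sorted by name.
    static Map<String, Long> sumIdsByName(List<SystemCodeTracking> items) {
        return items.stream().collect(Collectors.groupingBy(
                SystemCodeTracking::getEntityName,
                TreeMap::new,
                Collectors.summingLong(SystemCodeTracking::getId)));
    }

    public static void main(String[] args) {
        List<SystemCodeTracking> items = List.of(
                new SystemCodeTracking(1, "table1"),
                new SystemCodeTracking(2, "table2"),
                new SystemCodeTracking(3, "table3"),
                new SystemCodeTracking(4, "table1"),
                new SystemCodeTracking(5, "table3"));
        System.out.println(sumIdsByName(items)); // {table1=5, table2=2, table3=8}
    }
}
```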
The code I have now is the following:
source
.groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName())
.map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName(), systemCodeTracking.getId()))
.scan(....)
My question right now is more about how I should build the initial state for scan. Should I do this?
scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId()))
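The key difference between the two operators: `reduce` emits only once, when its (sub)stream completes, which an endless database source never does, while `scan(zero, f)` emits `zero` first and then every intermediate result, so it produces output while the substream is still running. For the `Pair<String, Integer>` elements above, an untested sketch of the call could be `.scan(new Pair<>("", 0), (acc, next) -> new Pair<>(next.first(), acc.second() + next.second()))`, followed by `.drop(1)` if the seed itself should not reach the sink. The plain-Java model below does not use Akka; `ScanModel` and its `scan` helper are hypothetical names that only mimic the emission pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class ScanModel {
    // Plain-Java model of scan(zero, f): emit zero, then one running result per input element.
    static <T, E> List<T> scan(T zero, List<E> inputs, BiFunction<T, E, T> f) {
        List<T> emitted = new ArrayList<>();
        T acc = zero;
        emitted.add(acc); // the seed is emitted first
        for (E next : inputs) {
            acc = f.apply(acc, next);
            emitted.add(acc); // every intermediate result is emitted immediately
        }
        return emitted;
    }

    public static void main(String[] args) {
        // ids arriving in the "table1" substream: 1, then 4
        System.out.println(scan(0, List.of(1, 4), Integer::sum)); // [0, 1, 5]
    }
}
```

A reduce over the same input would produce only the final `5`, and only once the input ends.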
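So what you want, if I understand everything correctly, is:
- to group the elements by entityName;
- for each name, to sum systemCodeTracking.getId() over a limited time window, so that a result is emitted while the stream keeps running.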
For the first part, you'll need groupBy. For the second part, groupedWithin. However, they do not work the same way: the first one gives you subflows, while the second one gives you a flow of lists.
Therefore, we will have to handle them differently.
First, let's write a reducer for your lists:
private SystemCodeTracking reduceList(List<SystemCodeTracking> list) {
    if (list.isEmpty()) {
        throw new IllegalArgumentException("cannot reduce an empty list");
    } else {
        long sum = 0L;
        for (SystemCodeTracking next : list) {
            sum += next.getId(); // read every id before modifying anything
        }
        SystemCodeTracking building = list.get(0); // reuse the first element as the result
        building.setId(sum);
        return building;
    }
}
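So we first add up the ids of all the elements in the list, and then store the sum in the first element, which is returned as the reduced value for that list. (Setting the first element's id to 0 before summing would silently drop its own id from the total.)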
Now you just need to do:
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;
import java.util.List;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;

Source<SystemCodeTracking, SourceQueueWithComplete<SystemCodeTracking>> loggedSource = source
.groupBy(20000, SystemCodeTracking::getEntityName) // group by name
.groupedWithin(100, FiniteDuration.create(10, TimeUnit.SECONDS)) // for a given name, group by time window (or by packs of 100)
.filterNot(List::isEmpty) // remove empty lists from the flow, so the reducer never receives one
.map(this::reduceList) // reduce each list to sum the ids
.log("====== doing reducing") // log each passing element using the Akka logger, rather than `System.out.println`
.mergeSubstreams(); // merge back all elements with different names
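To check the "group by name, then sum per time window" idea without running Akka, here is a simplified single-threaded model. It is only an approximation of `groupBy` + `groupedWithin` + `map(reduceList)`: it uses each event's timestamp as the clock instead of a real timer, and a window opens at the first element for a name. `WindowedSumModel`, `Event`, `windowedSums` and `flush` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WindowedSumModel {
    // Hypothetical input element: arrival time (seconds), entity name and id.
    static final class Event {
        final long timeSec;
        final String entityName;
        final long id;

        Event(long timeSec, String entityName, long id) {
            this.timeSec = timeSec;
            this.entityName = entityName;
            this.id = id;
        }
    }

    // Sum the ids of one closed window (like reduceList) and format the emitted value.
    private static String flush(String name, List<Event> window) {
        long sum = 0L;
        for (Event e : window) sum += e.id;
        return name + "=" + sum;
    }

    // A name's window is emitted once it holds maxSize elements, or once windowSec
    // seconds (measured by event timestamps) have passed since it opened.
    static List<String> windowedSums(List<Event> events, int maxSize, long windowSec) {
        Map<String, List<Event>> open = new LinkedHashMap<>();
        List<String> emitted = new ArrayList<>();
        for (Event e : events) {
            // Emit every window whose time limit expired before this event arrived.
            Iterator<Map.Entry<String, List<Event>>> it = open.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, List<Event>> entry = it.next();
                if (e.timeSec >= entry.getValue().get(0).timeSec + windowSec) {
                    emitted.add(flush(entry.getKey(), entry.getValue()));
                    it.remove();
                }
            }
            List<Event> window = open.computeIfAbsent(e.entityName, k -> new ArrayList<>());
            window.add(e);
            // Emit early when the size limit is reached.
            if (window.size() == maxSize) {
                emitted.add(flush(e.entityName, window));
                open.remove(e.entityName);
            }
        }
        // End of input: emit what is left (in a live stream the timer would do this).
        for (Map.Entry<String, List<Event>> entry : open.entrySet()) {
            emitted.add(flush(entry.getKey(), entry.getValue()));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(0, "table1", 1),
                new Event(1, "table2", 2),
                new Event(2, "table3", 3),
                new Event(3, "table1", 4),
                new Event(4, "table3", 5),
                new Event(12, "table1", 6));
        System.out.println(windowedSums(events, 100, 10)); // [table1=5, table2=2, table3=8, table1=6]
    }
}
```

The event at 12 s closes the three windows opened between 0 s and 2 s, so each table gets one sum per 10-second window instead of a single sum that would only appear when an endless stream completes.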