In the following code, tick emits a new object every three seconds. I'm trying to count the number of emitted objects every second using groupedWithin (which ignores empty groups). Is there any way in Akka Streams for the following code to print 0 in periods when tick does not emit any objects?
Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
    .groupedWithin(Integer.MAX_VALUE, Duration.ofSeconds(1))
    .map(List::size)
    .runWith(Sink.foreach(e -> System.out.println(e)), materializer);
In other words, I'd like the output of this code to be this sequence: 1 0 0 1 0 0 1 ... (every second) instead of 1 1 1 ... (every three seconds).
EDIT: This is the best workaround I have come up with so far (using keepAlive to send some special objects if the upstream is idle):
Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
    .keepAlive(Duration.ofSeconds(1), KeepAliveElement::new)
    .groupedWithin(Integer.MAX_VALUE, Duration.ofSeconds(1))
    .map(lst -> lst.stream().filter(e -> !(e instanceof KeepAliveElement)).collect(Collectors.toList()))
    .map(List::size)
    .runWith(Sink.foreach(e -> System.out.println(e)), materializer);
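As a side note, the filter-then-size pair of map steps in this workaround could probably be collapsed into a single count, which avoids building the intermediate filtered list. A minimal stand-alone sketch of that per-group step follows; KeepAliveElement here is a hypothetical empty marker class (its real definition is not shown above), and countReal is an illustrative helper name, not an Akka API:

```java
import java.util.List;

final class KeepAliveCountSketch {
    /** Hypothetical marker type standing in for the KeepAliveElement used above. */
    static final class KeepAliveElement {}

    /** Counts the real elements in one group, skipping keep-alive markers. */
    static int countReal(List<?> group) {
        return (int) group.stream()
                .filter(e -> !(e instanceof KeepAliveElement))
                .count();
    }

    public static void main(String[] args) {
        // One real element plus one marker: prints 1
        System.out.println(countReal(List.of(new Object(), new KeepAliveElement())));
        // Only a marker, i.e. an idle period: prints 0
        System.out.println(countReal(List.of(new KeepAliveElement())));
    }
}
```

In the stream itself this would presumably replace the two map calls with one that applies the same filter and count to each group.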
Is there any better way to do this?
I thought this would be of normal difficulty; I was wrong. One thing I wanted to ensure is that the flow counting the items that pass through the stream does not keep a reference to each item it sees. If many items pass during the aggregation period, you end up with an unnecessarily big list in memory (even if only for a second) and pay the performance penalty of adding (many) items to it. The following solution, although complex, keeps only a counter.
NOTE: Although I tested the happy path, I cannot say this is battle-tested, so use with caution!
Based on Akka's GroupedWeightedWithin and the documentation here:
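import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Inlet;
import akka.stream.Outlet;
import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.TimerGraphStageLogic;
import java.time.Duration;

public class CountInPeriod<T> extends GraphStage<FlowShape<T, Integer>> {
    public final Inlet<T> in = Inlet.<T>create("CountInPeriod.in");
    public final Outlet<Integer> out = Outlet.<Integer>create("CountInPeriod.out");
    private final FlowShape<T, Integer> shape = FlowShape.of(in, out);
    private final Duration duration;

    public CountInPeriod(Duration duration) {
        this.duration = duration;
    }

    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new TimerGraphStageLogic(shape) {
            // Number of elements seen since the last timer tick
            private int counter = 0;
            // Count waiting for downstream demand, or -1 if nothing is buffered
            private int bufferPushCounter = -1;

            {
                setHandler(in, new AbstractInHandler() {
                    @Override
                    public void onPush() throws Exception {
                        grab(in); // discard the element; only the count is kept
                        counter++;
                        pull(in);
                    }
                });
                setHandler(out, new AbstractOutHandler() {
                    @Override
                    public void onPull() throws Exception {
                        // Deliver a count that could not be pushed when the timer fired
                        if (bufferPushCounter >= 0) {
                            push(out, bufferPushCounter);
                            bufferPushCounter = -1;
                        }
                    }
                });
            }

            @Override
            public void preStart() throws Exception {
                // Fire the timer once per period and start pulling from upstream
                scheduleWithFixedDelay(CountInPeriod.class, duration, duration);
                pull(in);
            }

            @Override
            public void onTimer(Object timerKey) throws Exception {
                if (isAvailable(out)) emitCounter();
                else bufferPush();
            }

            private void emitCounter() {
                push(out, counter);
                counter = 0;
                bufferPushCounter = -1;
            }

            private void bufferPush() {
                // No demand yet: park the count. This replaces any count
                // still parked from an earlier tick.
                bufferPushCounter = counter;
                counter = 0;
            }
        };
    }

    @Override
    public FlowShape<T, Integer> shape() {
        return shape;
    }
}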
Test code:
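import akka.Done;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.concurrent.CompletionStage;

public class GroupTicked {
    final static ActorSystem as = ActorSystem.create("as");

    public static void main(String... args) throws Exception {
        CompletionStage<Done> done = Source.tick(Duration.ZERO, Duration.ofSeconds(3), new Object())
            .take(7) // to finish in finite time...
            .via(new CountInPeriod<>(Duration.ofSeconds(1)))
            .runWith(Sink.foreach(e -> System.out.println(System.currentTimeMillis() + " -> " + e)), as);
        done.thenAccept(x -> as.terminate()); // shut the actor system down once the stream completes
    }
}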
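The bookkeeping inside CountInPeriod can also be traced without Akka. The following is only a rough plain-Java model of the counter and the buffered count, not the stage itself: PeriodCounterModel, its demand flag (standing in for isAvailable(out)) and the simulate driver are hypothetical names introduced for illustration, and the driver assumes that each element arrives before the timer fires within its second and that the sink pulls again right after every emitted count.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the stage's bookkeeping; not an Akka class. */
final class PeriodCounterModel {
    private int counter = 0;        // elements seen in the current period
    private int buffered = -1;      // count waiting for demand, -1 if none
    private boolean demand = false; // stands in for isAvailable(out)
    final List<Integer> emitted = new ArrayList<>();

    /** An element arrived from upstream. */
    void onPush() {
        counter++;
    }

    /** Downstream asked for the next count. */
    void onPull() {
        if (buffered >= 0) {
            emitted.add(buffered);
            buffered = -1;
        } else {
            demand = true;
        }
    }

    /** The period timer fired. */
    void onTimer() {
        if (demand) {
            emitted.add(counter);
            demand = false;
        } else {
            buffered = counter; // replaces any count still waiting
        }
        counter = 0;
    }

    /** Simulates whole seconds with one element every elementEvery seconds. */
    static List<Integer> simulate(int seconds, int elementEvery) {
        PeriodCounterModel model = new PeriodCounterModel();
        model.onPull(); // the sink signals demand up front
        for (int t = 0; t < seconds; t++) {
            if (t % elementEvery == 0) model.onPush();
            int before = model.emitted.size();
            model.onTimer();
            if (model.emitted.size() > before) model.onPull(); // sink pulls again
        }
        return model.emitted;
    }

    public static void main(String[] args) {
        System.out.println(simulate(6, 3)); // prints [1, 0, 0, 1, 0, 0]
    }
}
```

Tracing the model with a slow consumer (two timer ticks with no pull in between) shows the second parked count replacing the first, which is one reason to treat the stage with the caution mentioned above.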