I'm just starting to learn Flink and am trying to build a very basic toy example that sums integers over time and periodically prints the total so far.
I've created a random number generator source class like this:
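// RandomNumberSource.java
import java.util.Random;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class RandomNumberSource implements SourceFunction<Integer> {
    // volatile so that cancel(), called from another thread, is seen by run()
    private volatile boolean isRunning = true;
    private Random rand;

    public RandomNumberSource() {
        this.rand = new Random();
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (isRunning) {
            // emit a random integer in [0, 200), then wait one second
            ctx.collect(rand.nextInt(200));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}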
As you can see, it generates a random number once per second.
Now, how would I go about summing the numbers being generated?
// StreamingJob.java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> randomNumber = env.addSource(new RandomNumberSource());

        // pseudo code:
        // randomNumber
        //     .window(Time.seconds(5))
        //     .reduce(0, (acc, i) => acc+i) // (initial value, reducer)
        //     .sum()

        // execute program
        env.execute("Flink Streaming Random Number Sum Aggregation");
    }
}
I've added pseudo code to explain what I'm trying to do: every 5 seconds, sum all the numbers received in that interval and print the result.
I feel like I'm missing something in my approach and might need some guidance on how to do this.
The window operator is used for keyed streams. You should instead use windowAll for this task. Here's the snippet:
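randomNumber
    // put all elements of the non-keyed stream into 5-second tumbling windows
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    // for a plain Integer stream, position 0 refers to the value itself
    .sum(0)
    .print()
    .setParallelism(1);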
Also check this for reference on various window considerations.
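To make it clearer what the 5-second tumbling window computes, here is a minimal plain-Java sketch that does not use Flink at all. It assigns each timestamped value to a window by rounding its timestamp down to a multiple of the window size and then sums per window. The class name TumblingWindowSum, the method sumPerWindow, and the sample timestamps and values are all made up for illustration, and the sketch assumes non-negative timestamps and no window offset.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSum {
    // Hypothetical helper: returns a map from window start (ms) to the sum
    // of all values whose timestamp falls into that window.
    public static Map<Long, Integer> sumPerWindow(long[] timestampsMs, int[] values, long windowSizeMs) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            // round the timestamp down to the start of its window
            long windowStart = timestampsMs[i] - (timestampsMs[i] % windowSizeMs);
            // add the value to the running sum for that window
            sums.merge(windowStart, values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // one value per second, as the random source would emit
        long[] timestampsMs = {0, 1000, 2000, 3000, 4000, 5000, 6000};
        int[] values = {10, 20, 30, 40, 50, 60, 70};
        // prints {0=150, 5000=130}
        System.out.println(sumPerWindow(timestampsMs, values, 5000L));
    }
}
```

The first five values land in the window starting at 0 ms and the last two in the window starting at 5000 ms, so each printed sum covers only its own window rather than a running total across windows.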