apache-flink flink-streaming

Summing a number from a random number source


I'm just starting to learn Flink and am trying to build a very basic toy example that sums integers over time and periodically prints the total so far.

I've created a random number generator source class like this:

// RandomNumberSource.java
public class RandomNumberSource implements SourceFunction<Integer> {
    public volatile boolean isRunning = true;
    private Random rand;

    public RandomNumberSource() {
        this.rand = new Random();
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(rand.nextInt(200));
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}

As you can see, it generates a random number every second.

Now how would I go about summing the number that's being generated?

// StreamingJob.java
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> randomNumber = env.addSource(new RandomNumberSource());

        // pseudo code:
        // randomNumber
        //    .window(Time.seconds(5))
        //    .reduce(0, (acc, i) => acc+i) // (initial value, reducer)
        //    .sum()


        // execute program
        env.execute("Flink Streaming Random Number Sum Aggregation");
    }
}

I've added pseudo code to explain what I'm trying to do, i.e., every 5 seconds, sum all the numbers received and print the result.

I feel like I'm missing something in my approach and might need some guidance on how to do this.


Solution

  • The window operator is only available on keyed streams. Since this stream is not keyed, use windowAll instead. Here's the snippet:

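        randomNumber
                // non-keyed stream: collect all elements into 5-second tumbling windows
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                // sum the element itself (position 0 for a non-tuple type)
                .sum(0)
                .print()
                // a single print task keeps the output in one place
                .setParallelism(1);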
    

    Also check this for reference on various window considerations.
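To make it clearer what a tumbling window sum produces, here is a minimal plain-Java sketch that uses no Flink classes at all. The class name TumblingWindowSum and the helper sumPerWindow are hypothetical names made up for this illustration, and the bucketing rule (window start = timestamp minus timestamp modulo window size) is an assumption about how epoch-aligned tumbling windows with no offset behave. It only approximates the idea and is not how Flink implements it internally.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; no Flink classes are used here.
class TumblingWindowSum {

    // Hypothetical helper: assigns each value to a fixed, non-overlapping
    // window of windowMillis and returns the sum per window, keyed by the
    // window's start time. Assumes timestamps are non-negative and that
    // both arrays have the same length.
    static Map<Long, Integer> sumPerWindow(long[] timestampsMillis, int[] values, long windowMillis) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (int i = 0; i < values.length; i++) {
            // Start of the window this element falls into.
            long windowStart = timestampsMillis[i] - (timestampsMillis[i] % windowMillis);
            // Add the value to that window's sum.
            sums.merge(windowStart, values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        // One element per second, as in the random number source above.
        long[] timestamps = {0L, 1000L, 2000L, 3000L, 4000L, 5000L, 6000L};
        int[] values = {10, 20, 30, 40, 50, 60, 70};
        // Prints {0=150, 5000=130}
        System.out.println(sumPerWindow(timestamps, values, 5000L));
    }
}
```

Note that each window's sum is independent of the others. If you want a running total across all elements seen so far, a windowed sum alone will not give you that.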