Search code examples
apache-flink

how to sum multi fields in flink


I am sum one fields in Apache Flink 1.10 like this,I am receive RabbitMQ messages and handle it in memory,finally save it to MySQL,the sum operate code like this:

 consumeRecord.keyBy("gameType")
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("realPumpAmount")
                .addSink(new SinkFunction<ReportPump>() {
                    @Override
                    public void invoke(ReportPump value, Context context) throws Exception {
                        // handle sink logic
                    }
                });

Now I want to sum multi fields in the entity of MQ like this:

consumeRecord.keyBy("gameType")
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("field1","field2")
                .addSink(new SinkFunction<ReportPump>() {
                    @Override
                    public void invoke(ReportPump value, Context context) throws Exception {
                        // handle sink logic
                    }
                });

Is there any way to implement this purpose?


Solution

  • sum reducer accepts just a single field. You can write such reducer yourself:

    consumeRecord.keyBy("gameType")
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce((d1, d2) -> {
            d1.field1 += d2.field1;
            d1.field2 += d2.field2;
            return d1;
        })
        .addSink(new SinkFunction<ReportPump>() {
            @Override
            public void invoke(ReportPump value, Context context) throws Exception {
                // handle sink logic
            }
        });