I am sum one fields in Apache Flink 1.10 like this,I am receive RabbitMQ messages and handle it in memory,finally save it to MySQL,the sum operate code like this:
consumeRecord.keyBy("gameType")
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("realPumpAmount")
.addSink(new SinkFunction<ReportPump>() {
@Override
public void invoke(ReportPump value, Context context) throws Exception {
// handle sink logic
}
});
Now I want to sum multi fields in the entity of MQ like this:
consumeRecord.keyBy("gameType")
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("field1","field2")
.addSink(new SinkFunction<ReportPump>() {
@Override
public void invoke(ReportPump value, Context context) throws Exception {
// handle sink logic
}
});
Is there any way to implement this purpose?
sum
reducer accepts just a single field. You can write such reducer yourself:
consumeRecord.keyBy("gameType")
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce((d1, d2) -> {
d1.field1 += d2.field1;
d1.field2 += d2.field2;
return d1;
})
.addSink(new SinkFunction<ReportPump>() {
@Override
public void invoke(ReportPump value, Context context) throws Exception {
// handle sink logic
}
});