I am using Apache Flink (1.10) to update records in real time, like this:
public class WalletConsumeRealtimeHandler {

    public static void main(String[] args) throws Exception {
        walletConsumeHandler();
    }

    public static void walletConsumeHandler() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.initMQ();
        FlinkUtil.initEnv(env);
        DataStream<String> dataStreamSource = env.addSource(FlinkUtil.initDatasource("wallet.consume.report.realtime"));
        DataStream<ReportWalletConsumeRecord> consumeRecord =
                dataStreamSource.map(new MapFunction<String, ReportWalletConsumeRecord>() {
                    @Override
                    public ReportWalletConsumeRecord map(String value) throws Exception {
                        ObjectMapper mapper = new ObjectMapper();
                        ReportWalletConsumeRecord consumeRecord = mapper.readValue(value, ReportWalletConsumeRecord.class);
                        consumeRecord.setMergedRecordCount(1);
                        return consumeRecord;
                    }
                }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
        consumeRecord.keyBy(
                new KeySelector<ReportWalletConsumeRecord, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> getKey(ReportWalletConsumeRecord value) throws Exception {
                        return Tuple2.of(value.getConsumeItem(), value.getTenantId());
                    }
                })
                .timeWindow(Time.seconds(5))
                .reduce(new SumField(), new CollectionWindow())
                .addSink(new SinkFunction<List<ReportWalletConsumeRecord>>() {
                    @Override
                    public void invoke(List<ReportWalletConsumeRecord> reportPumps, Context context) throws Exception {
                        WalletConsumeRealtimeHandler.invoke(reportPumps);
                    }
                });
        env.execute(WalletConsumeRealtimeHandler.class.getName());
    }

    private static class CollectionWindow extends ProcessWindowFunction<ReportWalletConsumeRecord,
            List<ReportWalletConsumeRecord>,
            Tuple2<String, Long>,
            TimeWindow> {
        public void process(Tuple2<String, Long> key,
                            Context context,
                            Iterable<ReportWalletConsumeRecord> minReadings,
                            Collector<List<ReportWalletConsumeRecord>> out) throws Exception {
            ArrayList<ReportWalletConsumeRecord> employees = Lists.newArrayList(minReadings);
            if (employees.size() > 0) {
                out.collect(employees);
            }
        }
    }

    private static class SumField implements ReduceFunction<ReportWalletConsumeRecord> {
        public ReportWalletConsumeRecord reduce(ReportWalletConsumeRecord d1, ReportWalletConsumeRecord d2) {
            Integer merged1 = d1.getMergedRecordCount() == null ? 1 : d1.getMergedRecordCount();
            Integer merged2 = d2.getMergedRecordCount() == null ? 1 : d2.getMergedRecordCount();
            d1.setMergedRecordCount(merged1 + merged2);
            d1.setConsumeNum(d1.getConsumeNum() + d2.getConsumeNum());
            return d1;
        }
    }

    public static void invoke(List<ReportWalletConsumeRecord> records) {
        WalletConsumeService service = FlinkUtil.InitRetrofit().create(WalletConsumeService.class);
        Call<ResponseBody> call = service.saveRecords(records);
        call.enqueue(new Callback<ResponseBody>() {
            @Override
            public void onResponse(Call<ResponseBody> call, Response<ResponseBody> response) {
            }

            @Override
            public void onFailure(Call<ResponseBody> call, Throwable t) {
                t.printStackTrace();
            }
        });
    }
}
Now I have found that the Flink job only triggers the sink after it has received at least 2 records. Does the reduce operation require this?
You need a second record to trigger the window, but that comes from the watermarking, not from the reduce. Flink only knows when to close a window (and fire the subsequent calculation) once it receives a watermark that has passed the end of the window.
In your case, you use BoundedOutOfOrdernessGenerator, which advances the watermark only according to the timestamps of incoming records. So it generates a later watermark only after it has seen a second record, and the window of the first record stays open until then.
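To make the timing concrete, here is a small stand-alone sketch of the arithmetic. It does not use Flink at all; it assumes tumbling 5-second windows aligned to the epoch, a window that fires once the watermark reaches the window's last timestamp (end minus 1 ms), and a generator that emits "highest timestamp seen minus 3.5 seconds" as in the example from the Flink documentation. The 3.5-second bound and the class and method names are assumptions, since the question does not show the generator's code.

```java
// Stand-alone model of when a 5 s tumbling event-time window can fire.
// No Flink dependency; all names and the 3.5 s bound are illustrative assumptions.
class WindowFiringSketch {
    static final long WINDOW_SIZE_MS = 5_000L;
    static final long MAX_OUT_OF_ORDERNESS_MS = 3_500L; // assumed bound

    // Start of the tumbling window containing ts (non-negative ts, zero offset).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // Last timestamp that still belongs to the window containing ts.
    static long windowMaxTimestamp(long ts) {
        return windowStart(ts) + WINDOW_SIZE_MS - 1;
    }

    // Watermark produced by a bounded-out-of-orderness generator.
    static long watermarkAfter(long maxSeenTimestamp) {
        return maxSeenTimestamp - MAX_OUT_OF_ORDERNESS_MS;
    }

    // True if the window containing recordTs may fire, given the highest timestamp seen so far.
    static boolean fires(long recordTs, long maxSeenTimestamp) {
        return watermarkAfter(maxSeenTimestamp) >= windowMaxTimestamp(recordTs);
    }

    public static void main(String[] args) {
        long first = 10_000L;
        System.out.println(fires(first, first));    // only the first record has been seen
        System.out.println(fires(first, 12_000L));  // second record, but not late enough
        System.out.println(fires(first, 19_000L));  // second record pushes the watermark past the window
    }
}
```

Under these assumptions the first record alone never fires its own window, and a second record only helps if its timestamp is far enough ahead to move the watermark past the end of the first window.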
You can use a different watermark generator. In the troubleshooting training there is a watermark generator that also generates watermarks on timeout, so a window can still fire when no further records arrive.
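As a rough illustration of the timeout idea (this is not the code from the training), the sketch below keeps the usual "highest timestamp minus bound" watermark while records are flowing, and lets the watermark drift forward with wall-clock time once no record has arrived for a while. It is written as a plain Java class so it can be run on its own; in Flink 1.10 the same two methods would presumably live in an `AssignerWithPeriodicWatermarks<ReportWalletConsumeRecord>` and return a `Watermark`. The class name, the idle timeout, and the injected clock are all assumptions made for the example.

```java
import java.util.function.LongSupplier;

// Plain-Java model of a periodic watermark generator that also advances when the source is idle.
// Hypothetical sketch: names, parameters, and the idle policy are assumptions, not Flink API.
class IdleAwareWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier clock; // wall clock, injectable so the logic can be tested

    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastRecordWallClock;
    private long lastWatermark = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
    }

    // Called once per record: remember the highest event time and when the record arrived.
    long extractTimestamp(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        lastRecordWallClock = clock.getAsLong();
        return eventTimestamp;
    }

    // Called periodically: the normal bounded-out-of-orderness watermark, plus idle time beyond the timeout.
    long getCurrentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        long idleFor = clock.getAsLong() - lastRecordWallClock;
        if (idleFor > idleTimeoutMs) {
            candidate += idleFor - idleTimeoutMs;
        }
        // Never move the watermark backwards once it has advanced.
        lastWatermark = Math.max(lastWatermark, candidate);
        return lastWatermark;
    }
}
```

With a 1-second bound and a 3-second idle timeout, a single record at event time 10 000 would let its window ending at 15 000 fire after roughly nine seconds of silence. The trade-off is that records arriving after such an idle period may already be behind the watermark and be treated as late, so the timeout should be chosen with the expected gaps in the input in mind.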