Search code examples
apache-flink

Apache Flink needs to read at least 2 records to trigger the sink


I am writing an Apache Flink (1.10) job that updates records in real time, like this:

public class WalletConsumeRealtimeHandler {

    public static void main(String[] args) throws Exception {
        walletConsumeHandler();
    }

    public static void walletConsumeHandler() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkUtil.initMQ();
        FlinkUtil.initEnv(env);
        DataStream<String> dataStreamSource = env.addSource(FlinkUtil.initDatasource("wallet.consume.report.realtime"));
        DataStream<ReportWalletConsumeRecord> consumeRecord =
                dataStreamSource.map(new MapFunction<String, ReportWalletConsumeRecord>() {
                    @Override
                    public ReportWalletConsumeRecord map(String value) throws Exception {
                        ObjectMapper mapper = new ObjectMapper();
                        ReportWalletConsumeRecord consumeRecord = mapper.readValue(value, ReportWalletConsumeRecord.class);
                        consumeRecord.setMergedRecordCount(1);
                        return consumeRecord;
                    }
                }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());

        consumeRecord.keyBy(
                new KeySelector<ReportWalletConsumeRecord, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> getKey(ReportWalletConsumeRecord value) throws Exception {
                        return Tuple2.of(value.getConsumeItem(), value.getTenantId());
                    }
                })
                .timeWindow(Time.seconds(5))
                .reduce(new SumField(), new CollectionWindow())
                .addSink(new SinkFunction<List<ReportWalletConsumeRecord>>() {
                    @Override
                    public void invoke(List<ReportWalletConsumeRecord> reportPumps, Context context) throws Exception {
                        WalletConsumeRealtimeHandler.invoke(reportPumps);
                    }
                });
        env.execute(WalletConsumeRealtimeHandler.class.getName());
    }

    private static class CollectionWindow extends ProcessWindowFunction<ReportWalletConsumeRecord,
            List<ReportWalletConsumeRecord>,
            Tuple2<String, Long>,
            TimeWindow> {
        public void process(Tuple2<String, Long> key,
                            Context context,
                            Iterable<ReportWalletConsumeRecord> minReadings,
                            Collector<List<ReportWalletConsumeRecord>> out) throws Exception {
            ArrayList<ReportWalletConsumeRecord> employees = Lists.newArrayList(minReadings);
            if (employees.size() > 0) {
                out.collect(employees);
            }
        }
    }

    private static class SumField implements ReduceFunction<ReportWalletConsumeRecord> {
        public ReportWalletConsumeRecord reduce(ReportWalletConsumeRecord d1, ReportWalletConsumeRecord d2) {
            Integer merged1 = d1.getMergedRecordCount() == null ? 1 : d1.getMergedRecordCount();
            Integer merged2 = d2.getMergedRecordCount() == null ? 1 : d2.getMergedRecordCount();
            d1.setMergedRecordCount(merged1 + merged2);
            d1.setConsumeNum(d1.getConsumeNum() + d2.getConsumeNum());
            return d1;
        }
    }

    public static void invoke(List<ReportWalletConsumeRecord> records) {
        WalletConsumeService service = FlinkUtil.InitRetrofit().create(WalletConsumeService.class);
        Call<ResponseBody> call = service.saveRecords(records);
        call.enqueue(new Callback<ResponseBody>() {
            @Override
            public void onResponse(Call<ResponseBody> call, Response<ResponseBody> response) {

            }

            @Override
            public void onFailure(Call<ResponseBody> call, Throwable t) {
                t.printStackTrace();
            }
        });
    }


}

Now I find that the Flink task only triggers the sink after it has received at least 2 records. Is the reduce operation what causes this?


Solution

  • You need two records to trigger the window. Flink only knows that it can close a window (and fire the subsequent computation) once it receives a watermark that has passed the end of the window.

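    In your case, you use BoundedOutOfOrdernessGenerator, which derives the watermark from the incoming records. The watermark therefore only advances when a new record arrives. The window holding the first record cannot fire until a second, later record moves the watermark past that window's end.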

    You can use a different watermark generator instead. The Flink troubleshooting training, for example, contains a watermark generator that also emits watermarks after a timeout, i.e. when no new records arrive for a while.