apache-flink

how to flush batch data to sink in apache flink


I am using Apache Flink (v1.10.0) to process RabbitMQ messages and sink the results to MySQL. Currently my pipeline looks like this:

   consumeRecord.keyBy("gameType")
                .timeWindowAll(Time.seconds(5))
                .reduce((d1, d2) -> {
                    d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
                    d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
                    return d1;
                })
                .addSink(new SinkFunction<ReportPump>() {
                    @Override
                    public void invoke(ReportPump value, Context context) throws Exception {
                        // save to mysql
                    }
                });

But the sink's invoke method receives only one row per call, so if one row in a batch fails, I cannot roll back the whole batch. Instead, I want to collect all the rows of one window into a batch and write them to the database in a single operation; if that fails, I want to roll back the insert and restore from Flink's checkpoint. This is what I am trying now:

consumeRecord.keyBy("gameType")
                .timeWindowAll(Time.seconds(5)).reduce(new ReduceFunction<ReportPump>() {
                    @Override
                    public ReportPump reduce(ReportPump d1, ReportPump d2) throws Exception {
                        d1.setRealPumpAmount(d1.getRealPumpAmount() + d2.getRealPumpAmount());
                        d1.setPumpAmount(d1.getPumpAmount() + d2.getPumpAmount());
                        return d1;
                    }
                })
                .apply(new AllWindowFunction<ReportPump, List<ReportPump>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window, Iterable<ReportPump> values, Collector<List<ReportPump>> out) throws Exception {
                        ArrayList<ReportPump> employees = Lists.newArrayList(values);
                        if (employees.size() > 0) {
                            out.collect(employees);
                        }
                    }
                })
                .addSink(new SinkFunction<List<ReportPump>>() {
                    @Override
                    public void invoke(List<ReportPump> value, Context context) throws Exception {
                        PumpRealtimeHandler.invoke(value);
                    }
                });

But the apply call fails to compile: Cannot resolve method 'apply' in 'SingleOutputStreamOperator'. How can I reduce each window, collect the results into a list, and flush them to the database only once?


Solution

  • SingleOutputStreamOperator does not have an apply method, because apply can only be called after windowing. What you are missing here is:

    .windowAll(GlobalWindows.create())
    

    between the reduce and the apply. It aggregates all the reduced results into one global window, so the apply sees them together and you can collect a single list and issue one batch against the database instead of multiple writes.
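To make the intended data flow concrete, here is a simulation with plain Java collections (no Flink dependency): one reduced row per gameType inside a window, then all reduced rows gathered into a single batch list. The `ReportPump` POJO is sketched minimally with public fields; the merge logic mirrors the question's ReduceFunction, assuming the intent is one summed row per gameType per window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowBatchSketch {
    // Minimal stand-in for the question's ReportPump POJO.
    static class ReportPump {
        String gameType;
        long pumpAmount;
        long realPumpAmount;
        ReportPump(String gameType, long pumpAmount, long realPumpAmount) {
            this.gameType = gameType;
            this.pumpAmount = pumpAmount;
            this.realPumpAmount = realPumpAmount;
        }
    }

    // Same merge logic as the question's ReduceFunction: sum amounts into d1.
    static ReportPump reduce(ReportPump d1, ReportPump d2) {
        d1.realPumpAmount += d2.realPumpAmount;
        d1.pumpAmount += d2.pumpAmount;
        return d1;
    }

    // One window's worth of records: reduce per gameType, then batch all results.
    static List<ReportPump> windowBatch(List<ReportPump> records) {
        Map<String, ReportPump> byKey = new LinkedHashMap<>();
        for (ReportPump r : records) {
            byKey.merge(r.gameType, r, WindowBatchSketch::reduce);
        }
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        List<ReportPump> window = Arrays.asList(
                new ReportPump("poker", 10, 8),
                new ReportPump("poker", 5, 2),
                new ReportPump("slots", 7, 7));
        List<ReportPump> batch = windowBatch(window);
        System.out.println(batch.size());            // 2 (one row per gameType)
        System.out.println(batch.get(0).pumpAmount); // 15
    }
}
```

The sink then receives this one list per window and can write it in a single database transaction.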


    I don't know whether this approach is good practice, though, because you lose the parallelism of Apache Flink.

    You should read about the Table API and the JDBC sink; it may help you. (More information here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#jdbc-connector.)
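    If you stay with a hand-written sink instead of the JDBC connector, the usual pattern inside invoke is one JDBC transaction per window batch: add every row to a single PreparedStatement batch, commit on success, roll back on any failure so no partial batch is left behind. A sketch using only java.sql — the `report_pump` table, its columns, and the `flushBatch`/`buildInsertSql` helpers are hypothetical names for illustration:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class BatchFlushSketch {
    // Hypothetical table and columns; adjust to your actual schema.
    static String buildInsertSql() {
        return "INSERT INTO report_pump (game_type, pump_amount, real_pump_amount)"
                + " VALUES (?, ?, ?)";
    }

    // Writes the whole window batch in one transaction; rolls back if any row fails.
    static void flushBatch(Connection conn, List<Object[]> rows) throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(buildInsertSql())) {
            for (Object[] row : rows) {
                ps.setString(1, (String) row[0]);
                ps.setLong(2, (Long) row[1]);
                ps.setLong(3, (Long) row[2]);
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();   // all rows of the window, or none
        } catch (SQLException e) {
            conn.rollback(); // undo the partial batch
            throw e;         // rethrow so Flink fails the task and restores from checkpoint
        } finally {
            conn.setAutoCommit(oldAutoCommit);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildInsertSql());
    }
}
```

    Rethrowing the exception matters: it makes the Flink job fail and restart from the last checkpoint, which is how the "roll back the insert and the checkpoint" behavior from the question falls out.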