I am writing a test case for Flink's two-phase commit. Here is an overview:
sink kafka is an exactly-once Kafka producer.
sink step is a MySQL sink that extends the two-phase commit sink function.
sink compare is also a MySQL sink that extends the two-phase commit sink function; it occasionally throws an exception to simulate a failed checkpoint.
When a checkpoint fails and the job is restored, the MySQL two-phase commit works fine, but the Kafka consumer resumes from the offset of the last successful checkpoint and the Kafka producer produces messages again, even though it had already produced them before the checkpoint failed.
How can I avoid duplicate messages in this case?
Thanks for your help.
flink 1.9.1
java 1.8
kafka 2.11
kafka producer code:
dataStreamReduce.addSink(new FlinkKafkaProducer<>(
        "flink_output",
        new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
                // A fresh UUID is generated on every call, so a replayed record gets a new UUID.
                UUID uuid = UUID.randomUUID();
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("uuid", uuid.toString());
                jsonObject.put("key1", element.f0);
                jsonObject.put("key2", element.f1);
                jsonObject.put("key3", element.f2);
                jsonObject.put("indicate", element.f3);
                return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
            }
        },
        producerProperties, // Kafka producer Properties; assumed name, not shown in the post
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE // the sink is described above as exactly-once
)).name("sink kafka");
checkpoint settings:
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
mysql sink:
dataStreamReduce.addSink(
    new TwoPhaseCommitSinkFunction<Tuple4<String, String, String, Long>, Connection, Void>(
            new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE) {
        int count = 0;
        Connection connection;

        @Override
        protected void invoke(Connection transaction, Tuple4<String, String, String, Long> value, Context context) throws Exception {
            // Fail after a few records to simulate a failed checkpoint.
            if (count > 10) {
                throw new Exception("compare test exception.");
            }
            PreparedStatement ps = transaction.prepareStatement(
                    " insert into test_two_step_compare(slot_time, key1, key2, key3, indicate) " +
                    " values(?, ?, ?, ?, ?) " +
                    " ON DUPLICATE KEY UPDATE indicate = indicate + values(indicate) ");
            ps.setString(1, context.timestamp().toString());
            ps.setString(2, value.f0);
            ps.setString(3, value.f1);
            ps.setString(4, value.f2); // key3 comes from f2 (the post bound f1 twice)
            ps.setLong(5, value.f3);
            ps.execute(); // assumed: the execute call is not shown in the post
            count += 1;
        }

        @Override
        protected Connection beginTransaction() throws Exception {
            LOGGER.error("compare in begin transaction");
            try {
                // A null connection (first call) also lands in the catch block and triggers a connect.
                if (connection.isClosed()) {
                    throw new Exception("mysql connection closed");
                }
            } catch (Exception e) {
                LOGGER.error("mysql connection is error: " + e.toString());
                LOGGER.error("reconnect mysql connection");
                String jdbcURI = "jdbc:mysql://";
                Connection connection = DriverManager.getConnection(jdbcURI);
                connection.setAutoCommit(false); // assumed: needed so that commit()/rollback() control the transaction
                this.connection = connection;
            }
            return this.connection;
        }

        @Override
        protected void preCommit(Connection transaction) throws Exception {
            LOGGER.error("compare in pre Commit");
        }

        @Override
        protected void commit(Connection transaction) {
            LOGGER.error("compare in commit");
            try {
                transaction.commit(); // assumed: the try body is not shown in the post
            } catch (Exception e) {
                LOGGER.error("compare Commit error: " + e.toString());
            }
        }

        @Override
        protected void abort(Connection transaction) {
            LOGGER.error("compare in abort");
            try {
                transaction.rollback(); // assumed: the try body is not shown in the post
            } catch (Exception e) {
                LOGGER.error("compare abort error." + e.toString());
            }
        }

        @Override
        protected void recoverAndCommit(Connection transaction) {
            LOGGER.error("compare in recover And Commit");
        }

        @Override
        protected void recoverAndAbort(Connection transaction) {
            LOGGER.error("compare in recover And Abort");
        }
    })
    .setParallelism(1).name("sink compare");
I'm not quite sure I understand the question correctly:
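The Kafka producer does not read any data, so I assume that your whole pipeline rereads old offsets and produces duplicates. If so, you need to understand how Flink ensures exactly-once: checkpoints store the source offsets, and on recovery Flink rewinds to the offsets of the last successful checkpoint and replays every record read since then. The sink is then responsible for making sure that the replayed records do not end up as duplicates.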
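To make the replay behaviour concrete, here is a toy model in plain Java. It is not Flink code: the class `ReplaySketch`, the `Topic` type and the `run` parameters are all hypothetical names, and the model is simplified to one source, one sink and a single failure. It only shows why records read after the last checkpoint are written twice, and why a transaction that commits on checkpoint still exposes each record once.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of checkpoint-and-replay. Hypothetical names; not Flink or Kafka API. */
public class ReplaySketch {

    /** Output topic as seen by the two kinds of readers. */
    static final class Topic {
        // What a reader of committed transactions sees.
        final List<Integer> committed = new ArrayList<>();
        // Everything ever written, including records of the aborted transaction.
        final List<Integer> everythingWritten = new ArrayList<>();
    }

    /**
     * Processes records 0..total-1 and checkpoints after every `interval` records.
     * The job fails once, right after writing record `failAt`, and restarts from
     * the offset stored in the last successful checkpoint.
     */
    static Topic run(int total, int interval, int failAt) {
        Topic topic = new Topic();
        List<Integer> openTxn = new ArrayList<>();
        int checkpointedOffset = 0;
        boolean failed = false;
        int offset = 0;
        while (offset < total) {
            int record = offset;
            topic.everythingWritten.add(record); // written immediately, not yet committed
            openTxn.add(record);
            offset++;
            if (!failed && record == failAt) {
                failed = true;
                openTxn.clear();             // the open transaction is aborted
                offset = checkpointedOffset; // the source rewinds to the last checkpoint
                continue;
            }
            if (offset % interval == 0) {
                checkpointedOffset = offset;      // checkpoint stores the source offset
                topic.committed.addAll(openTxn);  // transaction commits with the checkpoint
                openTxn.clear();
            }
        }
        topic.committed.addAll(openTxn); // simplification: commit the tail at end of input
        return topic;
    }

    public static void main(String[] args) {
        Topic t = run(10, 4, 5);
        System.out.println("all written records: " + t.everythingWritten);
        System.out.println("committed records:   " + t.committed);
    }
}
```

With `run(10, 4, 5)` the last checkpoint before the failure covers records 0 to 3, so records 4 and 5 are written twice, but only the second write belongs to a committed transaction.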
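For the last point, there are two options: emit output only once a checkpoint has completed, or let the sink deduplicate the output.
The latter option is used for the Kafka sink. It uses Kafka transactions so that replayed data is not exposed twice. To avoid duplicates on the consumer side, you need to ensure that the consumer is not reading uncommitted data (set isolation.level to read_committed), as mentioned in the documentation. Also make sure that your transaction timeout (transaction.timeout.ms) is large enough that Kafka does not abort the open transaction, and discard its data, between failure and recovery.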
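As a rough sketch of those two settings, the snippet below builds the relevant client properties with plain `java.util.Properties`. The class name `ExactlyOnceKafkaProps`, the helper names, the broker address and the group id are placeholders I made up; only the property keys (`isolation.level`, `transaction.timeout.ms`) are actual Kafka client settings.

```java
import java.util.Properties;

/** Hypothetical helper; only the property keys are real Kafka client settings. */
public class ExactlyOnceKafkaProps {

    /** Properties for whatever consumer reads the topic written by the Flink job. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupId);
        // Kafka's default is read_uncommitted, which also returns records
        // from open or aborted transactions.
        p.setProperty("isolation.level", "read_committed");
        return p;
    }

    /** Properties to hand to the exactly-once Kafka producer. */
    static Properties producerProps(String bootstrapServers, long transactionTimeoutMs) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        // Must not exceed the broker's transaction.max.timeout.ms
        // (15 minutes by default), otherwise the producer fails to initialize.
        p.setProperty("transaction.timeout.ms", Long.toString(transactionTimeoutMs));
        return p;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("localhost:9092", "downstream-reader"));
        System.out.println(producerProps("localhost:9092", 15 * 60 * 1000L));
    }
}
```

The timeout should comfortably cover the checkpoint interval plus the time the job may need to restart; if a longer value is needed, the broker-side `transaction.max.timeout.ms` has to be raised as well.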