
flink: Interrupted while waiting for data to be acknowledged by pipeline

I was doing a POC of Flink CDC + Iceberg. I followed this Debezium tutorial to send CDC events to Kafka. My Flink job was working fine and writing data to the Hive table for inserts, but when I fired an update/delete query at the MySQL table, I started getting the error below in my Flink job. I have also attached the output of the retract stream.

Update query - UPDATE customers SET first_name='Anne Marie' WHERE id=1004;

1> (true,1001,Sally,Thomas,
1> (true,1002,George,Bailey,
1> (true,1003,Edward,Walker,
1> (true,1004,Anne,Kretchmar,
1> (true,1005,Sarah,Thompson,
1> (false,1004,Anne,Kretchmar,
1> (true,1004,Anne Marie,Kretchmar,
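To make the retract output above concrete, here is a small plain-Java replay of the three `1004` rows. It does not use Flink, and `RetractReplay`, `Change` and the row strings are hypothetical names for illustration only. If the flag is honoured, the `false` element cancels the earlier `true` element, so only the new image survives. If the flag is dropped, as the `x -> x.f1` map in my code does, all three rows come through as plain rows.

```java
import java.util.ArrayList;
import java.util.List;

public class RetractReplay {
    // Hypothetical model of one retract-stream element: add=true inserts the row, add=false retracts it.
    public static final class Change {
        public final boolean add;
        public final String row;

        public Change(boolean add, String row) {
            this.add = add;
            this.row = row;
        }
    }

    // Honours the flag: a retraction removes one earlier identical row (multiset semantics).
    public static List<String> applyWithFlag(List<Change> changes) {
        List<String> table = new ArrayList<>();
        for (Change c : changes) {
            if (c.add) {
                table.add(c.row);
            } else {
                table.remove(c.row); // List.remove(Object) drops the first equal element
            }
        }
        return table;
    }

    // Mimics map(x -> x.f1): the flag is discarded, so every element is kept as a row.
    public static List<String> applyIgnoringFlag(List<Change> changes) {
        List<String> rows = new ArrayList<>();
        for (Change c : changes) {
            rows.add(c.row);
        }
        return rows;
    }

    // The 1004 elements from the retract output above (email column omitted).
    public static List<Change> sample() {
        List<Change> changes = new ArrayList<>();
        changes.add(new Change(true, "1004,Anne,Kretchmar"));
        changes.add(new Change(false, "1004,Anne,Kretchmar"));
        changes.add(new Change(true, "1004,Anne Marie,Kretchmar"));
        return changes;
    }

    public static void main(String[] args) {
        System.out.println(applyWithFlag(sample()));            // [1004,Anne Marie,Kretchmar]
        System.out.println(applyIgnoringFlag(sample()).size()); // 3
    }
}
```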

Error stack trace

15:27:42.163 [Source: TableSourceScan(table=[[default_catalog, default_database, topic_customers]], fields=[id, first_name, last_name, email]) -> SinkConversionToTuple2 -> (Map -> Map -> IcebergStreamWriter, Sink: Print to Std. Out) (3/4)] ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator. Interrupted while waiting for data to be acknowledged by pipeline
    at org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno( ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal( ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl( ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.hdfs.DFSOutputStream.close( ~[hadoop-hdfs-client-2.10.1.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close( ~[hadoop-common-2.10.1.jar:?]
    at org.apache.hadoop.fs.FSDataOutputStream.close( ~[hadoop-common-2.10.1.jar:?]
    at ~[iceberg-flink-runtime-0.11.0.jar:?]

Here's my code; topic_customers is a Kafka dynamic table that listens to the CDC events:

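Table out = tEnv.sqlQuery("select * from topic_customers");
DataStream<Tuple2<Boolean, Row>> dsRow = tEnv.toRetractStream(out, Row.class);
// Keeps only f1 (the Row); the f0 retract flag is dropped at this point
DataStream<Row> dsRow2 = dsRow.map((MapFunction<Tuple2<Boolean, Row>, Row>) x -> x.f1);
// catalogLoader and tableIdentifier are defined elsewhere in the job
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, tableIdentifier);
TableSchema tableSchema = TableSchema.builder()
        .field("id", DataTypes.BIGINT())
        .field("first_name", DataTypes.STRING())
        .field("last_name", DataTypes.STRING())
        .field("email", DataTypes.STRING())
        .build();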
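If the sink is supposed to see deletes, the retract flag has to survive the conversion rather than being thrown away by `x -> x.f1`. In Flink 1.11 and later, `Row` carries an `org.apache.flink.types.RowKind` (see `Row#setKind`); whether a given Iceberg sink version acts on that kind is something to check against its own docs. The sketch below models only the mapping step in plain Java, with a stand-in `Kind` enum and a hypothetical `KindedRow` class, so it runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

public class FlagToKind {
    // Stand-in for Flink's RowKind, limited to the two kinds a retract stream produces.
    public enum Kind { INSERT, DELETE }

    // Hypothetical row holder: the payload plus the change kind it carries.
    public static final class KindedRow {
        public final Kind kind;
        public final String payload;

        public KindedRow(Kind kind, String payload) {
            this.kind = kind;
            this.payload = payload;
        }

        @Override
        public String toString() {
            // "+I" / "-D" mirror the short strings Flink uses for INSERT / DELETE
            return (kind == Kind.INSERT ? "+I" : "-D") + "(" + payload + ")";
        }
    }

    // Keeps the retract flag instead of discarding it: true -> INSERT, false -> DELETE.
    public static KindedRow toKinded(boolean flag, String payload) {
        return new KindedRow(flag ? Kind.INSERT : Kind.DELETE, payload);
    }

    public static void main(String[] args) {
        List<KindedRow> out = new ArrayList<>();
        out.add(toKinded(false, "1004,Anne,Kretchmar"));
        out.add(toKinded(true, "1004,Anne Marie,Kretchmar"));
        System.out.println(out); // [-D(1004,Anne,Kretchmar), +I(1004,Anne Marie,Kretchmar)]
    }
}
```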


  • I fixed the issue by moving to the Iceberg v2 spec (format version 2), which adds row-level deletes. You can refer to this PR:
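For context on why v2 matters here: format version 2 adds row-level delete files, so an update's retraction can be recorded as a delete instead of an extra row. Below is a toy model in plain Java (not the Iceberg API; `EqualityDeleteToy`, `DataRow` and `EqDelete` are hypothetical names) of equality deletes applied at read time. It follows the v2 spec rule that an equality delete applies to data rows with a matching key whose sequence number is strictly lower than the delete's, which is why the new image written in the same commit as the delete survives.

```java
import java.util.ArrayList;
import java.util.List;

public class EqualityDeleteToy {
    // A data row written in commit `seq`, identified by its key column (id).
    public static final class DataRow {
        public final long seq;
        public final long id;
        public final String value;

        public DataRow(long seq, long id, String value) {
            this.seq = seq;
            this.id = id;
            this.value = value;
        }
    }

    // An equality delete on id, written in commit `seq`.
    public static final class EqDelete {
        public final long seq;
        public final long id;

        public EqDelete(long seq, long id) {
            this.seq = seq;
            this.id = id;
        }
    }

    // Merge-on-read: a row survives unless a delete with the same id has a strictly higher sequence number.
    public static List<String> read(List<DataRow> rows, List<EqDelete> deletes) {
        List<String> live = new ArrayList<>();
        for (DataRow r : rows) {
            boolean deleted = false;
            for (EqDelete d : deletes) {
                if (d.id == r.id && d.seq > r.seq) {
                    deleted = true;
                    break;
                }
            }
            if (!deleted) {
                live.add(r.id + "," + r.value);
            }
        }
        return live;
    }

    public static void main(String[] args) {
        List<DataRow> rows = new ArrayList<>();
        rows.add(new DataRow(1, 1004, "Anne"));       // initial insert
        rows.add(new DataRow(2, 1004, "Anne Marie")); // update's new image
        List<EqDelete> deletes = new ArrayList<>();
        deletes.add(new EqDelete(2, 1004));           // update's retraction of the old image
        System.out.println(read(rows, deletes));      // [1004,Anne Marie]
    }
}
```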