apache-kafka apache-flink flink-table-api

Can we connect to/from a Kafka compacted topic with the Flink kafka Upsert connector?


This feels obvious, but I'm asking anyway since I can't find a clear confirmation in the documentation:

The semantics of the Flink Table API upsert-kafka connector, available since Flink 1.12, match the semantics of Kafka compacted topics pretty well: it interprets the stream as a changelog and uses NULL values as tombstones to mark deletions.
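To make that changelog interpretation concrete, here is a minimal Python sketch. It does not use Flink or Kafka at all. It assumes records can be modeled as plain `(key, value)` pairs read in offset order, with `None` standing in for a tombstone. `Record` and `materialize` are hypothetical names chosen for illustration.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical model of a topic record: (key, value), where value None
# plays the role of a Kafka tombstone.
Record = Tuple[str, Optional[str]]


def materialize(records: Iterable[Record]) -> Dict[str, str]:
    """Replay records in order, interpreting them as an upsert changelog."""
    table: Dict[str, str] = {}
    for key, value in records:
        if value is None:
            # Tombstone: delete the row for this key (no-op if it is absent).
            table.pop(key, None)
        else:
            # Non-null value: insert a new row or update the existing one.
            table[key] = value
    return table


log = [("a", "1"), ("b", "2"), ("a", "3"), ("b", None), ("c", "4")]
print(materialize(log))  # {'a': '3', 'c': '4'}
```

Only the latest value per key survives, and a tombstone removes the key entirely. That is the same end state log compaction aims to preserve.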

So my assumption is that it is OK to use it to consume from and produce to a compacted topic, and that it was probably designed precisely for that. It should also work with a non-compacted topic, as long as that topic's content really is a changelog. But I'm surprised not to find any reference to compacted topics in that part of the documentation.
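Why should both compacted and non-compacted topics give the same answer? The following sketch is a simplified model, not Kafka's actual cleaner. `compact` is a hypothetical helper that keeps only the last record per key, in offset order, and can optionally drop tombstones, as Kafka eventually does after `delete.retention.ms`. Replaying the compacted log and replaying the full log produce the same upsert table.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[str, Optional[str]]  # (key, value); None = tombstone


def materialize(records: List[Record]) -> Dict[str, str]:
    """Upsert interpretation: last value per key wins, tombstones delete."""
    table: Dict[str, str] = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table


def compact(records: List[Record], drop_tombstones: bool = False) -> List[Record]:
    """Simplified compaction: keep each key's last record, preserving offset order."""
    last_offset: Dict[str, int] = {}
    for offset, (key, _) in enumerate(records):
        last_offset[key] = offset
    kept: List[Record] = []
    for offset, (key, value) in enumerate(records):
        is_latest = last_offset[key] == offset
        if is_latest and not (drop_tombstones and value is None):
            kept.append((key, value))
    return kept


log = [("a", "1"), ("b", "2"), ("a", "3"), ("b", None), ("c", "4")]
print(compact(log))  # [('a', '3'), ('b', None), ('c', '4')]
print(materialize(log) == materialize(compact(log)))  # True
```

Real compaction runs lazily in the background and never touches the active segment. A consumer of a compacted topic can therefore still see several records for the same key. The upsert interpretation handles that case the same way it handles a non-compacted changelog.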

Could somebody please confirm or refute this assumption?


Solution

  • Yes, it was made for use with compacted topics. According to FLIP-149:

    Generally speaking, the underlying topic of the upsert-kafka source must be compacted. Besides, the underlying topic must have all the data with the same key in the same partition, otherwise, the result will be wrong.
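The partitioning requirement in the quote follows from the fact that Kafka only guarantees ordering within a partition. The sketch below is a simplified model, not the connector's implementation. `merges` and `possible_results` are hypothetical helpers that enumerate every order in which a consumer could interleave two partitions while respecting each partition's internal order. When all records for a key share a partition, every interleaving yields the same table. When they are split, a stale value can win.

```python
from typing import Dict, Iterator, List, Optional, Set, Tuple

Record = Tuple[str, Optional[str]]  # (key, value); None = tombstone


def materialize(records: List[Record]) -> Dict[str, str]:
    """Upsert interpretation: last value per key wins, tombstones delete."""
    table: Dict[str, str] = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table


def merges(p: List[Record], q: List[Record]) -> Iterator[List[Record]]:
    """Yield every interleaving of p and q that keeps each list's own order."""
    if not p:
        yield list(q)
        return
    if not q:
        yield list(p)
        return
    for rest in merges(p[1:], q):
        yield [p[0]] + rest
    for rest in merges(p, q[1:]):
        yield [q[0]] + rest


def possible_results(p: List[Record], q: List[Record]) -> Set[Tuple[Tuple[str, str], ...]]:
    """Distinct tables a consumer could end up with across all interleavings."""
    return {tuple(sorted(materialize(m).items())) for m in merges(p, q)}


# Same key in the same partition: ("a", "1") is always applied before ("a", "2").
same_p0 = [("a", "1"), ("a", "2")]
same_p1 = [("b", "9")]
print(possible_results(same_p0, same_p1))  # {(('a', '2'), ('b', '9'))}

# Same key split across partitions: the older value may be applied last.
split_p0 = [("a", "1")]
split_p1 = [("a", "2")]
print(len(possible_results(split_p0, split_p1)))  # 2
```

Kafka's default partitioner hashes the record key, so records with the same non-null key normally land in the same partition. The requirement is typically broken by custom partitioners or by changing a topic's partition count, which remaps keys to different partitions.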