Search code examples
mongodbapache-kafkaspring-kafkaspring-cloud-streamdebezium

How to rename the id header of a debezium mongodb connector outbox message


I am trying to use the outbox event router of debezium for mongodb. The consumer is a spring cloud stream application. I cannot deserialize the message because spring cloud expects the message id header to be UUID, but it receives byte[]. I have tried different deserializers to no avail. I am thinking of renaming the id header in order to skip this spring cloud check, or remove it altogether. I have tried the ReplaceField SMT but it does not seem to modify the header fields.

Also is there a way to overcome this in spring?


Solution

  • The solution to the initial question is to use the DropHeaders SMT(https://docs.confluent.io/platform/current/connect/transforms/dropheaders.html). This will remove the id header that is populated by debezium.

    But as Oleg Zhurakousky mentioned, moving to a newer version of spring-cloud-stream without @StreamListener solves the underlying problem. Apparently @StreamListener checks if a message has an id header and it demands to be of type Uuid. By using the new functional way of working with spring-cloud-stream, the id header is actually overwritten with a new generated value. This means that the value populated by debezium (the id column form the outbox table) is ignored. I guess if you need to check for duplicate delivery, maybe it is better to create your own header instead of using the id. I do not know if spring-cloud-stream generates the same id for the same message if it is redelivered.

    Also keep in mind that even in the newer versions of spring-cloud-stream, if you use the deprecated @StreamListener, you will have the same problem.