The issue is pretty much the same as https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/455 but for the Functional
approach
```java
@Bean
public Consumer<Message<Foo>> fooConsumer() {
    return message -> {
        // key from the KafkaHeaders.RECEIVED_MESSAGE_KEY header
        String key = message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, String.class);
        Foo payload = message.getPayload();
        if (payload == null) {
            deleteSomething(key);
        } else {
            addSomething(key, payload);
        }
    };
}
```
When a proper `Foo` (JSON) record is produced, `fooConsumer` is invoked. But when a `null` payload (tombstone) is produced, the consumer function is never called.
A workaround I tried, using a custom converter, does work:
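For context, a tombstone is a record with a non-null key and a `null` value; log compaction treats it as a delete marker for that key. A minimal plain-Java sketch of that semantics (no Kafka involved, just to illustrate the delete-on-null contract the consumer above wants to implement):

```java
import java.util.HashMap;
import java.util.Map;

// Simulates compacted-topic semantics: a record with a null value
// (a "tombstone") removes the entry for its key; any other record upserts.
public class TombstoneDemo {
    private final Map<String, String> store = new HashMap<>();

    // Apply one record: null value means delete, otherwise add/update.
    public void apply(String key, String value) {
        if (value == null) {
            store.remove(key);     // tombstone: delete the entry
        } else {
            store.put(key, value); // normal record: upsert
        }
    }

    public boolean contains(String key) {
        return store.containsKey(key);
    }

    public static void main(String[] args) {
        TombstoneDemo demo = new TombstoneDemo();
        demo.apply("foo-1", "{\"name\":\"foo\"}"); // upsert
        demo.apply("foo-1", null);                 // tombstone
        System.out.println(demo.contains("foo-1")); // prints "false"
    }
}
```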
```java
@Component
public class KafkaNullConverter extends MappingJackson2MessageConverter {

    @Override
    protected Object convertFromInternal(Message<?> message, @NonNull Class<?> targetClass, Object conversionHint) {
        if (message.getPayload() instanceof KafkaNull) {
            return Foo.builder().isDeleted(true).build(); // note: new `isDeleted` field added to `Foo`
        }
        return super.convertFromInternal(message, targetClass, conversionHint);
    }
}
```
The `fooConsumer` can then check `payload.isDeleted()` to handle the null case.
But this is verbose, pollutes the POJO/model classes, and has to be repeated for every consumer.
I understand that spring-integration cannot work with `null` payloads. But is there a better/standard approach to handling the tombstone use case with the functional approach?
Version: 3.0.4.RELEASE
```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>3.0.4.RELEASE</version>
</dependency>
```
This was recently fixed on the main branch: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/936 and https://github.com/spring-cloud/spring-cloud-function/issues/557
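With that fix in place, one shape the consumer could take (a sketch on my part, not verified against the fixed version) is to declare `Consumer<Message<?>>` and check for the `KafkaNull` sentinel, since the framework delivers tombstones as `KafkaNull` rather than a literal `null`; `Foo`, `deleteSomething`, and `addSomething` are carried over from the original example:

```java
@Bean
public Consumer<Message<?>> fooConsumer() {
    return message -> {
        String key = message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, String.class);
        Object payload = message.getPayload();
        if (payload instanceof KafkaNull) {
            deleteSomething(key);           // tombstone: payload is KafkaNull, not null
        } else {
            addSomething(key, (Foo) payload);
        }
    };
}
```

This keeps the tombstone handling inside the function itself, with no converter override and no extra `isDeleted` field on the model class.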