Search code examples
apache-kafkaspring-kafkaspring-cloud-stream-binder-kafkatombstonespring-cloud-function

spring cloud stream kafka - Functional approach: Consumer is never called when producing tombstone records


The issue is pretty much the same as https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/455 but for the Functional approach

    @Bean
    public Consumer<Message<Foo>> fooConsumer() {
        return message -> {
            String key = // From message header - KafkaHeaders.RECEIVED_MESSAGE_KEY
            Foo payLoad = message.getPayload();

            if (payLoad == null) {
                deleteSomething(key);
            } else {
                addSomething(key, payLoad);
            }
        };
    }

When a proper 'Foo' (json) is produced the fooConsumer is invoked. But when a 'null' payload/tombstone is produced the consumer function is never called.

Workaround I tried with Custom 'Convertor' approach which is working:

@Component
public class KafkaNullConverter extends MappingJackson2MessageConverter {

    @Override
    protected Object convertFromInternal(Message<?> message, @NonNull Class<?> targetClass, Object conversionHint) {
        if (message.getPayload() instanceof KafkaNull) {
            return Foo.builder().isDeleted(true).build(); // Note: new `isDeleted` field added to `Foo`
        }

        return super.convertFromInternal(message, targetClass, conversionHint);
    }
}

The fooConsumer then can check payload.isDeleted() to handle null case. But this is verbose, pollutes pojo/model classes and have to repeat for every consumers.

I understand that spring-integration cannot working with null. But are there any better/standard approach to handle the 'tombstone' use-case with Functional approach?

Version: 3.0.4.RELEASE

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-kafka</artifactId>
   <version>3.0.4.RELEASE</version>
</dependency>

Solution

  • This was recently fixed on the main branch https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/936 and https://github.com/spring-cloud/spring-cloud-function/issues/557