Tags: java, spring-boot, spring-kafka, spring-cloud-stream

Consuming batches of Avro events in reactive functional bindings: payload is lost when using a type parameter instead of a wildcard


I want to consume a batch of Avro events in a Spring Boot application. Because of business requirements, I have to acknowledge the batch manually, so I need the incoming batch wrapped in a Message to get access to its headers. I created the following binding:

@Bean
public Function<Flux<Message<List<MyAvroEvent>>>, Mono<Void>> consumeAvroEvents() {
    return flux -> flux
        .concatMap(message -> {
            Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            List<MyAvroEvent> events = message.getPayload(); // this list is empty
            // save events in DB
            // acknowledge in case of no timeout-related exceptions
            // otherwise do not acknowledge
            return Mono.empty(); // concatMap must return a Publisher
        })
        .then();
}

I noticed that getPayload() returns an empty list, which is incorrect. When I replaced the type parameter with a wildcard, the payload was no longer empty:

@Bean
public Function<Flux<Message<?>>, Mono<Void>> consumeAvroEvents() {
    return flux -> flux
        .concatMap(message -> {
            // unchecked cast: the wildcard message actually carries the batch as a list
            Message<List<MyAvroEvent>> typedMessage = (Message<List<MyAvroEvent>>) message;
            Acknowledgment acknowledgment = typedMessage.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            List<MyAvroEvent> events = typedMessage.getPayload(); // this list is NOT empty
            // save events in DB
            // acknowledge in case of no timeout-related exceptions
            // otherwise do not acknowledge
            return Mono.empty(); // concatMap must return a Publisher
        })
        .then();
}

I debugged the first version, Function<Flux<Message<List<MyAvroEvent>>>, Mono<Void>>, and I can see that the payload is lost under SimpleFunctionRegistry.java#L1256. Is this a bug, or is only the wildcard parameter allowed when consuming batches? The documentation actually mentions only the wildcard.

Versions:

  • spring-cloud-stream:4.0.2
  • spring-cloud-stream-binder-kafka:4.0.2
  • spring-cloud-function-core:4.0.2

Solution

  • It turns out that setting use-native-decoding: true makes it work, as described under "How to retrieve a batch of kafka avro messages with message headers for acknowledgement using Spring Cloud Stream".

    With that, I can use the type parameter:

    Function<Flux<Message<List<MyAvroEvent>>>, Mono<Void>> consumeAvroEvents()
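
    For reference, a minimal sketch of what the corresponding application.yml might look like. The binding name consumeAvroEvents-in-0 follows from the function name. The destination, group, schema registry URL, and the choice of the Confluent Avro deserializer are assumptions for illustration and are not taken from the original setup. With native decoding enabled, deserialization is left to the Kafka client's deserializer rather than to the framework's message converters, which is presumably why the typed payload survives.

```yaml
spring:
  cloud:
    stream:
      bindings:
        consumeAvroEvents-in-0:
          destination: my-avro-topic        # hypothetical topic name
          group: my-consumer-group          # hypothetical consumer group
          consumer:
            batch-mode: true                # deliver records as a List payload
            use-native-decoding: true       # let the Kafka deserializer build the payload
      kafka:
        bindings:
          consumeAvroEvents-in-0:
            consumer:
              ack-mode: MANUAL              # exposes the Acknowledgment header
              configuration:
                # assumption: Confluent Avro deserializer with a schema registry
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
                specific.avro.reader: true
                schema.registry.url: http://localhost:8081   # hypothetical URL
```

    If a different Avro deserializer is in use, only the entries under configuration would need to change.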