I want to consume a batch of Avro events in the Spring Boot application. Because of business requirements, I have to acknowledge the batch manually, therefore I need to have an incoming batch wrapped in the Message
and access to its headers. I created the following binding:
@Bean
public Function<Flux<Message<List<MyAvroEvent>>>, Mono<Void>> consumeAvroEvents() {
return flux -> flux
.concatMap(events -> {
Acknowledgment acknowledgment = events.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
List<MyAvroEvent> events = events.getPayload(); // this list is empty
// save event in DB
// acknowledge in case of no timeout-related exceptions
// otherwise do not acknowledge
})
.then();
}
I noticed that events.getPayload()
is empty which is incorrect. When I got rid of the type parameter and use wildcard the payload is not empty:
@Bean
public Function<Flux<Message<?>>, Mono<Void>> consumeAvroEvents() {
return flux -> flux
.concatMap(events -> {
Message<List<MyAvroEvents>> typedEvents = (Message<List<MyAvroEvents>>) events;
Acknowledgment acknowledgment = typedEvents.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
List<MyAvroEvent> events = typedEvents.getPayload(); // this list is NOT empty
// save event in DB
// acknowledge in case of no timeout-related exceptions
// otherwise do not acknowledge
})
.then();
}
I debugged the first version Function<Flux<Message<List<MyAvroEvent>>>, Mono<Void>>
and I can see that the payload is lost under SimpleFunctionRegistry.java#L1256. Is it a bug or only the wildcard parameter is allowed in consuming batches? The doc mentions only wildcard actually.
Versions:
Turns out that use-native-decoding: true
makes it work as described under
How to retrieve a batch of kafka avro messages with message headers for acknowledgement using Spring Cloud Stream
With that, I can use the type parameter:
Function<Flux<Message<List<MyAvroEvent>>>, Mono<Void>> consumeAvroEvents()