We have an application the consumes messages from Kafka and process it. We are using Spring boot 2.2.6.RELEASE, and Spring cloud Hoxton.SR4.
I am trying to receive a simple message:
"payload": {
"config": {
"credentials": {}
"id": "est-00001",
"merchantKey": "test-00001",
"name": "Test",
"version": 7,
"type": "PARTNER",
"vendorNumber": "14"
"metadata": {
"timestamp": -1,
"partition": 8,
"key": "test-00001",
"offset": 105,
"topic": "configure",
"headers": []
"key": "00001",
"messageType": "dent.set",
"id": "abf75248-6fb0-4b57-a92c-74d4d3143cc0",
"time": "2018-03-16T15:56Z"
And this is the model I am using to deserialise the message
package com.commercetools.tuev.marketplace.merchant.model;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
public class MerchantMessage {
public MerchantMessage(){}
@JsonPropertyDescription("The message id, usually a random UUID")
private String id;
@JsonPropertyDescription("ISO-8501 timestamp of the event")
private String time;
@JsonPropertyDescription("The key has to always match the entity's id (product id)")
private String key;
@JsonPropertyDescription("The Message Type")
private String messageType;
private Map<String, Object> payload;
private Map<String, Object> metadata;
and all I get is the following exception:
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
@StreamListener(value = MerchantProcessor.INPUT)
public void manage(Flux<Message<MerchantMessage>> message) {
.subscribe(payload-> System.out.println("Consumed: "+payload));
If I remove the flux, everything just works fine:
@StreamListener(value = MerchantProcessor.INPUT)
public void manage(Message<MerchantMessage> message) {
.subscribe(payload -> System.out.println("Consumed: " + payload));
The @StreamListener
and annotation-based programming model has been deprecated for a while now and for the past several years we have fully migrated to a functional programming model which requires no annotations.
You can simply change your code to be
Consumer<Flux<Message<MerchantMessage>> message> {
return flux -> flux
.subscribe(payload-> System.out.println("Consumed: "+payload));
Your input binding by convention is named message-in-0
. You can get more information here as well.