Search code examples
javaapache-kafkaapache-kafka-streamsspring-kafkaspring-cloud-stream

How to deserialize MessagePack in Spring Cloud Stream App


I'm trying to create a Kafka streams app using Spring Cloud Stream, but am struggling to deserialize the input messages, whose values have been encoded using MessagePack.

Here's what I've got so far:

// TransactionApplication.java

@SpringBootApplication
public class TransactionApplication {

  public static void main(String[] args) {
    SpringApplication.run(TransactionApplication.class, args);
  }

  public static class TransactionConsumer {

    @Bean
    public Serde<Transaction> transactionSerde() {
      ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
      return new JsonSerde<Transaction>(mapper);
    }

    @Bean
    public Consumer<KStream<String, Transaction>> process() {
      return input -> input.foreach((key, value) -> {
        System.out.println("Key: " + key + " Value: " + value);
      });
    }
  }
}
// Transaction.java

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
  String item;
  Number amount;
}

I'm getting the error:

java.lang.IllegalStateException: No type information in headers and no default type provided.

My application.yml is:

spring.cloud.stream:
  bindings:
    process-in-0:
      destination: transactions
  kafka:
    streams:
      binder:
        applicationId: transactions-application
        configuration:
          commit.interval.ms: 100

After including spring.json.value.default.type: com.example.Transaction underneath the configuration node of my applicaton.yml, I'm getting another error. See below.

Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 116, 101, 109, 34, 58, 32, 34, 112, 114, 105, 118, 97, 116, 101, 32, 106, 101, 116, 34, 44, 32, 34, 97, 109, 111, 117, 110, 116, 34, 58, 32, 53, 48, 50, 125]] from topic [transactions]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.jackdry.processors.json.Transaction` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (123)
 at [Source: (byte[])"{"item": "private jet", "amount": 502}"; line: -1, column: 0]

Solution

  • You need to provide a hint to the deserializer to tell it what object to create from the encoded payload.

    If the record was created by the Spring JsonSerializer, the hints are in the headers.

    If not, you have to provide the hint in the streams configuration.

    You need to show your application.yml/properties.