I'm trying to create a Kafka Streams app using Spring Cloud Stream, but I'm struggling to deserialize the input messages, whose values have been encoded using MessagePack.
Here's what I've got so far:
// TransactionApplication.java
@SpringBootApplication
public class TransactionApplication {

    public static void main(String[] args) {
        SpringApplication.run(TransactionApplication.class, args);
    }

    public static class TransactionConsumer {

        @Bean
        public Serde<Transaction> transactionSerde() {
            ObjectMapper mapper = new ObjectMapper(new MessagePackFactory());
            return new JsonSerde<Transaction>(mapper);
        }

        @Bean
        public Consumer<KStream<String, Transaction>> process() {
            return input -> input.foreach((key, value) -> {
                System.out.println("Key: " + key + " Value: " + value);
            });
        }
    }
}
// Transaction.java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
    String item;
    Number amount;
}
I'm getting the error:
java.lang.IllegalStateException: No type information in headers and no default type provided.
My application.yml is:
spring.cloud.stream:
  bindings:
    process-in-0:
      destination: transactions
  kafka:
    streams:
      binder:
        applicationId: transactions-application
        configuration:
          commit.interval.ms: 100
After adding spring.json.value.default.type: com.example.Transaction underneath the configuration node of my application.yml, I get a different error:
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 116, 101, 109, 34, 58, 32, 34, 112, 114, 105, 118, 97, 116, 101, 32, 106, 101, 116, 34, 44, 32, 34, 97, 109, 111, 117, 110, 116, 34, 58, 32, 53, 48, 50, 125]] from topic [transactions]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `com.jackdry.processors.json.Transaction` (although at least one Creator exists): no int/Int-argument constructor/factory method to deserialize from Number value (123)
at [Source: (byte[])"{"item": "private jet", "amount": 502}"; line: -1, column: 0]
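For what it's worth, the byte array in that exception can be decoded by hand. The sketch below uses only the JDK; the class and method names (PayloadPeek, asText, isMessagePackPositiveFixint) are made up for illustration. It prints the payload as UTF-8 text and checks whether the first byte falls in the range that MessagePack reserves for a "positive fixint" (0x00 to 0x7f). If the topic really holds plain JSON text, that might explain why a MessagePack-backed ObjectMapper reports "Number value (123)": 123 is the ASCII code for `{`.

```java
import java.nio.charset.StandardCharsets;

class PayloadPeek {
    // Bytes copied verbatim from the SerializationException message above.
    static final byte[] PAYLOAD = {
        123, 34, 105, 116, 101, 109, 34, 58, 32, 34, 112, 114, 105, 118, 97, 116,
        101, 32, 106, 101, 116, 34, 44, 32, 34, 97, 109, 111, 117, 110, 116, 34,
        58, 32, 53, 48, 50, 125
    };

    // Interpret the raw record value as UTF-8 text.
    static String asText(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    // In the MessagePack format, a leading byte with the high bit clear
    // (0x00..0x7f) encodes a small non-negative integer ("positive fixint").
    static boolean isMessagePackPositiveFixint(byte first) {
        return (first & 0x80) == 0;
    }

    public static void main(String[] args) {
        // prints {"item": "private jet", "amount": 502}
        System.out.println(asText(PAYLOAD));
        // prints true: byte 123 ('{') would be read as the integer 123
        System.out.println(isMessagePackPositiveFixint(PAYLOAD[0]));
    }
}
```

If the output looks like JSON, as it does here, it is probably worth checking how the producer actually encodes the values before debugging the serde any further.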
You need to provide a hint to the deserializer to tell it what object to create from the encoded payload.
If the record was created by the Spring JsonSerializer, the hints are in the headers.
If not, you have to provide the hint in the streams configuration.
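To make the lookup order above concrete, here is a simplified model of it in plain Java. This is not Spring's actual implementation: the class TypeHintSketch and the method resolveTargetType are hypothetical, headers are modelled as strings rather than Kafka's byte arrays, and it assumes the header takes precedence over the configured default. The header name `__TypeId__` is, as far as I know, the one Spring's JsonSerializer writes.

```java
import java.util.Map;

class TypeHintSketch {
    // Header name assumed to be the one Spring's JsonSerializer adds to each record.
    static final String TYPE_ID_HEADER = "__TypeId__";

    // Simplified model: prefer the type hint from the record headers,
    // fall back to the configured default, otherwise fail.
    static String resolveTargetType(Map<String, String> headers, String defaultType) {
        String fromHeader = headers.get(TYPE_ID_HEADER);
        if (fromHeader != null) {
            return fromHeader;
        }
        if (defaultType != null) {
            return defaultType;
        }
        throw new IllegalStateException(
                "No type information in headers and no default type provided");
    }

    public static void main(String[] args) {
        // Record produced by Spring's JsonSerializer: the hint travels in the headers.
        System.out.println(resolveTargetType(
                Map.of(TYPE_ID_HEADER, "com.example.Transaction"), null));

        // Record from some other producer: only the configured default is available.
        System.out.println(resolveTargetType(Map.of(), "com.example.Transaction"));

        // Neither hint is present: this mirrors the exception in the question.
        try {
            resolveTargetType(Map.of(), null);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real application the default would come from spring.json.value.default.type in the binder configuration, or from constructing the serde with an explicit target class.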
You need to show your application.yml/properties.