My team and I are introducing Spring Kafka into our codebase in order to get the benefits of an event-driven ecosystem.
The Spring Kafka configs are as follows:
...
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: >
        org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring:
          json.trusted.packages: "{trusted packages}"
          deserializer.value.delegate.class: >-
            org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        jaas:
          config: >
            ...
        mechanism: PLAIN
      basic:
        auth:
          ...
      schema:
        ...
      bootstrap:
        ...
...
app:
  kafka:
    event-type-one:
      topic: >-
        {topic}
      consumer-group-id: {group id}
      default-type: {default type event}
    event-type-two:
      topic: >-
        {topic}
The Kafka Consumer is configured as follows:
@KafkaListener(
    topics = ["\${app.kafka.path.topic}"],
    groupId = "\${app.kafka.path.group-id}",
    properties = ["spring.json.value.default.type=\${app.kafka.event-type-one.default-type}"]
)
fun fooBar(record: ConsumerRecord<String, Any>) = runBlocking {
Please note that we've provided a default type for our Consumer because the incoming events carry no type headers, as described in the docs [1].
The question is: with this configuration, is providing a default type the correct way to resolve the "no headers provided" deserialisation error that occurs when the Consumer tries to deserialise an event? According to the docs [1], a default type must be provided when the events produced and consumed via our @KafkaListener config carry no type headers.
Is this the right way to implement a Kafka Listener via the Spring Kafka [2] library?
Our goal is to eventually have more Consumers. Since each Consumer would have its own event type, does that mean we need to provide a default type for each one?
[1] https://docs.spring.io/spring-kafka/reference/html/#serdes-json-config
[2] https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka
If your method signature is record: ConsumerRecord<String, Any> and you really cannot set a proper expected type via that parameter, then the only way is to set the spring.json.value.default.type property. You may also need to set the spring.json.use.type.headers property to false. This way the JsonDeserializer won't look into the headers for a type at all and will fall back to the provided default type. Do you have any problem with that kind of configuration?