I'm trying to create an SCDF Task to handle errors, but I can't figure out how to get the full Kafka message with payload and headers.
The idea is to route messages to a DLQ in my streams when a service is not responding. For example, some HTTP service is down and the httpclient app is failing.
When the HTTP service is back up, I would like to run a task which takes the messages in the DLQ and resends them to the proper Kafka topic, no matter what the message is.
I'm trying to make a generic task so the DLQ and target topic are Kafka consumer and producer properties.
I would also like to use the generic org.springframework.messaging.Message.
When I use KafkaItemReader<String, String> and KafkaItemWriter<String, String>, it works fine with only the payload as a String, but all headers are lost. When I use KafkaItemReader<String, Message<?>> and KafkaItemWriter<String, Message<?>> to also get the headers, I get a ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472446462+01:00 stdout F java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472450493+01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.47245463+01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472457814+01:00 stdout F at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472460712+01:00 stdout F at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472463956+01:00 stdout F at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472468765+01:00 stdout F at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
Is there a way to do this?
In fact, it seems there is no way to get message headers with KafkaItemReader and KafkaItemWriter. The serializer/deserializer are used for the key and payload, but I can't find a way to get the headers.
I solved this issue by using a Tasklet instead of KafkaItemReader and KafkaItemWriter. In my Tasklet, I use KafkaConsumer and KafkaProducer to deal with ConsumerRecord and ProducerRecord, which allow me to copy the headers.
Moreover, I can handle commits more precisely (no auto commit): consumer offsets are committed only once the messages have been sent by the producer.
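
For anyone looking for a starting point, here is a minimal sketch of the copy loop such a Tasklet could call from its execute method. It is not my exact code. The class name DlqReplayer and the method replay are hypothetical, and the sketch makes a few assumptions: keys and values are handled as byte[] (ByteArrayDeserializer / ByteArraySerializer) so any payload passes through untouched, the consumer is configured with enable.auto.commit=false and is already subscribed or assigned to the DLQ, and an empty poll is taken to mean the DLQ has been drained.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical helper: replays DLQ records to a target topic, headers included.
public class DlqReplayer {

    /**
     * Polls the DLQ until a poll comes back empty and returns the number of
     * records that were resent. Offsets are committed only after every record
     * of the current batch has been acknowledged by the producer.
     */
    public static int replay(Consumer<byte[], byte[]> consumer,
                             Producer<byte[], byte[]> producer,
                             String targetTopic,
                             Duration pollTimeout) throws Exception {
        int total = 0;
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
            if (records.isEmpty()) {
                return total; // assumption: an empty poll means the DLQ is drained
            }

            List<Future<RecordMetadata>> pending = new ArrayList<>();
            for (ConsumerRecord<byte[], byte[]> in : records) {
                // partition = null lets the producer's partitioner choose;
                // in.headers() carries every header of the original record.
                ProducerRecord<byte[], byte[]> out = new ProducerRecord<>(
                        targetTopic, null, in.key(), in.value(), in.headers());
                pending.add(producer.send(out));
            }

            producer.flush();
            for (Future<RecordMetadata> ack : pending) {
                ack.get(); // throws if a send failed, so the commit below is skipped
            }

            // Commit the consumed offsets only now that the whole batch was sent.
            consumer.commitSync();
            total += records.count();
        }
    }
}
```

If a send fails, the exception propagates before commitSync() is reached, so the next run of the task reads the same batch again. That gives at-least-once delivery, which means duplicates are possible on the target topic after a failed run.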