spring-batch, spring-kafka, spring-cloud-dataflow

How to get the full Message with KafkaItemReader in an SCDF task?


I'm trying to create an SCDF task to handle errors, but I can't figure out how to get the full Kafka message, with both payload and headers.

The idea is to route messages to a DLQ in my streams when a service is not responding: for example, an HTTP service is down and the httpclient app fails.

Once the HTTP service is back up, I would like to run a task that takes the messages from the DLQ and resends them to the proper Kafka topic, whatever the message is.

I'm trying to make the task generic, so that the DLQ and the target topic are just Kafka consumer and producer properties. I would also like to use the generic org.springframework.messaging.Message type.
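One way to keep such a task independent of the payload, sketched here with plain kafka-clients configuration, is to use byte-array (de)serializers for both key and value, so the task never needs to know the message type. The class and method names (ReplayClientConfig, consumerProps, producerProps) are hypothetical, and the DLQ and target topic names are assumed to be passed separately, for example as task properties.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// Hypothetical helper: builds payload-agnostic client settings for a DLQ replay task.
public final class ReplayClientConfig {

    private ReplayClientConfig() {}

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // Raw bytes in: the task never deserializes key or payload.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        // Offsets are committed manually, only after the resend succeeded.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // A new consumer group starts at the oldest DLQ message instead of the end.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Raw bytes out: the payload is forwarded exactly as it was read.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        // Wait for all in-sync replicas before a send counts as successful.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }
}
```

The `auto.offset.reset=earliest` setting matters here: with Kafka's default (`latest`), a consumer group with no committed offsets would start at the end of the DLQ and skip the messages already waiting in it.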

With KafkaItemReader<String, String> and KafkaItemWriter<String, String> it works fine, but only the String payload is transferred and all headers are lost. When I switch to KafkaItemReader<String, Message<?>> and KafkaItemWriter<String, Message<?>> to get the headers as well, I get a ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message

2020-11-13T14:27:03.472446462+01:00 stdout F java.lang.ClassCastException: java.lang.String cannot be cast to org.springframework.messaging.Message
2020-11-13T14:27:03.472450493+01:00 stdout F    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.47245463+01:00 stdout F     at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472457814+01:00 stdout F    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472460712+01:00 stdout F    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472463956+01:00 stdout F    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
2020-11-13T14:27:03.472468765+01:00 stdout F    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.4.RELEASE.jar:4.2.4.RELEASE]
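A plausible reading of the exception (hedged, since the question does not show the reader configuration): KafkaItemReader appears to hand each record's value to the step as the item, and that value is whatever the configured value deserializer produced, here a String. Declaring the reader as KafkaItemReader<String, Message<?>> does not change that, because generic type parameters are erased at runtime; the mismatch only surfaces once the item is used as a Message, which fits the SimpleChunkProcessor.doProcess frame at the top of the trace. The stdlib-only sketch below reproduces the mechanism; Message, deserializeValue and read are hypothetical stand-ins, not the Spring or Kafka classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Stdlib-only illustration of erasure hiding a type mismatch until the item is used.
public class ErasureDemo {

    // Hypothetical stand-in for org.springframework.messaging.Message.
    interface Message<T> {
        T getPayload();
        Map<String, Object> getHeaders();
    }

    // Stand-in for a value deserializer configured as a StringDeserializer:
    // it returns a String no matter which item type the reader is declared with.
    static Object deserializeValue(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Stand-in for a generic reader returning the record value as V.
    // The cast is unchecked and erased, so it cannot fail inside this method.
    @SuppressWarnings("unchecked")
    static <V> V read(byte[] bytes) {
        return (V) deserializeValue(bytes);
    }

    // Mimics the processing step: the compiler inserts a cast to Message at the
    // assignment below, and that is where the ClassCastException is thrown.
    static String processAsMessage(byte[] bytes) {
        try {
            Message<?> item = read(bytes);
            return "payload: " + item.getPayload();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        String asString = read(raw);                // fine: the value really is a String
        System.out.println(asString);               // prints "hello"
        System.out.println(processAsMessage(raw));  // prints "ClassCastException"
    }
}
```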

Is there a way to do this?


Solution

  • It seems there is in fact no way to get message headers with KafkaItemReader and KafkaItemWriter: the serializers/deserializers only handle the key and the payload, and I couldn't find a way to access the headers.

    I solved the issue by using a Tasklet instead of KafkaItemReader and KafkaItemWriter. In the Tasklet, I use a KafkaConsumer and a KafkaProducer to work directly with ConsumerRecord and ProducerRecord, which lets me copy the headers.

    Moreover, I can handle commits properly (no auto-commit): consumer offsets are committed only once the messages have been successfully sent by the producer.
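A minimal sketch of the Tasklet approach described above, assuming a Consumer/Producer pair configured with byte[] (de)serializers and `enable.auto.commit=false`. DlqReplayTasklet, toTarget and nextOffsets are hypothetical names, not the answer author's code. Each `execute` call polls one batch from the DLQ, re-sends every record with its key, value and headers (via the ProducerRecord constructor that takes an `Iterable<Header>`), waits for all acknowledgements, and only then commits the offsets with `commitSync`.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

// Hypothetical tasklet: replays DLQ records to a target topic, keeping headers.
// Assumes byte[] (de)serializers and enable.auto.commit=false on the consumer.
public class DlqReplayTasklet implements Tasklet {

    private final Consumer<byte[], byte[]> consumer;
    private final Producer<byte[], byte[]> producer;
    private final String dlqTopic;
    private final String targetTopic;
    private final Duration pollTimeout;
    private boolean subscribed;

    public DlqReplayTasklet(Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer,
                            String dlqTopic, String targetTopic, Duration pollTimeout) {
        this.consumer = consumer;
        this.producer = producer;
        this.dlqTopic = dlqTopic;
        this.targetTopic = targetTopic;
        this.pollTimeout = pollTimeout;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        if (!subscribed) {
            consumer.subscribe(Collections.singletonList(dlqTopic));
            subscribed = true;
        }
        ConsumerRecords<byte[], byte[]> records = consumer.poll(pollTimeout);
        if (records.isEmpty()) {
            // Simplification: an empty poll is taken to mean the DLQ is drained.
            return RepeatStatus.FINISHED;
        }
        List<Future<RecordMetadata>> sends = new ArrayList<>();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            sends.add(producer.send(toTarget(record, targetTopic)));
        }
        // Wait for every acknowledgement; a failed send throws here, the step
        // fails, and the offsets of this batch are not committed.
        for (Future<RecordMetadata> send : sends) {
            send.get();
        }
        consumer.commitSync(nextOffsets(records));
        return RepeatStatus.CONTINUABLE;
    }

    // Same key, value and headers; partition left null so the producer's partitioner decides.
    static ProducerRecord<byte[], byte[]> toTarget(ConsumerRecord<byte[], byte[]> record, String topic) {
        return new ProducerRecord<>(topic, null, record.key(), record.value(), record.headers());
    }

    // Kafka's committed offset is the offset of the next record to consume,
    // hence the last processed offset + 1 for each partition in the batch.
    static Map<TopicPartition, OffsetAndMetadata> nextOffsets(ConsumerRecords<byte[], byte[]> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(partition);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }
        return offsets;
    }
}
```

Returning `CONTINUABLE` makes Spring Batch call `execute` again for the next batch, so each batch runs in its own step transaction. Because offsets are committed after the sends, a crash between the two can re-send a batch, so the target topic may see duplicates (at-least-once delivery). Treating an empty poll as "drained" is a simplification; a generous poll timeout gives the consumer time to join its group before the first poll. Closing the consumer and producer is left to whatever creates them.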