Search code examples
javaspringapache-kafkaspring-batchspring-integration

Problem Serializing Spring batch Kafka ChunkRequest


I am writing a sample Spring batch Kafka integration, with Remote Chunking. At first a master read some sample record (Item.java) in some Chunks. Then send this chunks to a (Spring Integration) channel and a Kafka producer send this Chunks to a Kafka topic. The problem is KafkaTemplate cant serialize the ChenkRequest.

if I use :

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

then this error raise:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.batch.integration.chunk.ChunkRequest to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

and if I write a custom serializer like:

public class CustomSerializer implements Serializer<ChunkRequest<Item>> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, ChunkRequest<Item> data) {
        try {
            System.out.println("Serializing...");
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Error when serializing MessageDto to byte[]");
        }
    }

    @Override
    public void close() {
    }
}

and this config:

spring.kafka.producer.value-serializer=com.example.myremotechunking.batch.CustomSerializer

this error would raise:

com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]

So, what is the problem!?


Solution

  • The org:spring-projects org.springframework.batch.integration.chunk.ChunkRequest is not JSON-compatible. Consider to write a Serializer<ChunkRequest<Item>> which would serialize using standard Java serialization feature. You may use a org.springframework.core.serializer.DefaultSerializer.serializeToByteArray() from Spring as a delegate.

    Probably you would need a similar deserialzier on the other side...