spring-batch, spring-integration

remote-partitioning in spring-batch with Kafka as middleware


I'm trying to use Spring Batch remote partitioning to scale the job, with Apache Kafka as the middleware. Here is a brief configuration of the manager step:

    @Bean
    public Step managerStep() {
        return managerStepBuilderFactory.get("managerStep")
                .partitioner("workerStep", filePartitioner)
                .outputChannel(requestForWorkers())
                .inputChannel(repliesFromWorkers())
                .build();
    }

So I'm using channels for both sending requests to the workers as well as receiving responses from them. I know the other option is to poll the JobRepository (which works fine in my case), but I would rather not use it.
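The question doesn't show how those two channels are connected to Kafka. For context, with the spring-integration-kafka DSL the manager-side wiring typically looks something like the sketch below; the topic names and flow bean names here are my assumptions, not taken from the question:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical wiring of the manager's channels to Kafka
// (topic names "requestForWorkers"/"repliesFromWorkers" are assumptions)
public class ManagerKafkaFlows {

    // Outbound: partition requests flow from the output channel to a Kafka topic
    @Bean
    public IntegrationFlow outboundFlow(KafkaTemplate<String, Object> kafkaTemplate) {
        return IntegrationFlows.from("requestForWorkers")
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .topic("requestForWorkers"))
                .get();
    }

    // Inbound: worker replies are consumed from Kafka into the input channel
    @Bean
    public IntegrationFlow inboundFlow(ConsumerFactory<String, Object> consumerFactory) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(consumerFactory, "repliesFromWorkers"))
                .channel("repliesFromWorkers")
                .get();
    }
}
```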

Here is also some of the Kafka configuration:

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.producer.properties.spring.json.add.type.headers=true
spring.kafka.consumer.properties.spring.json.trusted.packages=org.springframework.batch.integration.partition,org.springframework.batch.core

The manager and the workers are configured, and the manager can send requests through Kafka to the workers. The workers start processing and everything is fine until they try to send their responses back through Kafka.

As you can see, I'm using the JsonSerializer and JsonDeserializer for sending/receiving the messages. The problem is that when Jackson tries to serialize the StepExecution, it falls into an infinite loop, since the StepExecution has a JobExecution in it and the JobExecution in turn has a list of StepExecutions:

Caused by: org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=3001, version=6, name=workerStep:61127a319d6caf656442ff53, status=COMPLETED, exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=4, rollbackCount=0, exitDescription=] for topic [repliesFromWorkers]
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]-....

So I thought maybe I could customize the serialization of the StepExecution so that it ignores the list of StepExecutions in the JobExecution of the first StepExecution. But even in that case, it fails on the manager side while deserializing the StepExecution:
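The kind of customization meant here can be sketched with a Jackson mix-in that drops the back-reference. The classes below are stand-ins for the JobExecution/StepExecution cycle (not the real Spring Batch types); the mix-in does stop the recursion on the serializing side, though as noted it doesn't solve the deserialization problem:

```java
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class MixinDemo {

    // Stand-in classes mimicking the JobExecution <-> StepExecution cycle
    static class Job {
        public Long id = 1L;
        public List<Step> steps;
    }

    static class Step {
        public String name;
        public Job job;
    }

    // Mix-in telling Jackson to skip the back-reference when serializing Job
    abstract static class JobMixin {
        @JsonIgnore
        public List<Step> steps;
    }

    static String demo() throws Exception {
        Job job = new Job();
        Step step = new Step();
        step.name = "workerStep";
        step.job = job;
        job.steps = List.of(step);

        ObjectMapper mapper = new ObjectMapper();
        mapper.addMixIn(Job.class, JobMixin.class);
        // Without the mix-in this call would recurse until StackOverflowError
        return mapper.writeValueAsString(step);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```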

Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `org.springframework.batch.core.StepExecution` (although at least one Creator exists): cannot deserialize from Object value (no delegate- or property-based Creator)

Is there any way to make this work? I'm using Spring Boot 2.4.2 and its corresponding versions of spring-boot-starter-batch, spring-batch-integration, spring-integration-kafka and spring-kafka.


Solution

  • You can create a custom (de)serializer and handle it manually. Something like this will help:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.core.serializer.DefaultSerializer;
    import org.springframework.core.serializer.Serializer;
    import org.springframework.kafka.support.serializer.JsonSerializer;
    import org.springframework.messaging.converter.MessageConversionException;

    public class KafkaStringOrByteSerializer<T> extends JsonSerializer<T> {

        // Java serialization for the Spring Batch types (they are Serializable
        // but not Jackson-friendly); plain string serialization otherwise
        private final Serializer<Object> byteSerializer = new DefaultSerializer();
        private final org.apache.kafka.common.serialization.Serializer<String> stringSerializer = new StringSerializer();

        @Override
        public byte[] serialize(String topic, T data) {
            if (needsBinarySerializer(data)) {
                return this.serializeBinary(data);
            }
            return stringSerializer.serialize(topic, (String) data);
        }

        private boolean needsBinarySerializer(Object data) {
            if (data instanceof byte[] || data instanceof Byte[] || data instanceof Byte) {
                return true;
            }
            // Route all Spring Batch types (StepExecutionRequest, StepExecution, ...)
            // through Java serialization to avoid the Jackson recursion
            return data != null && data.getClass().getName().startsWith("org.springframework.batch");
        }

        private byte[] serializeBinary(Object data) {
            try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
                byteSerializer.serialize(data, output);
                return output.toByteArray();
            } catch (IOException e) {
                throw new MessageConversionException("Cannot convert object to bytes", e);
            }
        }

    }
    

    A similar approach can be taken for the deserializer.
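For completeness, here's a sketch of what that counterpart could look like. The class name is mine, and the real version would implement org.apache.kafka.common.serialization.Deserializer&lt;Object&gt;; the binary branch is shown with plain JDK deserialization (which is what Spring's DefaultDeserializer wraps) so the snippet stays self-contained. It detects Java-serialized payloads by the stream magic bytes 0xAC 0xED:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical counterpart of the serializer above; in a real setup this
// would implement org.apache.kafka.common.serialization.Deserializer<Object>
public class KafkaStringOrByteDeserializer {

    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (isJavaSerialized(data)) {
            return deserializeBinary(data);
        }
        return new String(data, StandardCharsets.UTF_8);
    }

    // Java serialization streams always start with the magic bytes 0xAC 0xED
    private boolean isJavaSerialized(byte[] data) {
        return data.length >= 2
                && (data[0] & 0xFF) == 0xAC
                && (data[1] & 0xFF) == 0xED;
    }

    // Note: only deserialize payloads from trusted producers; Java
    // deserialization of untrusted input is a known attack vector
    private Object deserializeBinary(byte[] data) {
        try (ObjectInputStream input = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return input.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Cannot convert bytes to object", e);
        }
    }
}
```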