I'm trying to use Spring Batch remote partitioning to scale the job, with Apache Kafka as the middleware. Here is a brief configuration of the master step:
@Bean
public Step managerStep() {
    return managerStepBuilderFactory.get("managerStep")
            .partitioner("workerStep", filePartitioner)
            .outputChannel(requestForWorkers())
            .inputChannel(repliesFromWorkers())
            .build();
}
So I'm using channels both to send requests to the workers and to receive their replies. I know the other option is to poll the JobRepository (which works fine in my case), but I would rather not use it.
Here is part of the Kafka configuration:
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.properties.spring.json.add.type.headers=true
spring.kafka.consumer.properties.spring.json.trusted.packages=org.springframework.batch.integration.partition,org.springframework.batch.core
The master and the workers are configured, and the master can send requests to the workers through Kafka. The workers start processing and everything is fine until they try to send their replies back through Kafka.
As you can see, I'm using JsonSerializer and JsonDeserializer for sending and receiving the messages. The problem is that when Jackson tries to serialize the StepExecution, it falls into an infinite loop: the StepExecution holds a JobExecution, and the JobExecution in turn holds a List of StepExecutions:
Caused by: org.apache.kafka.common.errors.SerializationException: Can't serialize data [StepExecution: id=3001, version=6, name=workerStep:61127a319d6caf656442ff53, status=COMPLETED, exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=4, rollbackCount=0, exitDescription=] for topic [repliesFromWorkers]
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]->java.util.Collections$UnmodifiableRandomAccessList[0]->org.springframework.batch.core.StepExecution["jobExecution"]->org.springframework.batch.core.JobExecution["stepExecutions"]-....
So I thought I could customize the serialization of the StepExecution so that it ignores the list of StepExecutions in the JobExecution of the first StepExecution. But even then, it fails on the master side while deserializing that StepExecution:
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of `org.springframework.batch.core.StepExecution` (although at least one Creator exists): cannot deserialize from Object value (no delegate- or property-based Creator)
Is there any way to make this work?
I'm using Spring Boot 2.4.2 and the corresponding versions of spring-boot-starter-batch, spring-batch-integration, spring-integration-kafka and spring-kafka.
You can create a custom (de)serializer and handle the Spring Batch types manually, for example by using Java serialization for them instead of JSON. Something like this should help:
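import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.core.serializer.DefaultSerializer;
import org.springframework.core.serializer.Serializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Java serialization for raw bytes and Spring Batch types (e.g. StepExecution),
// UTF-8 for Strings, and the inherited JSON serialization for everything else.
public class KafkaStringOrByteSerializer<T> extends JsonSerializer<T> {

    // DefaultSerializer writes the object with Java serialization (ObjectOutputStream)
    private final Serializer<Object> byteSerializer = new DefaultSerializer();
    private final StringSerializer stringSerializer = new StringSerializer();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        if (needsBinarySerializer(data)) {
            return serializeBinary(data);
        }
        if (data instanceof String) {
            return stringSerializer.serialize(topic, (String) data);
        }
        // Any other payload: fall back to JsonSerializer's JSON output
        return super.serialize(topic, data);
    }

    private boolean needsBinarySerializer(Object data) {
        if (data instanceof byte[] || data instanceof Byte[] || data instanceof Byte) {
            return true;
        }
        // StepExecution, StepExecutionRequest, etc. are Serializable, and Java serialization
        // copes with the StepExecution <-> JobExecution cycle that makes Jackson recurse forever
        return data.getClass().getName().startsWith("org.springframework.batch");
    }

    private byte[] serializeBinary(Object data) {
        try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
            byteSerializer.serialize(data, output);
            return output.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Cannot convert object to bytes", e);
        }
    }
}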
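Why the binary branch avoids the StackOverflowError: Java serialization, which Spring's DefaultSerializer uses under the hood, writes each object once and encodes later occurrences as back-references, so a StepExecution -> JobExecution -> StepExecution cycle is preserved instead of being expanded forever. The following is a minimal, JDK-only sketch of that round trip; FakeJobExecution, FakeStepExecution and JavaSerializationRoundTrip are hypothetical stand-ins, not Spring Batch classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for JobExecution: holds its step executions.
class FakeJobExecution implements Serializable {
    private static final long serialVersionUID = 1L;
    final List<FakeStepExecution> stepExecutions = new ArrayList<>();
}

// Hypothetical stand-in for StepExecution: points back to its job execution.
class FakeStepExecution implements Serializable {
    private static final long serialVersionUID = 1L;
    final String stepName;
    final FakeJobExecution jobExecution;

    FakeStepExecution(String stepName, FakeJobExecution jobExecution) {
        this.stepName = stepName;
        this.jobExecution = jobExecution;
        jobExecution.stepExecutions.add(this); // closes the cycle, like the real types
    }
}

final class JavaSerializationRoundTrip {
    private JavaSerializationRoundTrip() {
    }

    static byte[] toBytes(Object value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            // Objects already written are emitted as back-references, so cycles terminate
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Round-tripping a FakeStepExecution yields a new object whose jobExecution.stepExecutions.get(0) is that same new object, so the cycle survives intact. Jackson, by default, follows the references and recurses.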
A similar approach can be taken for the deserializer.
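This is a minimal sketch of the decision such a deserializer has to make, assuming the producer uses the serializer above. Payloads written by DefaultSerializer start with the Java serialization stream magic 0xACED (ObjectStreamConstants.STREAM_MAGIC). Because 0xAC is a UTF-8 continuation byte, it can never begin valid UTF-8 text or JSON, so the first two bytes are enough to tell the formats apart. PayloadDecoder is a hypothetical, JDK-only helper. In a real Kafka Deserializer you would call it from deserialize(String topic, byte[] data) and hand non-Java payloads to JsonDeserializer rather than returning a String.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamConstants;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes either a Java-serialized payload or UTF-8 text.
final class PayloadDecoder {
    private PayloadDecoder() {
    }

    // True if the payload starts with the Java serialization magic 0xACED,
    // i.e. it was produced by ObjectOutputStream (as DefaultSerializer does).
    static boolean isJavaSerialized(byte[] data) {
        return data != null
                && data.length >= 2
                && (short) (((data[0] & 0xFF) << 8) | (data[1] & 0xFF)) == ObjectStreamConstants.STREAM_MAGIC;
    }

    static Object decode(byte[] data) {
        if (data == null) {
            return null;
        }
        if (isJavaSerialized(data)) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
                return in.readObject();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException("Payload class is not on the consumer's classpath", e);
            }
        }
        // Everything else is treated as UTF-8 text; a real deserializer would pass JSON to JsonDeserializer here
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

ObjectInputStream instantiates whatever serializable classes the stream names. If the topic is not fully trusted, consider restricting deserialization with an ObjectInputFilter (Java 9+). Both custom classes would then be registered through spring.kafka.producer.value-serializer and spring.kafka.consumer.value-deserializer, replacing the JsonSerializer/JsonDeserializer entries in the question's configuration.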