Below is the config of job in etl.xml
<batch:job id="procuerJob">
<batch:step id="Produce">
<batch:partition partitioner="partitioner">
<batch:handler grid-size="${ partitioner.limit}"></batch:handler>
<batch:step>
<batch:tasklet>
<batch:chunk reader="Reader" writer="kafkaProducer"
commit-interval="20000">
</batch:chunk>
<batch:listeners>
<batch:listener ref="producingListener" />
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:partition>
</batch:step>
</batch:job>
below is the code used to send messaged to the topic.
ListenableFuture<SendResult<String, message>> listenableFuture = kafkaTemplate.send(message);
listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, message >>() {
@Override
public void onSuccess(SendResult<String, message > result) {
log.info("marking as SUCCESS");
manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
}
@Override
public void onFailure(Throwable ex) {
log.info("marking as FAILURE");
manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
}
}
Once the kafkaTemplate.send(message)is executed , the listener is called and the job completes. I see the onSuccess(), onFailure() are called post the job is completed. How can I chnage the config of job so that listener is called after receiving the acknowledgement from kafka topic?
Would you like to some code example of what you suggested to block waiting for future. That might be of help.
I did not try the following but here is the idea:
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(message);
try {
SendResult<String, Message> sendResult = future.get();
// inspect sendResult
log.info("marking as SUCCESS");
manager.updateStatus(“someTable”, KafkaResponse.SUCCESS);
} catch (Exception e) {
log.info("marking as FAILURE");
manager.updateKafkaStatus(someTable, KafkaResponse.FAILURE);
// do something with e
}