The Spring-managed KafkaTemplate provides:
template.send(record).addCallback(...
template.executeInTransaction(...
Now let's say I have a method doWork() which is triggered on an event (say, a TCP/IP message).
@Autowired
KafkaTemplate template;
// This method is triggered on an event
void doWork(EventType event) {
    switch (event) {
        case Type1:
            template.send(record);
            break;
        case Type2:
            // Question: how do I commit all my previous sends here?
            break;
        default:
            break;
    }
}
Basically, I need to achieve a transaction by adding @Transactional over doWork() or by calling
template.executeInTransaction(...
in code. But I want to batch several template.send() calls and commit only after several calls to the doWork() method. How do I achieve that?
My producer configuration has transactions enabled, and a KafkaTransactionManager is wired to the producer factory.
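kafkaTemplate.executeInTransaction(t -> {
    boolean stayInTransaction = true;
    while (stayInTransaction) {
        Event event = readTcp();
        doWork(event);
        // Leave the loop once this event marks the end of the transaction
        stayInTransaction = !transactionDone(event);
    }
    return null; // the callback must return a value; the commit happens when it returns
});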
As long as the doWork() method uses the same template, and it runs within the scope of the callback, the work will run in the transaction.
Or
@Transactional
public void doIt() {
    boolean stayInTransaction = true;
    while (stayInTransaction) {
        Event event = readTcp();
        doWork(event);
        // Leave the loop once this event marks the end of the transaction
        stayInTransaction = !transactionDone(event);
    }
}
That is the equivalent when using declarative transactions.
If the TCP events are async, you will somehow need to hand them off to the thread running the transaction, for example by using a BlockingQueue<?>.
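Here is a minimal sketch of that hand-off in plain Java, with no Kafka involved. Everything in it is hypothetical and only for illustration: the TransactionHandOff class, the SEND and COMMIT event kinds (standing in for Type1 and Type2), and the onTcpEvent() and runOneTransaction() methods. In a real application the body of runOneTransaction() would sit inside the executeInTransaction callback or the @Transactional method, and the list would be replaced by template.send(record).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TransactionHandOff {
    // Hypothetical event kinds: SEND stands in for Type1, COMMIT for Type2.
    enum EventType { SEND, COMMIT }

    // Unbounded queue shared by the TCP callback thread and the transaction thread.
    private final BlockingQueue<EventType> events = new LinkedBlockingQueue<>();

    // Called from the async TCP callback; does not block because the queue is unbounded.
    void onTcpEvent(EventType event) {
        events.add(event);
    }

    // Runs on the thread that owns the transaction. In a real application this loop
    // would sit inside kafkaTemplate.executeInTransaction(...) or a @Transactional method.
    List<EventType> runOneTransaction() {
        List<EventType> sent = new ArrayList<>();
        try {
            while (true) {
                EventType event = events.take(); // blocks until the next event arrives
                if (event == EventType.COMMIT) {
                    return sent; // returning is the point where the commit would happen
                }
                sent.add(event); // stand-in for template.send(record)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for events", e);
        }
    }

    public static void main(String[] args) {
        TransactionHandOff handOff = new TransactionHandOff();
        // Simulated TCP thread delivering two sends followed by a commit marker.
        new Thread(() -> {
            handOff.onTcpEvent(EventType.SEND);
            handOff.onTcpEvent(EventType.SEND);
            handOff.onTcpEvent(EventType.COMMIT);
        }).start();
        System.out.println("Sent in one transaction: " + handOff.runOneTransaction().size());
    }
}
```

The point of the queue is that all sends happen on the one thread that started the transaction, while the TCP callbacks only enqueue events. Events that arrive after the commit marker simply stay in the queue for the next transaction.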