Tags: java, spring, kafka-producer-api, spring-kafka

Event based commit in transaction KafkaTemplate using KafkaTransactionManager


A Spring-managed KafkaTemplate provides:

template.send(record).addCallback(...
template.executeInTransaction(...

Now let's say I have a method doWork() which is triggered on an event (say, a TCP/IP message).

@Autowired
KafkaTemplate template;

// This method is triggered on an event
void doWork(EventType event) {
    switch (event) {
        case Type1:
            template.send(record);
            break;
        case Type2:
            // Question: How do I commit all my previous sends here?
            break;
        default:
            break;
    }
}

Basically, I can get a transaction by adding @Transactional to doWork() or by calling

template.executeInTransaction(...

in code. But I want to batch several template.send() calls and commit only after several calls to the doWork() method. How do I achieve that?

My producer configuration has transactions enabled, and a KafkaTransactionManager is wired to the producer factory.


Solution

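  • kafkaTemplate.executeInTransaction(t -> {
        boolean stayInTransaction = true;
        while (stayInTransaction) {
            Event event = readTcp();
            doWork(event);
            // keep looping until an event ends the transaction
            stayInTransaction = !transactionDone(event);
        }
        return null; // the callback must return a value
    });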
    

    As long as the doWork() method uses the same template and runs within the scope of the callback, its sends are part of the transaction, which commits when the callback returns.

    Or

    @Transactional
    public void doIt() {
        boolean stayInTransaction = true;
        while (stayInTransaction) {
            Event event = readTcp();
            doWork(event);
            // keep looping until an event ends the transaction
            stayInTransaction = !transactionDone(event);
        }
    }
    

    This second form is the equivalent when using declarative transactions.

    If the TCP events arrive asynchronously, you will need to hand them off to the thread running the transaction, for example through a BlockingQueue<?>.
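
The hand-off could look roughly like the sketch below. It is plain Java with no Spring or Kafka dependency, so the send is only a stand-in. The names EventHandoff, Event, onTcpEvent and runBatch are hypothetical and not part of spring-kafka. It also assumes that one particular event marks the end of a batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical hand-off between async TCP callbacks and the transaction thread.
class EventHandoff {

    // Hypothetical event: a payload to send, or a marker that ends the batch.
    public static final class Event {
        final String payload;
        final boolean commit;

        public Event(String payload, boolean commit) {
            this.payload = payload;
            this.commit = commit;
        }
    }

    // Unbounded queue shared by the receiver thread(s) and the transaction thread.
    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();

    // Called on the TCP receiver thread; it only enqueues and never touches Kafka.
    public void onTcpEvent(Event event) {
        queue.add(event); // does not block, because the queue is unbounded
    }

    // Meant to run on the single thread that owns the transaction, e.g. inside
    // the executeInTransaction callback. Returns the payloads handled before
    // the commit event arrived.
    public List<String> runBatch() {
        List<String> handled = new ArrayList<>();
        try {
            while (true) {
                Event event = queue.take(); // blocks until the next event arrives
                if (event.commit) {
                    return handled; // returning from the callback is what commits
                }
                handled.add(event.payload); // stand-in for template.send(record)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting for an event", e);
        }
    }
}
```

In real code, the handled.add(...) line would be the template.send(record) call, and runBatch() would be invoked from inside the executeInTransaction lambda or the @Transactional method. That way every send happens on the thread that started the transaction.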