My Kafka Producer keeps sending to Kafka Broker despite transaction failing. I have a custom listener i.e. I am not using the @KafkaListener annotation. This is running on Spring-kafka 2.2.x
Any ideas why the message ends up in Kafka despite KafkaTransactionManager rolling back? Here is my setup below:
// Kafka producer sender
@Transactional(transactionManager = "kafkaTransactionManager", propagation = Propagation.REQUIRED)
public void sendToKafkaWithTransaction(final String topic, final Object payload){
ProducerRecord<String, Object> record = new ProducerRecord(topic, key, payload);
template.executeInTransaction(kt -> kt.send(record));
}
// RabbitMQ producer sender
@Transactional(transactionManager = "rabbitTransactionManager", propagation = Propagation.REQUIRED)
public void sendToRabbitmqWithTransaction(final String topic, final String header, final Object payload){
template.convertAndSend(topic, header, payload);
}
// Chained Transaction Manager
@Bean(name = "chainedKafkaTransactionManager")
public ChainedKafkaTransactionManager<Object, Object> chainedKafkaTransactionManager(
@Qualifier(value = "transactionalKafkaProducer") ProducerFactory<String, Object> producerFactory,
@Qualifier(value = "transactionManager") JpaTransactionManager jpaTransactionManager,
@Qualifier(value = "rabbitTransactionManager") RabbitTransactionManager rabbitTransactionManager) {
KafkaTransactionManager producerKtm = new KafkaTransactionManager(producerFactory);
producerKtm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return new ChainedKafkaTransactionManager<>(jpaTransactionManager, producerKtm, rabbitTransactionManager);
}
// Listener config
listenerFactory.getContainerProperties().setTransactionManager(chainedKafkaTransactionManager);
// Listener
@Transactional(transactionManager = "chainedKafkaTransactionManager")
public void onMessage(final ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer){
try {
RetryState retryState = new DefaultRetryState(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());
retryTemplate.execute(context -> {
saveToDb() // This rolls back
sendToKafkaWithTransaction(topic, payload); // This message gets to Kafa, it should not.
sendToRabbitmqWithTransaction(topic, payload); // This rolls back
throw new Exception("Out of Anger");
return null;
}, recoveryCallBack, retryState);
acknowledgment.acknowledge();
}
catch (ListenerExecutionFailedException e) {
throw e;
}
}
// See logs
[ consumer-0-C-1] o.s.a.r.t.RabbitTransactionManager : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Setting JPA transaction on EntityManager [SessionImpl(104745239<open>)] rollback-only
EDIT: Adding spring boot config:
spring.kafka:
admin:
bootstrap-servers: ${kakfa.host}
consumer:
group-id: test-consumers
client-id: test-consumers
auto-offset-reset: latest
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
properties:
isolation-level: read_committed
producer:
client-id: test-producer
acks: all
retries: 3
transaction-id-prefix: test-producer-tx-
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
enable.idempotence: true
transactional.id: tran-id-1-
max.in.flight.requests.per.connection: 5
isolation-level: read_committed
Edit More Logs
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.saveAndFlush]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$DefaultCrudMethodMetadata@18061927] for key [public abstract java.lang.Object org.springframework.data.jpa.repository.JpaRepository.saveAndFlush(java.lang.Object)] from thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=topic-1, partition=null)
[-27cf188e6c23-1] org.apache.kafka.clients.Metadata : Cluster ID: r3baK471R6mIft7L_DIOIg
[ consumer-0-C-1] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=topic-1, partition=null)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.orm.jpa.EntityManagerHolder@16bfeffa] for key [org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean@30eed725] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Found thread-bound EntityManager [SessionImpl(23309560<open>)] for JPA transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.jdbc.datasource.ConnectionHolder@cbfb10d] for key [HikariDataSource (HikariPool-1)] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Participating in existing transaction
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [com.arca.framework.messaging.services.impl.BoradcastMessageServiceImpl.sendTransactional]
[-27cf188e6c23-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=topic-1, partition=null), metadata: topic-1-0@185
[ consumer-0-C-1] o.s.a.r.c.CachingConnectionFactory : Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,4)
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate : Executing callback RabbitTemplate$$Lambda$1237/634386320 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://[email protected]:5672/, localPort= 64338]
[ consumer-0-C-1] o.s.amqp.rabbit.core.RabbitTemplate : Publishing message (Body:'{ }')
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] bound to thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [messaging.services.impl.RabbitMessageServiceImpl.send]
[ consumer-0-C-1] c.a.f.m.k.r.KafkaSingleDispatchReceiver : Unable to process messages of type: [class messaging.kafka.events.acquiringtmstransaction.TmsTransactionEvent] and id: [92dccb48-2cd2-47b8-b778-8550dcd72d04]
[ consumer-0-C-1] .a.f.m.k.c.KafkaTransactionalRetryPolicy : Retry count [1] for message [{}]
[ consumer-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [messaging.kafka.receivers.KafkaReceiver.onMessage] after exception: exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute : Applying rules to determine whether transaction should rollback on exceptions.MyException: Out of anger
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute : Winning rollback rule is: null
[ consumer-0-C-1] o.s.t.i.RuleBasedTransactionAttribute : No relevant rollback rule found: applying default rules
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager : Triggering beforeCompletion synchronization
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@2655e199] for key [CachingConnectionFactory [channelCacheSize=25, host=localhost, port=5672, active=true rabbitConnectionFactory]] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager : Initiating transaction rollback
[ consumer-0-C-1] o.s.k.core.DefaultKafkaProducerFactory : abortTransaction: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@b70782d, txId=tran-id-1-acquiring-tms-transaction-consumers.pos_txn_log.0]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Clearing transaction synchronization
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager : Triggering afterCompletion synchronization
[ consumer-0-C-1] o.s.a.r.connection.RabbitResourceHolder : Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,4), conn: Proxy@3c964873 Shared Rabbit Connection: SimpleConnection@5d9ccad2 [delegate=amqp://[email protected]:5672/, localPort= 64338]
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@23b172f4] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@4df8f4e9] from thread [consumer-0-C-1]
[ consumer-0-C-1] o.s.k.t.KafkaTransactionManager : Resuming suspended transaction after completion of inner transaction
[ consumer-0-C-1] .s.t.s.TransactionSynchronizationManager : Initializing transaction synchronization
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Participating transaction failed - marking existing transaction as rollback-only
[ consumer-0-C-1] o.s.orm.jpa.JpaTransactionManager : Setting JPA transaction on EntityManager [SessionImpl(23309560<open>)] rollback-only
[ consumer-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
That's the way Kafka transactions work. Published records are always written to the log, followed by a marker record that indicates whether the transaction committed, or rolled back.
To avoid seeing the rolled-back records, you have to set the consumer isolation.level
property to read_committed
(it is read_uncommitted
by default).
EDIT
It's because you are starting a new transaction:
template.executeInTransaction(kt -> kt.send(record));
/**
* Execute some arbitrary operation(s) on the operations and return the result.
* The operations are invoked within a local transaction and do not participate
* in a global transaction (if present).
* @param callback the callback.
* @param <T> the result type.
* @return the result.
* @since 1.1
*/
@Nullable
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
Just call template.send()
and the template will participate in the transaction started by the container.
You can also remove the @Transactional
from that method.
EDIT2
This works as expected for me...
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.isolation-level=read-committed
logging.level.org.springframework.transaction=trace
logging.level.org.springframework.kafka.core=trace
@SpringBootApplication
@EnableTransactionManagement
public class So66306109Application {
public static void main(String[] args) {
SpringApplication.run(So66306109Application.class, args);
}
@Autowired
Foo foo;
@Transactional
@KafkaListener(id = "so66306109", topics = "so66306109") // Not really needed; the container has already started it
public void listen(String in) {
System.out.println(in);
this.foo.send(in.toUpperCase());
throw new RuntimeException("test");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
}
@KafkaListener(id = "so66306109-2", topics = "so66306109-2")
public void listen2(String in) {
System.out.println(in);
}
}
@Component
class Foo {
@Autowired
KafkaTemplate<String, String> template;
@Transactional // Not needed - we're already in a transaction
void send(String in) {
this.template.send("so66306109-2", in);
}
}
EDIT3
If you cannot upgrade to a supported version, you need to disable transactions in the container, and manage it yourself in your code, within the retry execute scope.
Here is an example.
@SpringBootApplication
@EnableTransactionManagement
public class So66306109Application {
public static void main(String[] args) {
SpringApplication.run(So66306109Application.class, args);
}
@Autowired
Foo foo;
@Autowired
RetryTemplate template;
@KafkaListener(id = "so66306109", topics = "so66306109") // Not really needed; the container has already started it
public void listen(ConsumerRecord<String, String> in) {
this.template.execute(context -> {
System.out.println(in);
this.foo.send(in);
return null;
}, context -> {
System.out.println("RETRIES EXHAUSTED");
return null;
});
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so66306109").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("so66306109-2").partitions(1).replicas(1).build();
}
@KafkaListener(id = "so66306109-2", topics = "so66306109-2")
public void listen2(String in) {
System.out.println(in);
}
@Bean
ChainedKafkaTransactionManager<String, String> chainedTm(KafkaTransactionManager<String, String> ktm,
ConcurrentKafkaListenerContainerFactory<?, ?> factory) {
// transactions can't be started by the container
factory.getContainerProperties().setTransactionManager(null);
return new ChainedKafkaTransactionManager<>(ktm);
}
@Bean
public RetryTemplate template() {
return new RetryTemplate();
}
}
@Component
class Foo {
@Autowired
KafkaTemplate<String, String> template;
@Autowired
ProducerFactory<String, String> pf;
@Transactional("chainedTm")
public void send(ConsumerRecord<String,String> in) {
// updateDB
this.template.send(new ProducerRecord<String, String>("so66306109-2", null, null, in.value().toUpperCase()));
this.template.sendOffsetsToTransaction(Collections.singletonMap(new TopicPartition(in.topic(), in.partition()),
new OffsetAndMetadata(in.offset() + 1)));
// simulate a DB rollback
KafkaResourceHolder<String, String> resource = (KafkaResourceHolder<String, String>) TransactionSynchronizationManager
.getResource(this.pf);
resource.setRollbackOnly();
}
}
Note; you must NOT manually acknowledge such records; instead, send the offset to the transaction before it is committed.