I'm new to Kafka and I'm using the Kafka Producer Java API.
I'm facing this issue:
Kafka: Invalid transition attempted from state COMMITTING_TRANSACTION to state ABORTING_TRANSACTION
People have written that producer.abortTransaction() should be called only when there are no transactions in flight.
Any idea how to check whether there are transactions in flight, and how to clear or stop them?
This is my code:
    try {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    logger.info("Record was not sent due to kafka issue");
                    throw new KafkaException("Record was not sent due to kafka issue");
                }
            }
        });
    } catch (KafkaException e) {
        producer.abortTransaction();
    }
What I need to achieve is to detect when Kafka is stopped and, in that case, clear all the buffers so that the records in these buffers do not appear on the consumer side when Kafka is started again.
What you would usually do in this case is use a transaction, as described in the Javadocs of KafkaProducer:
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-transactional-id");
    Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    producer.initTransactions();
    try {
        producer.beginTransaction();
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // We can't recover from these exceptions, so our only option is to close the producer and exit.
        producer.close();
    } catch (KafkaException e) {
        // For all other exceptions, just abort the transaction and try again.
        producer.abortTransaction();
    }
    producer.close();
That way, either all or none of the 100 records are visible to the consumer, provided the consumer's isolation.level is set to read_committed.
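For the consumer side, here is a minimal sketch of the matching configuration. It uses only java.util.Properties, so it runs without the Kafka client on the classpath. The class name ReadCommittedConsumerConfig and the group name my-consumer-group are made up for illustration; the property keys are the standard consumer config names, and the resulting Properties would be passed to a KafkaConsumer.

```java
import java.util.Properties;

public class ReadCommittedConsumerConfig {

    /** Builds consumer properties that hide records from aborted or still-open transactions. */
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // same broker as the producer example
        props.put("group.id", "my-consumer-group");       // hypothetical group name
        props.put("isolation.level", "read_committed");   // the default is read_uncommitted
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("isolation.level"));
    }
}
```

With the default read_uncommitted, a consumer would also see records from transactions that were later aborted, so this setting is what makes the abort effective from the consumer's point of view.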
You call producer.close() for non-recoverable exceptions such as:
ProducerFencedException: This fatal exception indicates that another producer with the same transactional.id has been started. It is only possible to have one producer instance with a transactional.id at any given time, and the latest one to be started "fences" the previous instances so that they can no longer make transactional requests. When you encounter this exception, you must close the producer instance.
OutOfOrderSequenceException: This exception indicates that the broker received an unexpected sequence number from the producer, which means that data may have been lost. If the producer is configured for idempotence only (i.e. if enable.idempotence is set and no transactional.id is configured), it is possible to continue sending with the same producer instance, but doing so risks reordering of sent records. For transactional producers, this is a fatal error and you should close the producer.
AuthorizationException: [self-explanatory]
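A note on the code in the question: send callbacks generally run on the producer's I/O thread, so a KafkaException thrown inside onCompletion does not reach the catch block around send(). One common way around this is to record the error in the callback and decide between commit and abort on the calling thread. The sketch below shows that pattern in plain Java with no Kafka classes. An ExecutorService stands in for the I/O thread, and SendCallback, fakeSend and sendAndDecide are hypothetical names used only for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackErrorDemo {

    /** Hypothetical stand-in for a send callback; not a Kafka type. */
    interface SendCallback {
        void onCompletion(Exception error);
    }

    /** Simulates a failing asynchronous send: the callback runs on a background thread. */
    static void fakeSend(ExecutorService ioThread, SendCallback callback) {
        ioThread.execute(() -> callback.onCompletion(new RuntimeException("broker unavailable")));
    }

    /** Returns true if the caller should abort, false if it could commit. */
    static boolean sendAndDecide() {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        AtomicReference<Exception> firstError = new AtomicReference<>();

        // Record the failure instead of throwing from the callback.
        fakeSend(ioThread, error -> {
            if (error != null) {
                firstError.compareAndSet(null, error);
            }
        });

        // Wait for outstanding callbacks; this stands in for flushing the producer.
        ioThread.shutdown();
        try {
            ioThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true; // treat an interrupted wait as a failure
        }

        // Decide on the calling thread, where commit or abort would be invoked.
        return firstError.get() != null;
    }

    public static void main(String[] args) {
        System.out.println(sendAndDecide() ? "abort" : "commit");
    }
}
```

With the real producer you often do not need this bookkeeping: according to the KafkaProducer Javadoc, commitTransaction() flushes unsent records and throws if any send in the transaction hit an irrecoverable error, so the catch (KafkaException e) branch in the example above is where abortTransaction() gets called.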