Search code examples
javaspring-bootjpaspring-data-jpaspring-kafka

spring jpa crud repository fetching old data from DB in multithreaded environment


i am running spring boot application with mysql database and kafka as messaging service(did transaction synchronization using chainedKafkaTransactionManager for kafka and mysql) for some asynchronous operations.

when the multiple messages are coming to kafka listener sometime the old data coming from data base instead of previously data which is commited.

I am using crud repository and also this case only happens on the multiple messages at a same time

example: updating one object PERSON which is having name and id;

1st message will get the object by id and update the name as SAM.

2nd message will get the object and updates name as REGO

3rd message will get the object then if i check the data it containes SAM as name but in database its having REGO.

I tried with adding isolation property as read commited in transaction but no luck

// listeners  

@Autowired
private PersonRepository personRepository;

@Autowired
private AddressRepository addressRepository;     

@KafkaListener(id = "update_name", topics = "update_name")
@Transactional(readOnly = false)
public void updateName(PersonModel personModel) {
    Person person = personRepository.findById(personModel.getId());   
    log.info("before -> name which is in database : " + person.getName());
    person.setName(personModel.getName());
    person = personRepository.save(person);
    log.info("after-> name which is in database : " + person.getName());
}

@KafkaListener(id = "update_name_and_address", topics = "update_name_and_address")
@Transactional(readOnly = false)
public void updateNameAndAddress(PersonModel personModel) {
    Address addr= addressRepository.findById(personModel.getAddresId);    
    addr.setPlace(personModel.getPlace());
    addressRepository.save(addr);

    updateName(personModel);
}

// repository
public interface PersonRepository extends CrudRepository<Person , Integer> {

}

I need latest data from database


Solution

  • This happens when the 3rd transaction starts before the 2nd one commits. The effect gets more obvious because JPA doesn't reload an entity once it is loaded in a session.

    In order to minimise this problem keep your transaction as short as possible. Also make sure you handle optimistic locking exceptions probably by doing a retry in a new transaction.