Search code examples
javaspring-bootrabbitmqhazelcastspring-rabbit

Use ConcurrentHashMap to replace HazelCast IMap with locking


Removing HazelCast from an application that no longer needs distributed caching.
However I still need to keep the synchronized access to a Map similar to IMap.
I first wanted to replace IMap with Map and ConcurrentHashMap.

Now the collection is just a simple Java Map, but I still need to control asynchronous read/write to it.
Would a ConcurrentHashMap be enough to solve this, or do I need other locking solutions/mechanism.

The application is using RabbitMQ, with 3 Queues.
With HazelCast I had a IMap with domain objects, that i performed locking on in each MQ thread accessing it.

During Initialization of a Batch job, it creates new domain objects, and places it on the Map.
At the end of the Initialization it sends out a request on 3 different message queues for each object in the Map.

Only 1 MQ thread should be able to get an object from the Map, update it, and put it back on the Map.
Can ConcurrentHashMap handle that, I do I need to implement some locking/unlocking between the Map.get and Map.put?

With HazelCast this was done with locking on the Map.

IMap<Long, MyObject> myCollection;

public void processMessage1(final MyResponse1 reponse) { // MQ-Thread1
    final Long myObjectId = response.getObjectId();
    myCollection.lock(myObjectId);
    try {
        final MyObject myObject = myCollection.get(myObjectId);
        updateMyObject(myObject, response);
        myCollection.put(myObjectId, myObject)
        if (myObject.isCompleted()) {
            repository.save(myObject);
        }
    } finally {
        myCollection.unlock(myObjectId);
    }
}

public void processMessage2(final MyResponse2 reponse) { // MQ-Thread2
    // Similar with locking as processMessage1
}

public void processMessage3(final MyResponse3 reponse) { // MQ-Thread3
    // Similar with locking as processMessage1
}

How my code is now when changed HazelCast and IMap locking to ConcurrentHashMap.


@Data
@Component
public class MyCache {
    private final Map<Long, Producer> producers = new HashMap<>();
    private final ConcurrentMap<Long, Customer> customers = new ConcurrentHashMap<>();
}

@Service
public class CustomerService {

    private final MyCache myCache;

    private final StorageService storageService;

    public CustomerService(final MyCache myCache, final StorageService storageService) {
        this.myCache = myCache;
        this.storageService = storageService;
    }

    @Transaction
    public void startProcessingAll() {
        getProducers();
        deleteCustomers();
        myCache.getProducers().keySet().forEach(id -> startProcessingProducer(id));
    }

    private void startProcessingProducer(Long producerId) {
        log.info("Started processing for producerId={}", producerId);
        initialize(producerId);
    }

    private void initialize(Long producerId) {
        log.info("Initialize start {}", producerId);
        final Customer customer = new Customer();
        sendRabbitMq1(producerId);
        sendRabbitMq2(producerId);
        sendRabbitMq3(producerId);

        myCache.getCustomers().put(producerId, customer);
        log.info("Initialize end {}", producerId);
    }

    private void deleteCustomers() {
        storageService.deleteCustomers();
    }

}

@Service
public class MessageService {

    private final MyCache myCache;

    private final StorageService storageService;

    public MessageService(final MyCache myCache, final StorageService storageService) {
        this.myCache = myCache;
        this.storageService = storageService;
    }

    @Bean
    @Transactional
    public Consumer<MyResponse1> response1() {
        return this::processResponse1;
    }

    @Bean
    @Transactional
    public Consumer<MyResponse2> response2() {
        return this::processResponse2;
    }

    @Bean
    @Transactional
    public Consumer<MyResponse3> response3() {
        return this::processResponse3;
    }

    private void processResponse1(final MyResponse1 response) {
        myCache.getCustomers().computeIfPresent(response.getProducerId(),
            (producerId, customer) -> updateFromResponse1(response, customer));
    }

    private Customer updateFromResponse1(final MyResponse1 response, final Customer customer) {
        log.debug("Got Response 1 {}", response);
        // Updating Customer with Response
        process(customer, storageService::saveCustomer);
        return customer;
    }

    public void process(final Customer customer, final Consumer<Customer> customerUpdateCallback) {
        // Processing Customer
        customerUpdateCallback.accept(customer);
    }

}

@Service
public class StorageService {

    private final CustomerRepository customerRepository;

    public StorageService(final CustomerRepository customerRepository) {
        this.customerRepository = customerRepository;
    }

    public void saveCustomer(final Customer customer) {
        customerRepository.save(customer);
    }

    public void deleteCustomer() {
        customerRepository.deleteAll();
    }

}

Solution

  • ConcurrentHashMap can address key locking using one of the following method:

    • compute if you need to manage both adding and updating new objects to the map
    • computeIfPresent if only update is required.

    The key information is that the javadoc states:

    The entire method invocation is performed atomically. The supplied function is invoked exactly once per invocation of this method.

    If we adapt your code sample to work for update with a ConcurrentHashMap, it would give something like this:

    final Map<Long, MyObject> myCollection = new ConcurrentHashMap<>();
    
    public void processMessage1(final MyResponse1 reponse) { // MQ-Thread1
        final Long myObjectId = response.getObjectId();
        myCollection.computeIfPresent(myObjectId, (key, oldValue) -> {
            var newValue = updateMyObject(oldValue, response);
            if (newValue.isCompleted()) {
                repository.save(newValue);
            }
            return newValue;
        });
    }