Search code examples
javamultithreadingjava.util.concurrent

How to synchronize multiple threads from accessing some common data


I have three different threads which creates three different objects to read/manipulate some data which is common for all the threads. Now, I need to ensure that we are giving an access only to one thread at a time.

The example goes something like this.

public interface CommonData {
   public void addData(); // adds data to the cache
   public void getDataAccessKey(); // Key that will be common across different threads for each data type
}

/*
* Singleton class
*/
public class CommonDataCache() {
   private final Map dataMap = new HashMap(); // this takes keys and values as custom objects
}

The implementation class of the interface would look like this

class CommonDataImpl implements CommonData {
    private String key;
    public CommonDataImpl1(String key) {
       this.key = key;
    }

    public void addData() {
      // access the singleton cache class and add
    }

    public void getDataAccessKey() {
      return key;
    }
}

Each thread will be invoked as follows:

CommonData data = new CommonDataImpl("Key1");
new Thread(() -> data.addData()).start();

CommonData data1 = new CommonDataImpl("Key1");
new Thread(() -> data1.addData()).start();

CommonData data2 = new CommonDataImpl("Key1");
new Thread(() -> data2.addData()).start();

Now, I need to synchronize those threads if and only if the keys of the data object (passed on to the thread) is the same.

My thought process so far:

I tried to have a class that provides the lock on the fly for a given key which looks something like this.

/*
* Singleton class
*/
public class DataAccessKeyToLockProvider {
    private volatile Map<String, ReentrantLock> accessKeyToLockHolder = new ConcurrentHashMap<>();

    private DataAccessKeyToLockProvider() {
    }

    public ReentrantLock getLock(String key) {
        return accessKeyToLockHolder.putIfAbsent(key, new ReentrantLock());
    }

    public void removeLock(BSSKey key) {
        ReentrantLock removedLock = accessKeyToLockHolder.remove(key);
    }
}

So each thread would call this class and get the lock and use it and remove it once the processing is done. But this can so result in a case where the second thread could get the lock object that was inserted by the first thread and waiting for the first thread to release the lock. Once the first thread removes the lock, now the third thread would get a different lock altogether, so the 2nd thread and the 3rd thread are not in sync anymore.

Something like this:

   new Thread(() -> {
       ReentrantLock lock = DataAccessKeyToLockProvider.get(data.getDataAccessKey());
       lock.lock();
       data.addData();
       lock.unlock();
       DataAccessKeyToLockProvider.remove(data.getDataAccessKey());
    ).start();

Please let me know if you need any additional details to help me resolve my problem

P.S: Removing the key from the lock provider is kind of mandatory as i will be dealing with some millions of keys (not necessarily strings), so I don't want the lock provider to eat up my memory

Inspired the solution provided @rzwitserloot, I have tried to put some generic code that waits for the other thread to complete its processing before giving the access to the next thread.

public class GenericKeyToLockProvider<K> {
    private volatile Map<K, ReentrantLock> keyToLockHolder = new ConcurrentHashMap<>();

    public synchronized ReentrantLock getLock(K key) {
        ReentrantLock existingLock = keyToLockHolder.get(key);
        try {
            if (existingLock != null && existingLock.isLocked()) {
                existingLock.lock(); // Waits for the thread that acquired the lock previously to release it
            }
            return keyToLockHolder.put(key, new ReentrantLock()); // Override with the new lock
        } finally {
            if (existingLock != null) {
                existingLock.unlock();
            }
        }
    }
}

But looks like the entry made by the last thread wouldn't be removed. Anyway to solve this?


Solution

  • First, a clarification: You either use ReentrantLock, OR you use synchronized. You don't synchronized on a ReentrantLock instance (you synchronize on any object you want) – or, if you want to go the lock route, you can call the lock lock method on your lock object, using a try/finally guard to always ensure you call unlock later (and don't use synchronized at all).

    synchronized is low-level API. Lock, and all the other classes in the java.util.concurrent package are higher level and offer far more abstractions. It's generally a good idea to just peruse the javadoc of all the classes in the j.u.c package from time to time, very useful stuff in there.

    The key issue is to remove all references to a lock object (thus ensuring it can be garbage collected), but not until you are certain there are zero active threads locking on it. Your current approach does not know how many classes are waiting. That needs to be fixed. Once you return an instance of a Lock object, it's 'out of your hands' and it is not possible to track if the caller is ever going to call lock on it. Thus, you can't do that. Instead, call lock as part of the job; the getLock method should actually do the locking as part of the operation. That way, YOU get to control the process flow. However, let's first take a step back:

    You say you'll have millions of keys. Okay; but it is somewhat unlikely you'll have millions of threads. After all, a thread requires a stack, and even using the -Xss parameter to reduce the stack size to the minimum of 128k or so, a million threads implies you're using up 128GB of RAM just for stacks; seems unlikely.

    So, whilst you might have millions of keys, the number of 'locked' keys is MUCH smaller. Let's focus on those.

    You could make a ConcurrentHashMap which maps your string keys to lock objects. Then:

    To acquire a lock:

    Create a new lock object (literally: Object o = new Object(); - we are going to be using synchronized) and add it to the map using putIfAbsent. If you managed to create the key/value pair (compare the returned object using == to the one you made; if they are the same, you were the one to add it), you got it, go, run the code. Once you're done, acquire the sync lock on your object, send a notification, release, and remove:

    public void doWithLocking(String key, Runnable op) {
        Object locker = new Object();
        Object o = concurrentMap.putIfAbsent(key, locker);
        if (o == locker) {
            op.run();
            synchronized (locker) {
                locker.notifyAll(); // wake up everybody waiting.
                concurrentMap.remove(key); // this has to be inside!
            }
        } else {
            ...
        }
    }
    

    To wait until the lock is available, first acquire a lock on the locker object, THEN check if the concurrentMap still contains it. If not, you're now free to retry this operation. If it's still in, then we now wait for a notification. In any case we always just retry from scratch. Thus:

    public void performWithLocking(String key, Runnable op) throws InterruptedException {
        while (true) {
            Object locker = new Object();
            Object o = concurrentMap.putIfAbsent(key, locker);
            if (o == locker) {
                try {
                    op.run();
                } finally {
                    // We want to lock even if the operation throws!
                    synchronized (locker) {
                        locker.notifyAll(); // wake up everybody waiting.
                        concurrentMap.remove(key); // this has to be inside!
                    }
                }
                return;
            } else {
                synchronized (o) {
                    if (concurrentMap.containsKey(key)) o.wait();
                    }
                }
            }
        }
    }
    

    Instead of this setup where you pass the operation to execute along with the lock key, you could have tandem 'lock' and 'unlock' methods but now you run the risk of writing code that forgets to call unlock. Hence why I wouldn't advise it!

    You can call this with, for example:

    keyedLockSupportThingie.doWithLocking("mykey", () -> {
        System.out.println("Hello, from safety!");
    });