Search code examples
kotlinkotlin-coroutines

Correct way of locking a mutex in Kotlin


I want to implement a simple thread-safe Buffer, using Kotlin Coroutines, because coroutines are already used within the project.

The buffer will be used both in multi-thread and single-thread contexts, so having suspend fun getMostRecentData() doesn't seem very reasonable (see code below).

This is what I have so far. The fact that I have to write all that code to lock the mutex makes me wonder if I'm doing something wrong.

Anyway here's the code:

class SafeBuffer(
    private val dispatcher: CoroutineDispatcher,
    private val bufferSize: Int
    ) {

    private val buffer = LinkedList<MyDataType>()
    private val mutex = Mutex()

    val size: Int
        get() = buffer.size

   // First approach: make a suspend fun
   // Not great because I will need a runBlocking{} statement somewhere, every time I want to access the buffer
   suspend fun getMostRecentData() : MyDataType? {
        mutex.withLock {
            return if (buffer.isEmpty()) null else buffer.last
        }
    }

   // Second approach: use a runBlocking block inside the function
   // Seems like it is missing the purpose of coroutines, and I'm not 
   // sure it is actually thread safe if other context is used somehow?
   fun getMostRecentData() : MyDataType? {
        runBlocking(dispatcher) {
            mutex.withLock {
                return if (buffer.isEmpty()) null else buffer.last
            }
        }
    }

    /**** More code ****/
    (...)

}

So what's the most idiomatic/elegant way of achieving this?


Solution

  • Expanding on my comment, I think it would be idiomatic to have the buffer class only expose a suspend fun, as the consumer of the class would be responsible for figuring out how they want to use it (via runBlocking or from another coroutine). If you see this use case coming up a lot, an idiomatic approach may be to have an extension function on SafeBuffer to offer this functionality.

    Extension functions are used all over the place in the coroutines API. In your code example, even Mutex.withLock is defined as an extension function.

    class SafeBuffer(...) {
    
        private val buffer = LinkedList<MyDataType>()
        private val mutex = Mutex()
    
        suspend fun getMostRecentData() : MyDataType? =
            mutex.withLock {
                if (buffer.isEmpty()) null else buffer.last
            }
    }
    
    fun SafeBuffer.getMostRecentDataBlocking(): MyDataType? =
        runBlocking {
            getMostRecentData()
        }