Tags: kotlin, concurrency, kotlin-coroutines, memoization

Kotlin and coroutines: how to handle concurrent identical requests efficiently?


My Kotlin socket server application performs a long-running operation (I/O and so on) for each request it receives, with each request handled in its own dedicated coroutine.

While request (A) is being processed, every new request identical to request (A) should return the same response as request (A). Requests that arrive after request (A) has finished may not have the same response, so they need to be processed again in the same way as request (A).

Ideally, requests that are identical to and concurrent with request (A) should "wait" for the response to request (A) and return that same response.

How can I do that efficiently using Kotlin features? (Example code would be welcome.)


Solution

  • We can store the Deferreds of actively running operations and reuse them when the same request arrives concurrently. We need some kind of key for each request to identify identical requests, and we have to synchronize access to the map of Deferreds.

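    // Imports for this snippet and the usage example that follows
    import java.time.Instant
    import kotlinx.coroutines.*
    import kotlinx.coroutines.sync.Mutex
    import kotlinx.coroutines.sync.withLock

    suspend fun handleRequest(req: String): String {
        // TODO: acquire a key to detect same requests
        val key = req
        return requestsLock.withLock {
            // Reuse the in-flight operation for this key, or start a new one
            requests.getOrPut(key) { scope.async {
                try {
                    // TODO: process request and return response
                    val resp = Instant.now().toString()
                    delay(2000) // simulate processing
                    resp
                } finally {
                    // Remove the entry even if processing fails,
                    // so later requests are processed again
                    requestsLock.withLock {
                        requests.remove(key)
                    }
                }
            } }
        }.await()
    }
    
    // In-flight operations by key; always accessed under requestsLock
    private val requests = mutableMapOf<String, Deferred<String>>()
    private val requestsLock = Mutex()
    // SupervisorJob keeps one failed request from cancelling the whole scope
    private val scope = CoroutineScope(SupervisorJob())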
    

    We use it like this:

    fun main() = runBlocking {
        repeat(10) { reqId ->
            val req = listOf("user1", "user2")[reqId % 2]
            launch {
                println("${Instant.now()}: sending request (#$reqId, $req)")
                val resp = handleRequest(req)
                println("${Instant.now()}: received response for request (#$reqId: $req): $resp")
            }
            delay(500)
        }
    }
    

    It sends a request every 500ms, alternating between user1 and user2. Each request takes 2000ms to process, so 2-3 requests of the same type are in flight concurrently. Sample output:

    2022-12-25T21:12:39.152071990Z: sending request (#0, user1)
    2022-12-25T21:12:39.649624457Z: sending request (#1, user2)
    2022-12-25T21:12:40.149865335Z: sending request (#2, user1)
    2022-12-25T21:12:40.650111474Z: sending request (#3, user2)
    2022-12-25T21:12:41.150518845Z: sending request (#4, user1)
    2022-12-25T21:12:41.200742398Z: received response for request (#0: user1): 2022-12-25T21:12:39.195417100Z
    2022-12-25T21:12:41.247125047Z: received response for request (#2: user1): 2022-12-25T21:12:39.195417100Z
    2022-12-25T21:12:41.247870116Z: received response for request (#4: user1): 2022-12-25T21:12:39.195417100Z
    2022-12-25T21:12:41.651080194Z: sending request (#5, user2)
    2022-12-25T21:12:41.651881431Z: received response for request (#1: user2): 2022-12-25T21:12:39.650267038Z
    2022-12-25T21:12:41.652313847Z: received response for request (#3: user2): 2022-12-25T21:12:39.650267038Z
    2022-12-25T21:12:42.151736711Z: sending request (#6, user1)
    2022-12-25T21:12:42.652327268Z: sending request (#7, user2)
    2022-12-25T21:12:43.153079734Z: sending request (#8, user1)
    2022-12-25T21:12:43.653804184Z: sending request (#9, user2)
    2022-12-25T21:12:43.654893480Z: received response for request (#5: user2): 2022-12-25T21:12:41.653631205Z
    2022-12-25T21:12:43.655384624Z: received response for request (#7: user2): 2022-12-25T21:12:41.653631205Z
    2022-12-25T21:12:43.655744177Z: received response for request (#9: user2): 2022-12-25T21:12:41.653631205Z
    2022-12-25T21:12:44.153250527Z: received response for request (#6: user1): 2022-12-25T21:12:42.152552397Z
    2022-12-25T21:12:44.153399331Z: received response for request (#8: user1): 2022-12-25T21:12:42.152552397Z
    

    As we can see, about 2000ms after the first request of a given type arrives, all pending requests of that type receive exactly the same response.

    The above solution assumes that request processing is owned not by the callers but by the custom coroutine scope. That means that if the calling coroutine is cancelled, the request itself keeps processing. If we need to cancel processing when the callers are cancelled, that would require a more complicated solution that tracks callers.
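One possible way to track callers is to count the coroutines currently awaiting each operation and cancel the operation when that count drops to zero. The following is only a rough sketch under that assumption: `CancellableCoalescer`, `Entry`, and `waiters` are hypothetical names that do not appear in the answer above, and the scope is assumed to use a `SupervisorJob` so that one failed operation does not cancel the others.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: shares one operation per key and cancels it
// once every caller awaiting it has gone away.
class CancellableCoalescer<K, V>(private val scope: CoroutineScope) {
    private class Entry<T>(val deferred: Deferred<T>, var waiters: Int = 0)

    private val inFlight = mutableMapOf<K, Entry<V>>()
    private val lock = Mutex()

    suspend fun handle(key: K, block: suspend () -> V): V {
        val entry = lock.withLock {
            val existing = inFlight[key]
            // Join a running operation; otherwise start a fresh one.
            val e = if (existing != null && existing.deferred.isActive) existing
                    else Entry(scope.async { block() }).also { inFlight[key] = it }
            e.waiters++
            e
        }
        try {
            return entry.deferred.await()
        } finally {
            // Bookkeeping must run even if this caller was cancelled.
            withContext(NonCancellable) {
                lock.withLock {
                    entry.waiters--
                    if (entry.waiters == 0) {
                        if (inFlight[key] === entry) inFlight.remove(key)
                        // No-op if the operation has already completed.
                        entry.deferred.cancel()
                    }
                }
            }
        }
    }
}

fun main() = runBlocking {
    val coalescer = CancellableCoalescer<String, String>(
        CoroutineScope(SupervisorJob() + Dispatchers.Default)
    )
    val finished = AtomicBoolean(false)
    val work: suspend () -> String = { delay(500); finished.set(true); "done" }

    val a = launch { coalescer.handle("user1", work) }
    val b = launch { coalescer.handle("user1", work) }
    delay(100)
    a.cancelAndJoin()
    b.cancelAndJoin() // last waiter leaves, so the shared work is cancelled
    delay(600)
    println("work finished: ${finished.get()}")
}
```

In this sketch, cancelling only one of the two callers leaves the operation running for the other. The trade-off is extra bookkeeping under the lock on every call.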

    Also, this solution could be easily generalized into a reusable tool by receiving the key selector and request processor as parameters.
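As a minimal sketch of that generalization, the class below takes the key selector and the request processor as constructor parameters. The names `RequestCoalescer`, `keySelector`, `processor`, and `handle` are illustrative assumptions, not part of any library, and the caller is assumed to supply a scope that outlives individual requests.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical reusable helper: callers whose requests map to the same key
// share one in-flight run of the processor.
class RequestCoalescer<Req, Key, Resp>(
    private val scope: CoroutineScope,
    private val keySelector: (Req) -> Key,
    private val processor: suspend (Req) -> Resp,
) {
    private val inFlight = mutableMapOf<Key, Deferred<Resp>>()
    private val lock = Mutex()

    suspend fun handle(req: Req): Resp {
        val key = keySelector(req)
        val deferred = lock.withLock {
            inFlight.getOrPut(key) {
                scope.async {
                    try {
                        processor(req)
                    } finally {
                        // Forget the entry so later requests are processed again.
                        lock.withLock { inFlight.remove(key) }
                    }
                }
            }
        }
        return deferred.await()
    }
}

fun main() = runBlocking {
    val calls = AtomicInteger(0)
    val coalescer = RequestCoalescer<String, String, String>(
        scope = CoroutineScope(SupervisorJob() + Dispatchers.Default),
        keySelector = { it },
        processor = { req ->
            delay(200) // simulate processing
            "$req handled by run #${calls.incrementAndGet()}"
        },
    )
    // Three concurrent identical requests should share a single run.
    val responses = List(3) { async { coalescer.handle("user1") } }.awaitAll()
    println(responses)
    println("processor runs: ${calls.get()}")
}
```

Keeping the key selector separate from the processor lets requests that differ only in irrelevant fields (a request ID, for example) still share one run.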