
A Kotlin service with request queue


I would like to design a service with the following API:

suspend fun getUsers(request: Request): List<User>

Under the hood I would send a request to the server (it doesn't matter how, but let's say it's a reactive WebClient). Here's the catch: I can send at most one request every 500 ms, otherwise I get an error.

Could someone recommend how to implement this so that calling getUsers from a coroutine suspends the caller, adds the unit of work to a queue owned by the service, executes it at some later point, and then returns the result to the caller?

I assume I can use a ReceiveChannel as the queue and iterate over its elements in a for loop with a delay inside, but I'm a bit lost about where to put this logic. Should it be a background method that runs forever and is fed by getUsers? The channel's close method will probably never be called, so this method can also be suspending, but how do I pass a value back from this endlessly running method to the getUsers call that needs the result?
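One way to hand the result back, sketched here under assumptions rather than as a definitive design: each queued item carries its own CompletableDeferred, the background loop completes it, and the caller awaits it. The names PendingCall, ThrottledQueue and submit are hypothetical. For simplicity this sketch waits a fixed interval after every call instead of measuring the time since the last request.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

// Hypothetical helper: one queued unit of work plus the slot for its result.
private class PendingCall<T>(val block: suspend () -> T) {
    val result = CompletableDeferred<T>()

    // Runs the block and hands the outcome (value or failure) to the waiting caller.
    suspend fun run() {
        try {
            result.complete(block())
        } catch (e: Throwable) {
            result.completeExceptionally(e)
            // Keep cooperative cancellation working for the processing loop.
            if (e is CancellationException) throw e
        }
    }
}

// Hypothetical class: the caller supplies the scope that owns the background loop.
class ThrottledQueue(scope: CoroutineScope, private val intervalMs: Long) {
    // Unlimited buffer, so submit() never suspends on send, only on await.
    private val queue = Channel<PendingCall<*>>(Channel.UNLIMITED)

    init {
        // The endlessly running loop: one call at a time, then a pause.
        scope.launch {
            for (call in queue) {
                call.run()
                delay(intervalMs)
            }
        }
    }

    // Suspends the caller until the background loop has executed the block.
    suspend fun <T> submit(block: suspend () -> T): T {
        val call = PendingCall(block)
        queue.send(call)
        return call.result.await()
    }
}
```

With this shape, getUsers could simply return queue.submit { ... } around the actual server call. A failure in one block is delivered to its own caller only, and the loop keeps going.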

EDIT

At the moment I'm thinking of a solution like this:

private const val REQUEST_INTERVAL = 500

@Service
class DelayedRequestSenderImpl<T> : DelayedRequestSender<T> {
    private var lastRequestTime: LocalDateTime = LocalDateTime.now()
    private val requestChannel: Channel<Deferred<T>> = Channel()

    override suspend fun requestAsync(block: () -> T): Deferred<T> {
        val deferred = GlobalScope.async(start = CoroutineStart.LAZY) { block() }
        requestChannel.send(deferred)
        return deferred
    }

    @PostConstruct
    private fun startRequestProcessing() = GlobalScope.launch {
        for (request in requestChannel) {
            val now = LocalDateTime.now()
            val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
            if (diff < REQUEST_INTERVAL) {
                delay(REQUEST_INTERVAL - diff)
            }
            // Record the send time on every iteration, not only after a delay
            lastRequestTime = LocalDateTime.now()
            request.start()
        }
    }
}

The problem I see here is that I have to make the class generic so that requestChannel can be generic, since the result of a request may be anything. But this means each instance of DelayedRequestSender is tied to one particular result type. Any advice on how to avoid this?

EDIT 2

Here's a refined version. The only flaw I see at the moment is that the @PostConstruct method has to be public if we want to write tests for it, unless we use reflection.

The idea was to not use GlobalScope and also have a separate Job for the processing method. Is this a fine approach?

interface DelayingSupplier {
    suspend fun <T> supply(block: () -> T): T
}

@Service
class DelayingSupplierImpl(@Value("\${vk.request.interval}") private val interval: Int) : DelayingSupplier {
    private var lastRequestTime: LocalDateTime = LocalDateTime.now()
    private val requestChannel: Channel<Deferred<*>> = Channel()
    private val coroutineScope = CoroutineScope(EmptyCoroutineContext)

    override suspend fun <T> supply(block: () -> T): T {
        val deferred = coroutineScope.async(start = CoroutineStart.LAZY) { block() }
        requestChannel.send(deferred)
        return deferred.await()
    }

    @PostConstruct
    fun startProcessing() = coroutineScope.launch(context = Job(coroutineScope.coroutineContext[Job])) {
        for (request in requestChannel) {
            val now = LocalDateTime.now()
            val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
            if (diff < interval) {
                delay(interval - diff)
            }
            lastRequestTime = LocalDateTime.now()
            request.start()
        }
    }
}
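On the testing concern, one option is to pull the wait-time arithmetic out of the loop into a pure function, so it can be tested without starting the processing coroutine at all. This is only a sketch: remainingDelayMillis is a hypothetical helper that is not part of the class above, and it uses Instant instead of LocalDateTime.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper: how long to wait before the next request may be sent.
// Returns 0 when the interval has already elapsed. The result is capped at
// intervalMillis so that a wall clock jumping backwards cannot cause a longer wait.
fun remainingDelayMillis(lastRequest: Instant, now: Instant, intervalMillis: Long): Long {
    require(intervalMillis >= 0) { "intervalMillis must not be negative" }
    val elapsed = Duration.between(lastRequest, now).toMillis()
    return (intervalMillis - elapsed).coerceIn(0L, intervalMillis)
}
```

The loop body would then reduce to delay(remainingDelayMillis(last, Instant.now(), interval)) followed by updating the timestamp and calling request.start().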

Solution

  • This is the final implementation I came up with. Note the SupervisorJob: we don't want processing to stop if one of the requests fails, which is entirely possible and acceptable (in my case at least).

    Also, the option suggested by @Laurence might be better, but I decided not to use actors for now because that API is marked as obsolete.

    @Service
    class DelayingRequestSenderImpl(@Value("\${vk.request.interval}") private val interval: Int) : DelayingRequestSender {
        private var lastRequestTime: LocalDateTime = LocalDateTime.now()
        private val requestChannel: Channel<Deferred<*>> = Channel()
        //SupervisorJob is used because we want to have continuous processing of requestChannel
        //even if one of the requests fails
        private val coroutineScope = CoroutineScope(SupervisorJob())
    
        override suspend fun <T> request(block: () -> T): T {
            val deferred = coroutineScope.async(start = CoroutineStart.LAZY) { block() }
            requestChannel.send(deferred)
            return deferred.await()
        }
    
        @PostConstruct
        fun startProcessing() = coroutineScope.launch {
            for (request in requestChannel) {
                val now = LocalDateTime.now()
                val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
                if (diff < interval) {
                    delay(interval - diff)
                }
                lastRequestTime = LocalDateTime.now()
                request.start()
            }
        }
    }