Tags: concurrency, kotlin, coroutine, kotlinx.coroutines

Coroutine: Deferred operations in a List run sequentially.


I have a List of parameters for executing downloads. I map each element of that list to a Deferred that performs the download; then, for each element of the List, I call await, but the downloads apparently execute sequentially.

This is my function:

suspend fun syncFiles() = coroutineScope {
    remoteRepository.requiredFiles()
        .filter { localRepository.needToDownload( it.name, it.md5 ) }
        .map { async { downloader( it ) } }
        .forEach { deferredResult ->
            when ( val result = deferredResult.await() ) {
                is DownloadResult.Layout ->  localRepository.storeLayout( result.content )
                is DownloadResult.StringR -> localRepository.storeFile( result )
            }
        }
}

This is my test:

private val useCase = SyncUseCaseImpl.Factory(
        mockk { // downloader
            coEvery { this@mockk.invoke( any() ) } coAnswers { delay(1000 );any() }
        },
        ...
    ).newInstance()

@Test
fun `syncFiles downloadConcurrently`() = runBlocking {
    val requiredFilesCount = useCase.remoteRepository.requiredFiles().size
    assert( requiredFilesCount ).isEqualTo( 3 )

    val time = measureTimeMillis {
        useCase.syncFiles()
    }

    assert( time ).isBetween( 1000, 1100 )
}

And this is my result: expected to be between:<1000L> and <1100L> but was:<3081L>

I think this is weird, because these two dummy tests pass. Maybe I am missing something?

@Test // OK
fun test() = runBlocking {
    val a = async { delay(1000 ) }
    val b = async { delay(1000 ) }
    val c = async { delay(1000 ) }

    val time = measureTimeMillis {
        a.await()
        b.await()
        c.await()
    }

    assert( time ).isBetween( 1000, 1100 )
}

@Test // OK
fun test() = runBlocking {
    val wasteTime: suspend () -> Unit = { delay(1000 ) }
    suspend fun wasteTimeConcurrently() = listOf( wasteTime, wasteTime, wasteTime )
            .map { async { it() } }
            .forEach { it.await() }

    val time = measureTimeMillis {
        wasteTimeConcurrently()
    }

    assert( time ).isBetween( 1000, 1100 )
}

Solution

  • This can happen when the job blocks its thread, for example an IO-bound task that blocks rather than suspends, which also stalls every other coroutine scheduled on that thread. In your test, runBlocking runs its coroutines on the single calling thread and async { } inherits that dispatcher, so one blocking download holds up the others. If you are on Kotlin/JVM, try async(Dispatchers.IO) { } to run the coroutine on the IO dispatcher, which is backed by a thread pool intended for blocking work, so a blocked thread no longer prevents the other downloads from starting.

    Have a look here for other dispatchers: https://kotlinlang.org/docs/reference/coroutines/coroutine-context-and-dispatchers.html#dispatchers-and-threads
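
To illustrate the difference, here is a minimal sketch, assuming the downloader blocks its thread instead of suspending. blockingDownload is a hypothetical stand-in (it just calls Thread.sleep) and is not the downloader from the question; the function names and the 300 ms delay are also made up for the example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a downloader that blocks its thread (assumption).
fun blockingDownload(id: Int): Int {
    Thread.sleep(300)
    return id
}

// async { } inherits the caller's dispatcher. Under runBlocking that is a
// single thread, so the blocking calls run one after another.
suspend fun downloadOnCallerDispatcher(ids: List<Int>): List<Int> = coroutineScope {
    ids.map { async { blockingDownload(it) } }.awaitAll()
}

// async(Dispatchers.IO) { } moves each call onto the IO thread pool, so the
// blocking calls can overlap.
suspend fun downloadOnIo(ids: List<Int>): List<Int> = coroutineScope {
    ids.map { async(Dispatchers.IO) { blockingDownload(it) } }.awaitAll()
}

fun main(): Unit = runBlocking {
    val ids = listOf(1, 2, 3)
    val onCaller = measureTimeMillis { downloadOnCallerDispatcher(ids) }
    val onIo = measureTimeMillis { downloadOnIo(ids) }
    println("caller dispatcher: $onCaller ms, Dispatchers.IO: $onIo ms")
}
```

With three items, the first measurement should come out at roughly three times the sleep duration and the second at roughly one, which matches the 3081 ms versus 1000 ms gap in the question. If the downloader really suspends (as delay does in the two dummy tests), the dispatcher change is not needed.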