Update list with the results of different threads in Kotlin


I want to update a list with the result of different threads.

suspend fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val job = ArrayList<Job>()
    val ans = mainScope.async {
        for (j in (0..5)) {
            job.add(
                launch {
                    val res = async {
                        func1()
                    }
                    x += res.await() // unsynchronized read-modify-write on the shared x
                }
            )
        }
        job.joinAll()
    }
    ans.await()
    return x
}

suspend fun func1(): List<A> {
    // Perform some operation to get list abc
    var abc: List<A> = listOf<A>()
    delay(1000)
    return abc
}

The list "x" is not getting updated properly. Sometimes it appends "res", sometimes it does not. Is there a thread-safe way to modify lists like this?

New implementation:

fun mainFunction(): List<A> {
    var x: List<A> = listOf<A>()
    val ans = mainScope.async {
        List(6) {
            async {
                func1()
            }
        }.awaitAll()
    }
    print(ans)
    for (item in ans) {
        x += item as List<A> // item is of type Kotlin.Unit
    }
    return x
}

Solution

    Short answer

    Here is a simpler version of what you're doing, which avoids the synchronization problems you might be running into:

    suspend fun mainFunction(): List<A> {
        return coroutineScope {
            // awaitAll() yields List<List<A>>; flatten() merges it into one List<A>
            List(6) { async { func1() } }.awaitAll().flatten()
        }
    }
    

    You can read the long answer if you want to unpack this. I will explain the different things in the original code that are not really idiomatic and could be replaced.

    Long answer

    There are multiple non-idiomatic things in the code in the question, so I'll try to address each of them.

    Indexed for loop for 0-based range

    If you just want to repeat an operation several times, it's simpler to just use repeat(6) instead of for (j in 0..5). It's easier to read, especially when you don't need the index variable:

    suspend fun mainFunction(): List<A> {
        var x: List<A> = listOf<A>()
        val job = ArrayList<Job>()
        val ans = mainScope.async {
            repeat(6) {
                job.add(
                    launch {
                        val res = async {
                            func1()
                        }
                        x += res.await()
                    }
                )
            }
            job.joinAll()
        }
        ans.await()
        return x
    }
    

    Creating lists with a loop

    If what you want is to create a list out of that loop, you can also use List(size) { computeElement() } instead of repeat (or for), which makes use of the List factory function:

    suspend fun mainFunction(): List<A> {
        var x: List<A> = listOf<A>()
        val ans = mainScope.async {
            val jobs = List(6) {
                launch {
                    val res = async {
                        func1()
                    }
                    x += res.await()
                }
            }
            jobs.joinAll()
        }
        ans.await()
        return x
    }
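As a small standalone illustration of the List factory function, here is a sketch that needs only the standard library. The squares function is a made-up example, not part of the question's code; it builds the same list once with List(size) { ... } and once by hand with repeat.

```kotlin
// List(size) { init } calls init once per index (0 until size) and collects the results.
fun squares(count: Int): List<Int> = List(count) { index -> index * index }

fun main() {
    // The same list built by hand with repeat and a mutable list, for comparison.
    val manual = mutableListOf<Int>()
    repeat(4) { index -> manual.add(index * index) }

    println(squares(4))           // [0, 1, 4, 9]
    println(manual == squares(4)) // true
}
```

The factory version has no mutable state to manage, which is what makes it a good fit once the elements are jobs or deferred values.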
    

    Extra async

    There is no need to wrap your launches in an extra async here; you can call launch on your scope directly:

    suspend fun mainFunction(): List<A> {
        var x: List<A> = listOf<A>()
        val jobs = List(6) {
            mainScope.launch {
                val res = async {
                    func1()
                }
                x += res.await()
            }
        }
        jobs.joinAll()
        return x
    }
    

    async + immediate await

    Using async { someFun() } and then immediately calling await() on the resulting Deferred is equivalent to just calling someFun() directly (unless you're using a different scope or context, which you aren't doing here for the innermost logic).

    So you can replace the innermost part:

    val res = async {
        func1()
    }
    x += res.await()
    

    With just x += func1(), which gives:

    suspend fun mainFunction(): List<A> {
        var x: List<A> = listOf<A>()
        val jobs = List(6) {
            mainScope.launch {
                x += func1()
            }
        }
        jobs.joinAll()
        return x
    }
    

    launch vs async

    If you want results, it is usually more practical to use async instead of launch. When you use launch, you have to store the result somewhere manually (which makes you run into synchronization problems like the one you have now). With async, you get a Deferred<T> value which you can then await(), and when you have a list of Deferred values there are no synchronization issues when you await them all.

    So the general idea of the previous code is bad practice and might bite you because it requires manual synchronization. You can replace it with:

    suspend fun mainFunction(): List<A> {
        val deferredValues = List(6) {
            mainScope.async {
                func1()
            }
        }
        // awaitAll() returns List<List<A>>, so flatten it to match the return type
        val x = deferredValues.awaitAll().flatten()
        return x
    }
    

    Or simpler:

    suspend fun mainFunction(): List<A> {
        return List(6) {
            mainScope.async {
                func1()
            }
        }.awaitAll().flatten()
    }
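For comparison, if the launch-based shape had to stay, the manual synchronization mentioned above could be done with a Mutex from kotlinx.coroutines.sync. This is only a sketch under assumptions: fetchChunk() is a hypothetical stand-in for func1(), and Dispatchers.Default is chosen here so that the coroutines may really run on several threads.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for the question's func1(); returns two fixed values.
suspend fun fetchChunk(): List<Int> {
    delay(50)
    return listOf(1, 2)
}

suspend fun collectWithMutex(): List<Int> {
    val mutex = Mutex()
    var results: List<Int> = listOf()
    coroutineScope {
        repeat(6) {
            launch(Dispatchers.Default) {
                val chunk = fetchChunk() // slow work happens outside the lock
                // Only the read-modify-write on the shared variable is guarded.
                mutex.withLock { results = results + chunk }
            }
        }
    }
    return results
}

fun main(): Unit = runBlocking {
    println(collectWithMutex().size) // 12
}
```

This works, but it is more code and more ways to get it wrong than the async/awaitAll version, which is why the answer steers away from it.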
    

    Manual joins vs coroutineScope

    It is usually a smell to join() jobs manually. If you want to wait for some coroutines to finish, it is more idiomatic to launch all those coroutines within a coroutineScope { ... } block, which will suspend until all child coroutines finish.
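To make that suspending behaviour concrete, here is a minimal sketch with no join() or joinAll() anywhere. The runChildren function and its counter are made up for illustration, and AtomicInteger assumes Kotlin on the JVM.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches `count` children and reports how many had finished
// by the time coroutineScope returned.
suspend fun runChildren(count: Int): Int {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(count) {
            launch {
                delay(50)
                finished.incrementAndGet()
            }
        }
        // No joinAll() here: the block itself ends immediately,
        // but coroutineScope does not return until every child is done.
    }
    return finished.get()
}

fun main(): Unit = runBlocking {
    println(runChildren(5)) // 5
}
```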

    Here we have already replaced every launch that we join() with an async call that we await(), so this doesn't really apply anymore: we still need to await() the deferred values in order to get the results. However, since we are already in a suspend function, we can still use coroutineScope instead of an external scope like mainScope to ensure that we don't leak any coroutines:

    suspend fun mainFunction(): List<A> {
        return coroutineScope {
            // awaitAll() yields List<List<A>>; flatten() merges it into one List<A>
            List(6) { async { func1() } }.awaitAll().flatten()
        }
    }
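To try the final version outside the original project, a self-contained sketch could look like the following. The A data class and the body of func1() are hypothetical placeholders, since the question never shows them. Because func1() returns a list, awaitAll() produces a list of lists, so the sketch flattens it to get the single List<A> the question was building with +=.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical element type; the question never defines A.
data class A(val id: Int)

// Hypothetical stand-in for func1(): pretends to work, then returns two items.
suspend fun func1(): List<A> {
    delay(100)
    return listOf(A(1), A(2))
}

suspend fun mainFunction(): List<A> = coroutineScope {
    List(6) { async { func1() } } // start six concurrent calls
        .awaitAll()               // List<List<A>>, in the order the calls were started
        .flatten()                // one List<A>
}

fun main(): Unit = runBlocking {
    println(mainFunction().size) // 12
}
```

No shared mutable variable is involved, so there is nothing to synchronize: each coroutine hands back its own result and the caller combines them after all of them have completed.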