I want to update a list with the results from different threads.
    suspend fun mainFunction(): List<A> {
        var x: List<A> = listOf<A>()
        val job = ArrayList<Job>()
        val ans = mainScope.async {
            var i = 0
            for (j in (0..5)) {
                job.add(
                    launch {
                        val res = async {
                            func1()
                        }
                        x += res.await()
                    }
                )
            }
            job.joinAll()
        }
        ans.await()
        return x
    }

    suspend fun func1(): List<A> {
        // Perform some operation to get list abc
        var abc: List<A> = listOf<A>()
        delay(1000)
        return abc
    }
The list "x" is not getting updated properly. Sometimes it appends "res", sometimes it does not. Is there a thread-safe way to modify lists like this?
New implementation:
    suspend fun mainFunction(): List<A> {
        var x: List<A> = listOf<A>()
        val ans = mainScope.async {
            List(6) {
                async {
                    func1()
                }
            }.awaitAll()
        }
        print(ans)
        for (item in ans) {
            x += item as List<A> // item is of type Kotlin.Unit
        }
    }
Here is a simpler version of what you're doing, which avoids the synchronization problems you might be running into:
    suspend fun mainFunction(): List<A> {
        return coroutineScope {
            // Each func1() call returns a List<A>, so flatten the six lists into one.
            List(6) { async { func1() } }.awaitAll().flatten()
        }
    }
You can read the long answer below if you want to unpack this. The code in the question has several non-idiomatic constructs, so I'll address them one at a time.
If you just want to repeat an operation several times, it's simpler to use repeat(6) instead of for (j in 0..5). It's easier to read, especially when you don't need the index variable:
    suspend fun mainFunction(): List<A> {
        var x: List<A> = listOf<A>()
        val job = ArrayList<Job>()
        val ans = mainScope.async {
            repeat(6) {
                job.add(
                    launch {
                        val res = async {
                            func1()
                        }
                        x += res.await()
                    }
                )
            }
            job.joinAll()
        }
        ans.await()
        return x
    }
If what you want is to create a list out of that loop, you can also use List(size) { computeElement() } instead of repeat (or for), which makes use of the List factory function:
    suspend fun mainFunction(): List<A> {
        var x: List<A> = listOf<A>()
        val ans = mainScope.async {
            val jobs = List(6) {
                launch {
                    val res = async {
                        func1()
                    }
                    x += res.await()
                }
            }
            jobs.joinAll()
        }
        ans.await()
        return x
    }
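As a small aside on the List(size) { ... } factory, here is a minimal sketch of how it behaves on its own. The function name squares is hypothetical and only serves the illustration; the lambda receives the element's index and its result becomes the element at that index.

```kotlin
// List(size) { index -> element } calls the lambda once per index, from 0 to size - 1,
// and collects the results into a read-only list.
// `squares` is a hypothetical helper used only for this illustration.
fun squares(n: Int): List<Int> = List(n) { index -> index * index }
```

In the coroutine code, the same factory collects the Job (or Deferred) returned by each launch (or async) call, which is why no separate ArrayList is needed.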
There is no need to wrap your launches here in an extra async; you can just call launch on your scope directly:
    suspend fun mainFunction(): List<A> {
        var x: List<A> = listOf<A>()
        val jobs = List(6) {
            mainScope.launch {
                val res = async {
                    func1()
                }
                x += res.await()
            }
        }
        jobs.joinAll()
        return x
    }
Using async { someFun() } and then immediately awaiting the Deferred result is equivalent to just calling someFun() directly (unless you're using a different scope or context, which you aren't doing here for the innermost logic).
So you can replace the innermost part:
    val res = async {
        func1()
    }
    x += res.await()
with just x += func1(), which gives:
    suspend fun mainFunction(): List<A> {
        var x: List<A> = listOf<A>()
        val jobs = List(6) {
            mainScope.launch {
                x += func1()
            }
        }
        jobs.joinAll()
        return x
    }
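This version still has the race from the question: on a var of type List<A>, x += ... compiles to x = x + ..., a read followed by a write, so two coroutines running on different threads can read the same old list and one update gets lost. If you really do need to keep a shared list, the update has to be guarded. The following is only a sketch of one way to do that with a Mutex; fetchChunk and collectWithMutex are hypothetical names, Int stands in for A, and Dispatchers.Default is an assumption about where the work runs.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical stand-in for func1(), returning Int elements instead of A.
suspend fun fetchChunk(id: Int): List<Int> {
    delay(100) // simulate some work
    return listOf(id, id + 100)
}

suspend fun collectWithMutex(): List<Int> {
    val mutex = Mutex()
    val results = mutableListOf<Int>()
    // withContext suspends until the block and all coroutines launched in it complete.
    withContext(Dispatchers.Default) {
        repeat(6) { id ->
            launch {
                val chunk = fetchChunk(id)           // slow work happens outside the lock
                mutex.withLock { results += chunk }  // only the list update is serialized
            }
        }
    }
    return results
}
```

The elements end up in completion order, which is not deterministic, so sort the result if you need a stable order.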
If you want results, it is usually more practical to use async instead of launch. When you use launch, you have to store the result somewhere manually (which makes you run into synchronization problems like you have now). With async, you get a Deferred<T> value which you can then await(), and when you have a list of Deferred values, there are no synchronization issues when you await them all.
So the general idea of the previous code is bad practice and might bite you because it requires manual synchronization. You can replace it with:
    suspend fun mainFunction(): List<A> {
        val deferredValues = List(6) {
            mainScope.async {
                func1()
            }
        }
        // awaitAll() yields one List<A> per call, so flatten them into a single list.
        val x = deferredValues.awaitAll().flatten()
        return x
    }
Or simpler:
    suspend fun mainFunction(): List<A> {
        return List(6) {
            mainScope.async {
                func1()
            }
        }.awaitAll().flatten() // flatten: each func1() call returns a List<A>
    }
It is usually a smell to join() jobs manually. If you want to wait for some coroutines to finish, it is more idiomatic to launch all those coroutines within a coroutineScope { ... } block, which will suspend until all child coroutines finish.
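To illustrate that point in isolation, here is a minimal sketch in which nothing is joined explicitly. The function name runChildren and the delays are hypothetical; the counter only exists to show that every child has finished by the time coroutineScope returns.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Returns how many child coroutines had finished when coroutineScope returned.
suspend fun runChildren(): Int {
    val finished = AtomicInteger(0)
    coroutineScope {
        repeat(3) { i ->
            launch {
                delay(50L * (i + 1)) // hypothetical work of varying length
                finished.incrementAndGet()
            }
        }
        // No joinAll() here: the scope itself waits for its children.
    }
    return finished.get()
}
```

Because coroutineScope does not return until its children are done, the counter has already reached 3 when it is read.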
Here we have already replaced every launch that we join() with an async call that we await, so this doesn't really apply anymore, because we still need to await() the deferred values in order to get the results. However, since we are already in a suspend function, we can still use coroutineScope instead of an external scope like mainScope to ensure that we don't leak any coroutines:
    suspend fun mainFunction(): List<A> {
        return coroutineScope {
            // Each func1() call returns a List<A>, so flatten the six lists into one.
            List(6) { async { func1() } }.awaitAll().flatten()
        }
    }
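If you want to try the final version end to end, the following self-contained sketch may help. The data class A and the body of func1 are hypothetical stand-ins, since the question does not show them. Note that func1 has to be a suspend function because it calls delay, and that the .flatten() call is there because func1() returns a List<A>, so awaitAll() on its own produces a list of lists.

```kotlin
import kotlinx.coroutines.*

// Hypothetical element type standing in for the question's A.
data class A(val id: Int)

// Stand-in for func1(): marked suspend because it calls delay().
suspend fun func1(): List<A> {
    delay(1000)
    return listOf(A(1), A(2))
}

suspend fun mainFunction(): List<A> = coroutineScope {
    List(6) { async { func1() } } // start six concurrent calls
        .awaitAll()               // List<List<A>>, in the order the calls were started
        .flatten()                // List<A>
}
```

Calling it as runBlocking { mainFunction() } should return 12 elements after roughly one second rather than six, since the six calls overlap instead of running one after another.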