Tags: scala, scala-cats, scala-3, cats-effect

How to avoid a race condition when using cats-effect based workers?


I created a tiny worker system to run parallel jobs with maximum multi-core processor utilization. It seems to work fine, but at some point, when working with a larger number of jobs, the app hangs without any error message, which I suspect is caused by a low-level race condition. I cannot decide whether the fault lies with cats-effect, which I use to implement the parallelism, or with the Atomic classes or TrieMap.

Here's a minified implementation which can be used to illustrate and test the issue:

import cats.effect.IO
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.collection.concurrent.TrieMap
import cats.effect.unsafe.implicits.global
import java.util.concurrent.ConcurrentHashMap


object ThreadingError extends App:
  val jobIdsAdded = (0L until 10000L).toList
  for (_ <- jobIdsAdded.iterator) {
    ParallelJobs.addJob(() => {})
  }
  while(ParallelJobs.count.get() < 10000L) {
    print(s"${ParallelJobs.count.get()}\r")
    Thread.sleep(200)
  }

object ParallelJobs:
  private val allCores = Runtime.getRuntime.availableProcessors()
  private val availableCores = allCores - 1
  private val assignedTillJobId: AtomicLong = AtomicLong(0L)
  val jobsTrieMap: TrieMap[Long, () => Any] = TrieMap.empty[Long, () => Any]
  val jobsConcurrentHashMap: ConcurrentHashMap[Long, () => Any] = ConcurrentHashMap[Long, () => Any]()

  val locked: AtomicBoolean = AtomicBoolean(false)
  val count: AtomicLong = AtomicLong(0L)

  workerGroup
    .unsafeRunAsync(either => {
      if (either.isLeft)
        println(either.left.get.getMessage)
        either.left.get.printStackTrace()
    })

  def addJob(jobFn: () => Any): Unit =
    val jobId = jobsTrieMap.size
    jobsTrieMap(jobId) = jobFn
    //val jobId = jobsConcurrentHashMap.size()
    //jobsConcurrentHashMap.put(jobId, jobFn)

  private def workerGroup: IO[Unit] = (0 until availableCores).map(_ => worker).reduce(_ &> _)

  private def worker: IO[Unit] =
    IO({
      while (true) {
        if (!locked.get() && jobsTrieMap.nonEmpty)
        //if (!locked.get() && !jobsConcurrentHashMap.isEmpty)
          locked.set(true)
          val jobId = assignedTillJobId.getAndIncrement()
          val toDo = jobsTrieMap(jobId)
          //val toDo = jobsConcurrentHashMap.get(jobId)
          jobsTrieMap -= jobId
          //jobsConcurrentHashMap.remove(jobId)
          locked.set(false)
          toDo() // long running job
          count.incrementAndGet()
        else
          Thread.sleep(100)
      }
    })

As you can see, I also tried ConcurrentHashMap; with it, the app simply stopped making progress. I also came up with a locking mechanism to test whether the issue was caused by multiple workers writing to the TrieMap concurrently, but that did not help either.

I use Scala 3.3 and cats-effect 3.5.1.


Solution

  • This code doesn't seem to take into account how the IO type is supposed to work.

    The idea behind all these IO types in Scala (Future, Cats Effect's IO, ZIO, Monix's Task...) is that you write somewhat declarative code (certainly higher-level than individual Threads and locks) and it is sent to a runner with a Scheduler. Since all of your code is supposed to be built with map, flatMap, recover, traverse, and other combinators, the assumption is that each individual uninterruptible piece inside IO is:

    • small/granular
    • short-running
    • NOT taking away the control from the runner

    It allows the Scheduler to:

    • control the parallelism!
    • control when the job is scheduled
    • and when it should be cancelled (it simply does NOT run the next operation defined by map/flatMap/etc., but produces an error instead; see the sketch after this list)
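
    As a quick illustration of that last point, here is a minimal sketch of cancellation, assuming cats-effect 3.x (cancellationDemo is a made-up name):

    import cats.effect.IO
    import scala.concurrent.duration._

    // Start a long-running fiber, then cancel it: the continuation after >>
    // ("never printed") is simply never scheduled by the runtime.
    val cancellationDemo: IO[Unit] = for {
      fiber <- (IO.sleep(10.seconds) >> IO.println("never printed")).start
      _     <- IO.sleep(100.millis)
      _     <- fiber.cancel
    } yield ()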

    This means, among other things:

    • no Thread.sleep - there are instructions for letting a fiber sleep without blocking a whole Thread in a thread pool (and potentially blocking the whole thread pool with operations that sleep naively)
    • no while(true) - it hijacks a whole Thread and makes it impossible to cancel the operation
    • no manual blocking and locking - there are declarative ways of handling these operations which don't show the middle finger to thread pools, Executors and Schedulers (Ref, Semaphore, etc.; see the sketch below)
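
    To make the last point concrete, here is a minimal sketch of those declarative building blocks, assuming cats-effect 3.x: IO.sleep suspends only the fiber, and cats.effect.std.Semaphore replaces the hand-rolled AtomicBoolean lock (DeclarativeSketch and guardedJob are made-up names):

    import cats.effect.{IO, IOApp}
    import cats.effect.std.Semaphore
    import cats.syntax.all._
    import scala.concurrent.duration._

    object DeclarativeSketch extends IOApp.Simple {

      // A Semaphore-guarded critical section with a fiber-aware sleep:
      // no Thread.sleep, no while(true), no AtomicBoolean spin-lock.
      def guardedJob(sem: Semaphore[IO], id: Int): IO[Unit] =
        sem.permit.surround {      // acquired before, released after (even on error/cancellation)
          IO.println(s"running job $id") >>
            IO.sleep(100.millis)   // suspends the fiber, not a whole Thread
        }

      val run: IO[Unit] =
        Semaphore[IO](1).flatMap { sem =>
          (1 to 4).toList.parTraverse_(id => guardedJob(sem, id))
        }
    }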

    It doesn't mean that it is absolutely impossible to use these things with IO monads, just that IO monads were built to make it easy to write concurrent apps... as long as you use them as intended, because they expect that you don't break the assumptions they are built upon (similar to how you might have a bad experience with Spring Framework + Hibernate if you decide that Thread pinning is not a thing and start running your own thing on the side). So I recommend postponing this kind of low-level tinkering until you understand how the runtime works underneath.

    Meanwhile, I recommend rewriting this whole code into something that uses the built-in combinators and operations:

    import cats.effect.{IO, IOApp}
    import cats.syntax.all._
    import scala.concurrent.duration._
    
    object ExampleApp extends IOApp.Simple {
    
      val run: IO[Unit] = (0L until 10000L)
        .toList
        // will be run in parallel!
        .parTraverse { number =>
          // sleep 2s then print
          IO.sleep(2.seconds) >> IO {
            println(number)
          }
        }
        .void // makes the result Unit
    }
    

    I created a tiny worker system to run parallel jobs with maximum multi-core processor utilization

    All of the monads for side effects (scala.concurrent.Future, cats.effect.IO, monix.eval.Task, zio.ZIO) have a default Scheduler which uses Runtime.getRuntime.availableProcessors() to decide the size of the Thread pool. And they are additionally aware of things like how to handle blocking (declared declaratively, as sketched below) in a way that prevents all the threads from getting stuck in a "waiting" state where the job that could "notify" them cannot get onto the thread pool. Or how to handle SIGTERM signals to cancel ongoing tasks in a predictable manner (jobs that need to finish can be made uncancelable, streams can be adjusted to safely finish processing the current task and then not take another, Resources can be safely closed and cleaned up, etc.).
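
    For the blocking part, a minimal sketch assuming cats-effect 3.x (readFile and interruptibleNap are made-up names; IO.blocking and IO.interruptible are the relevant combinators):

    import cats.effect.IO
    import java.nio.file.{Files, Path}

    // Declared blocking: the runtime shifts this onto its dedicated
    // blocking pool, so compute threads never get stuck "waiting".
    def readFile(path: Path): IO[String] =
      IO.blocking(Files.readString(path))

    // Like IO.blocking, but the underlying Thread may additionally be
    // interrupted if the fiber gets cancelled.
    val interruptibleNap: IO[Unit] =
      IO.interruptible(Thread.sleep(1000))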

    Meaning that the entirety of your code can be replaced with a single parTraverse (or something similar) to get the same result, but without race conditions.

    If you want to control the degree of parallelism more granularly, you can use:

    • if you have a list or other finite collection of tasks, then e.g.:
      IO.parSequenceN(sizeOfParallelism)(listOfIOs)
      
    • if you have an unbounded collection of data, there are functional streams (a runnable version follows this list):
      fs2.Stream
        .fromIterator[IO]((1 until 100).iterator, chunkSize)
        .mapAsync(sizeOfParallelism) { int =>
          IO.sleep(2.seconds) >> IO {
            println(int)
          }
        }
      
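
    Putting the streaming variant together as a self-contained program (a sketch assuming fs2 3.x and cats-effect 3.x; chunkSize and sizeOfParallelism are placeholder values):

    import cats.effect.{IO, IOApp}
    import scala.concurrent.duration._

    object StreamExample extends IOApp.Simple {
      val chunkSize         = 16 // how many elements fs2 pulls at a time
      val sizeOfParallelism = 4  // at most 4 jobs run concurrently

      val run: IO[Unit] =
        fs2.Stream
          .fromIterator[IO]((1 until 100).iterator, chunkSize)
          .mapAsync(sizeOfParallelism) { int =>
            IO.sleep(2.seconds) >> IO(println(int))
          }
          .compile
          .drain // run the stream purely for its effects
    }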

    Long story short: don't roll your own low-level solution (at least until you understand what the library does under the hood) and you'll be fine. There are enough helpers to roll your own solution on top of the library's API and deliver the same thing more easily, faster, and without race conditions or other issues.