Tags: multithreading, scala, parallel-processing, concurrent.futures

Handling Scala Futures out-of-memory error


I'm trying to parallelize a Scala process using Futures:

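import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Fixed pool of 5 threads shared by all the Futures below
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))

val result = Future.traverse(listOfInputs) { input =>
  Future {
    // time-consuming process
  }
}

result.onComplete {
  case Success(value)     => println("done") // display message
  case Failure(exception) => throw exception
}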

This works fine when the number of inputs is small, but as the number of inputs grows, the program consumes more and more memory and the OS kills the process during execution.

Is there any way to limit the memory or the number of threads Scala uses? Any help is appreciated.


Solution

  • You're using a fixed-size thread pool with 5 threads in your example, and you're constructing the Futures with Future { … }, which means that each Future occupies one thread for as long as it's running. Parallelism in your example is therefore already strictly limited to 5, and it's hard to give a useful answer to your question when your example code doesn't even demonstrate the problem.

    That said, one way to limit parallelism is to use a semaphore. A semaphore is essentially a finite set of permits: if your code acquires a permit before starting a task and releases it when the task finishes, no more tasks than there are permits can run at once. Twitter's util-core library provides an asynchronous one, AsyncSemaphore, which works with Twitter's own com.twitter.util.Future rather than scala.concurrent.Future.

    https://github.com/twitter/util

    https://twitter.github.io/util/docs/com/twitter/concurrent/AsyncSemaphore.html

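    import com.twitter.concurrent.AsyncSemaphore
    import com.twitter.util.Future // Twitter's Future, not scala.concurrent.Future

    val sem = new AsyncSemaphore(5) // 5 permits: at most 5 tasks run at once
    val result = Future.collect(listOfInputs.map { input =>
      sem.acquireAndRun {
        // create a (Twitter) Future here; its permit is released when it completes
      }
    })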
    

    However, I would recommend using something else entirely, because Scala Futures are horrible. They lack basic features like an asynchronous try/finally construct or the ability to interrupt a computation.

    ZIO has these features and many, many more built in. For instance, you can iterate over a list with the ZIO.foreachParN method. It limits parallelism, and if one of your computations fails with an error, it interrupts all the others to avoid wasting time and memory. Resources acquired through ZIO's resource-safe constructs, such as open files, are released automatically.
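The semaphore idea from the answer can also be sketched with plain scala.concurrent Futures, without Twitter util or ZIO. This is a minimal illustration, assuming java.util.concurrent.Semaphore; `boundedTraverse` is a hypothetical helper, not a library API. The calling thread acquires a permit before submitting each task, and the task releases it in a `finally` block, so at most `maxInFlight` inputs are queued or running at once instead of the whole input list.

```scala
import java.util.concurrent.{Executors, Semaphore}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedTraverse {
  // Hypothetical helper: like Future.traverse, but the calling thread blocks
  // until a permit is free, so at most `maxInFlight` work items are pending.
  def boundedTraverse[A, B](inputs: List[A], maxInFlight: Int)(work: A => B)(
      implicit ec: ExecutionContext): Future[List[B]] = {
    val permits = new Semaphore(maxInFlight)
    val futures = inputs.map { input =>
      permits.acquire() // wait for a free permit before submitting more work
      Future {
        try work(input)
        finally permits.release() // return the permit even if `work` throws
      }
    }
    Future.sequence(futures)
  }

  // Counters used by the demo to observe how many work items overlap
  val running = new AtomicInteger(0)
  val maxSeen = new AtomicInteger(0)

  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(5)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val result = boundedTraverse((1 to 50).toList, maxInFlight = 3) { i =>
      val now = running.incrementAndGet()
      maxSeen.accumulateAndGet(now, (a, b) => math.max(a, b))
      Thread.sleep(10) // stand-in for the time-consuming process
      running.decrementAndGet()
      i * 2
    }

    val values = Await.result(result, 30.seconds)
    println(s"sum = ${values.sum}, max concurrent = ${maxSeen.get}")
    pool.shutdown()
  }
}
```

Running `main` prints a sum of 2550 and a maximum concurrency of at most 3. Because `acquire()` blocks, call `boundedTraverse` from a thread outside the pool (here, `main`). Note that it bounds pending tasks, not the memory held by results: every output is still collected into the final list.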