kotlin · kotlin-coroutines · arrow-kt

How can you parallelize the processing of file InputStreams in Kotlin / Arrow?


I am processing large Files, having a list of them:

val originalFiles: List<File>

I need to read the InputStream of each file, process it, and write it to another processedFile. For the sake of simplicity let's assume I just read the original InputStream and write it to the destination file output stream.

I would like to process the originalFiles in parallel.

A first, straightforward way would be to use parallelStream():

override suspend fun processFiles(workdir: File): Either<Throwable, Unit> = either {
    
    val originalFiles = ...

    originalFiles.parallelStream().forEach {
        val destination = File("${workdir.absolutePath}/${it.name}-processed.txt")
        logger.info("Starting merge {}", destination)
        FileOutputStream(destination).use { faos ->
            IOUtils.copy(it.inputStream(), faos)
        }
        logger.info("Finished processing {}", destination)            
    }
}

However, given that I'm working with coroutines and Arrow, I get a compile warning Possibly blocking call in non-blocking context could lead to thread starvation.

  • Is there a proper (non-blocking) way to work with Input/OutputStreams with coroutines/suspend functions?
  • Is there a better way to parallelize the List processing with coroutines/Arrow?

Solution

  • Your best bet would be to use parTraverse (going to be renamed to parMap in 2.x.x). This function comes from Arrow Fx Coroutines; there are also Flow#parMap and Flow#parMapUnordered that you can use instead.

    You also need to make sure that the FileOutputStream is closed correctly, even in the face of cancellation, and I would recommend using Resource for that.

    The Possibly blocking call in non-blocking context could lead to thread starvation warning will disappear once the blocking calls are executed on Dispatchers.IO.

    suspend fun processFiles(workdir: File) {
      val originalFiles: List<File> = emptyList()
      originalFiles.parTraverse(Dispatchers.IO) { file ->
        val destination = File("${workdir.absolutePath}/${file.name}-processed.txt")
        logger.info("Starting merge {}", destination)
        // Close both streams, not just the output, to avoid leaking file handles
        file.inputStream().use { fis ->
          FileOutputStream(destination).use { faos ->
            IOUtils.copy(fis, faos)
          }
        }
        logger.info("Finished processing {}", destination)
      }
    }
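As mentioned above, Resource guarantees release even under cancellation, which plain `use` inside a cancelled coroutine does not. A minimal sketch against the Arrow Fx Coroutines 1.x API (the `outputStream` and `writeProcessed` helpers are illustrative, not part of Arrow; 2.x replaces this style with the `resourceScope` DSL):

```kotlin
import arrow.fx.coroutines.Resource
import arrow.fx.coroutines.fromCloseable
import java.io.File
import java.io.FileOutputStream

// Hypothetical helper: wrap the FileOutputStream in a Resource so the stream
// is closed even when the surrounding coroutine is cancelled mid-copy.
fun outputStream(destination: File): Resource<FileOutputStream> =
    Resource.fromCloseable { FileOutputStream(destination) }

// Acquire the stream, run the write, and release it on success,
// failure, or cancellation alike.
suspend fun writeProcessed(destination: File, bytes: ByteArray) =
    outputStream(destination).use { faos -> faos.write(bytes) }
```
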
    

    To summarize, the answers to your questions:

    Is there a proper (non-blocking) way to work with Input/OutputStreams with coroutines/suspend functions?

    Execute them using suspend + Dispatchers.IO.
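For example, a blocking stream copy becomes coroutine-friendly once it is shifted onto the IO dispatcher. A sketch using plain kotlinx.coroutines and java.io (the stdlib `copyTo` plays the role of IOUtils.copy here):

```kotlin
import java.io.File
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

// Perform the blocking read/write on Dispatchers.IO so it cannot
// starve the default coroutine worker threads.
suspend fun copyFile(source: File, destination: File) =
    withContext(Dispatchers.IO) {
        source.inputStream().use { input ->
            destination.outputStream().use { output ->
                input.copyTo(output) // stdlib equivalent of IOUtils.copy
            }
        }
    }
```
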

    Is there a better way to parallelize the List processing with coroutines/Arrow?

    Leverage parTraverse to parallelize the List transformations with Kotlin coroutines. Optionally, use parTraverseN if you also want to limit the number of parallel operations.
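If you are not using Arrow, bounded parallelism in the style of parTraverseN can be sketched with plain kotlinx.coroutines, using a Semaphore to cap concurrency (the `parMapN` name is illustrative, not a library function):

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Roughly what parTraverseN(n) { ... } does: run f for each element in its
// own coroutine, but allow at most n of them to execute at the same time.
suspend fun <A, B> List<A>.parMapN(n: Int, f: suspend (A) -> B): List<B> =
    coroutineScope {
        val semaphore = Semaphore(n)
        map { a -> async { semaphore.withPermit { f(a) } } }.awaitAll()
    }
```

Because the results come from `awaitAll`, the output list preserves the input order even though the work itself runs concurrently.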