Tags: scala, akka, akka-stream

This Akka stream sometimes doesn't finish


I have a graph that reads lines from multiple gzipped files and writes those lines to another set of gzipped files, mapped according to some value in each line.

It works correctly against small data sets, but fails to terminate on larger data. (It may not be the size of the data that's to blame; each run takes a while, so I haven't run it enough times to be sure.)

def files: Source[File, NotUsed] =
  Source.fromIterator(
    () =>
      Files
        .fileTraverser()
        .breadthFirst(inDir)
        .asScala
        .filter(_.getName.endsWith(".gz"))
        .toIterator)

def extract =
  Flow[File]
    .mapConcat[String](unzip)        // emit every line of every archive
    .mapConcat(s =>
      // pair each line with every "tk" value found in its JSON
      (JsonMethods.parse(s) \ "tk").extract[Array[String]].map(_ -> s).to[collection.immutable.Iterable])
    .groupBy(1 << 16, _._1)          // one substream per tk value
    .groupedWithin(1000, 1.second)   // batch lines within each substream
    .map { lines =>
      // append the whole batch to that key's output file
      val w = writer(lines.head._1)
      w.println(lines.map(_._2).mkString("\n"))
      w.close()
      Done
    }
    .mergeSubstreams

def unzip(f: File) = {
  // read all lines of the archive into an immutable collection for mapConcat
  scala.io.Source
    .fromInputStream(new GZIPInputStream(new FileInputStream(f)))
    .getLines
    .toIterable
    .to[collection.immutable.Iterable]
}

def writer(tk: String): PrintWriter =
  // open the key's output file in append mode, gzip-compressed
  new PrintWriter(
    new OutputStreamWriter(
      new GZIPOutputStream(
        new FileOutputStream(new File(outDir, s"$tk.json.gz"), true)
      ))
  )

val process = files.via(extract).toMat(Sink.ignore)(Keep.right).run()

Await.result(process, Duration.Inf)

A thread dump shows the main thread WAITING at Await.result(process, Duration.Inf), with nothing else happening.

OpenJDK v11 with Akka v2.5.15


Solution

  • Most likely it's stuck in groupBy: the dispatcher has run out of available threads to gather items into 2^16 substreams for all the sources.

    So if I were you, I'd implement the grouping in extract semi-manually, using statefulMapConcat with a mutable Map[KeyType, List[String]]. Or buffer lines with groupedWithin first and split them into per-key groups that you write to different files in a Sink.foreach. Both are sketched below.
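A minimal sketch of the statefulMapConcat idea (the name groupedByKey and the maxBatch parameter are mine, and flushing whatever is still buffered when the upstream completes is left out for brevity):

def groupedByKey(maxBatch: Int): Flow[(String, String), (String, Seq[String]), NotUsed] =
  Flow[(String, String)].statefulMapConcat { () =>
    // one buffer of pending lines per key, local to this materialization
    val buf = scala.collection.mutable.Map.empty[String, Vector[String]]

    { case (tk, line) =>
      val lines = buf.getOrElse(tk, Vector.empty) :+ line
      if (lines.size >= maxBatch) {
        buf.remove(tk)
        List(tk -> lines)     // a full batch for this key, ready to write
      } else {
        buf.update(tk, lines) // keep buffering
        Nil
      }
    }
  }

And a sketch of the second alternative, reusing your unzip and writer helpers: lines are batched across all keys with groupedWithin, then split into per-key groups with plain Scala, so no substreams are created at all. Because writer opens each file in append mode, a key's file simply accumulates batch after batch:

def extractPairs: Flow[File, (String, String), NotUsed] =
  Flow[File]
    .mapConcat[String](unzip)
    .mapConcat(s =>
      (JsonMethods.parse(s) \ "tk").extract[Array[String]].map(_ -> s).to[collection.immutable.Iterable])

val done =
  files
    .via(extractPairs)
    .groupedWithin(10000, 1.second)
    .toMat(Sink.foreach[Seq[(String, String)]] { batch =>
      // split one time-bounded batch into per-key groups and append each
      batch.groupBy(_._1).foreach { case (tk, lines) =>
        val w = writer(tk)
        w.println(lines.map(_._2).mkString("\n"))
        w.close()
      }
    })(Keep.right)
    .run()

Either way, the number of distinct keys no longer translates into the number of live substreams, which is what the 2^16-way groupBy forces on the materializer.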