Tags: scala, csv, akka, akka-stream, throttling

Akka stream not processing all the lines from my CSV file


I am using Akka Streams to process a CSV file containing 1839 lines. I have added counters to count the number of lines processed.

Here is my source. I have made sure that each line in the input file is shorter than 700 characters.

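import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger

import akka.stream.scaladsl.{FileIO, Flow, Framing}
import akka.util.ByteString

case class ParsedLine(input: String, field1: String, field2: String, field3: String)

val counter0 = new AtomicInteger()
val counter1 = new AtomicInteger()

// Read the file and split it into lines of at most 1024 bytes
val lineSource = FileIO
    .fromPath(Paths.get(InputFile))
    .via(Framing.delimiter(ByteString("\n"), 1024, allowTruncation = true))
    .map { l =>
      counter0.incrementAndGet()
      l.utf8String
    }

// Split each line on commas and keep the first three fields
val parseLine = Flow[String].map { l =>
  counter1.incrementAndGet()
  val words = l.split(",")
  ParsedLine(l, words(0), words(1), words(2))
}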

This source is processed as follows. For every line in the source there should be a processed line in the output.

val done = lineSource
    .via(parseLine)
    .to(Sink.foreach(_.input))
    .run()


  done.onComplete {
    case Success(_) =>
      println("Counter0 " + counter0.get())
      println("Counter1 " + counter1.get())
      system.terminate()
    case Failure(e) =>
      println(e.getLocalizedMessage)
      system.terminate()
  }

Interestingly, the counters print the following, and I get different numbers on each run. If I remove the .to(Sink.foreach(_.input)) line, I get counts of 1839.

Counter0 1445
Counter1 1667

First of all, I expect Counter0 to be no lower than Counter1, because Counter0 is incremented in a stage before Counter1. I also expect all the lines to be processed, so both counters should print the total number of lines, 1839.

Any idea what is going on here? Is Akka Streams dropping items in between?


Solution

  • You are actually not waiting for the stream to finish.

    You are attaching the Sink.foreach(...) using .to(...), which discards the materialized value of the Sink.foreach stage (a Future[Done] that completes once the sink has processed every element) and keeps only the materialized value of the preceding stage.

    Also, keep in mind that every step does the same (via, map, via and then to): each one keeps the materialized value on its left. So the only value you keep track of is the one from the first graph step, created by FileIO.fromPath(...), which is a Future[IOResult] that completes once the file has been read. This means you are only waiting for the whole file to be read, not for any of the subsequent processing steps.

    You just need to keep both materialized values and wait for both of them to complete.

    // Needs an implicit ExecutionContext in scope, e.g. `import actorSystem.dispatcher`
    val stream =
      lineSource
        .via(parseLine)
        .toMat(Sink.foreach[ParsedLine](_.input))(Keep.both)

    val (ioResultFuture, doneFuture): (Future[IOResult], Future[Done]) = stream.run()

    // zip succeeds with both results once both futures succeed, and fails if either fails
    ioResultFuture.zip(doneFuture).onComplete {
      case Success((ioResult, done)) =>
        println(ioResult)
        println(done)
        println("Counter0 " + counter0.get())
        println("Counter1 " + counter1.get())
        actorSystem.terminate()
      case Failure(e) =>
        println(e.getLocalizedMessage)
        actorSystem.terminate()
    }
    

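    Alternatively, you can keep track of just the last processing stage (Sink.foreach(...) in this case). Its Future[Done] completes only after every upstream element has been processed, which also implies that the file has been fully read.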

    val stream =
      lineSource
        .via(parseLine)
        .toMat(Sink.foreach[ParsedLine](_.input))(Keep.right)

    val resultFuture: Future[Done] = stream.run()

    // Needs an implicit ExecutionContext in scope, e.g. `import actorSystem.dispatcher`
    resultFuture.onComplete {
      case Success(_) =>
        println("Counter0 " + counter0.get())
        println("Counter1 " + counter1.get())
        actorSystem.terminate()
      case Failure(e) =>
        println(e.getLocalizedMessage)
        actorSystem.terminate()
    }
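Below is a minimal, self-contained sketch of the fix, assuming Akka 2.6 or later (where an implicit ActorSystem also supplies the Materializer) and substituting a generated temporary file for the question's InputFile. The object name CounterCheck, the run helper and the "i,fi,gi" line format are hypothetical and exist only for illustration. The sketch spells out the materialized types, so the difference between .to(...) and .toMat(...)(Keep.right) is visible in the code. It then waits on the sink's Future[Done] and reads both counters only after that future completes. The question's callback, by contrast, reads the counters while elements are still flowing, so counter0 and counter1 are sampled at slightly different moments. That timing is a likely reason why Counter1 can print higher than Counter0.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Flow, Framing, Keep, RunnableGraph, Sink, Source}
import akka.util.ByteString
import akka.{Done, NotUsed}

object CounterCheck {
  final case class ParsedLine(input: String, field1: String, field2: String, field3: String)

  // Hypothetical helper: runs the question's pipeline over a generated file and
  // returns (counter0, counter1), read only after the sink's Future[Done] completes.
  def run(lineCount: Int = 1839): (Int, Int) = {
    // Assumption: a temporary file of "i,fi,gi" lines stands in for InputFile.
    // No trailing newline, so allowTruncation = true is what lets the last line through.
    val path: Path = Files.createTempFile("lines", ".csv")
    val content = (1 to lineCount).map(i => s"$i,f$i,g$i").mkString("\n")
    Files.write(path, content.getBytes(StandardCharsets.UTF_8))

    // Akka 2.6+: the implicit ActorSystem also provides the Materializer used by run()
    implicit val system: ActorSystem = ActorSystem("counter-check")
    val counter0 = new AtomicInteger()
    val counter1 = new AtomicInteger()

    val lineSource: Source[String, Future[IOResult]] = FileIO
      .fromPath(path)
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      .map { bytes =>
        counter0.incrementAndGet()
        bytes.utf8String
      }

    val parseLine: Flow[String, ParsedLine, NotUsed] = Flow[String].map { l =>
      counter1.incrementAndGet()
      val words = l.split(",")
      ParsedLine(l, words(0), words(1), words(2))
    }

    // .to(...) keeps the left-hand materialized value: the FileIO Future[IOResult],
    // which completes once the file has been read. Built only to show its type; never run.
    val fileReadOnly: RunnableGraph[Future[IOResult]] =
      lineSource.via(parseLine).to(Sink.foreach[ParsedLine](_ => ()))

    // Keep.right keeps the sink's Future[Done], which completes after the last element is handled.
    val wholeStream: RunnableGraph[Future[Done]] =
      lineSource.via(parseLine).toMat(Sink.foreach[ParsedLine](_ => ()))(Keep.right)

    try {
      // Blocking only to keep the sketch short; real code would compose the futures instead.
      Await.result(wholeStream.run(), 30.seconds)
      (counter0.get(), counter1.get())
    } finally {
      Await.result(system.terminate(), 30.seconds)
      Files.deleteIfExists(path)
    }
  }

  def main(args: Array[String]): Unit = {
    val (c0, c1) = run()
    println("Counter0 " + c0)
    println("Counter1 " + c1)
  }
}
```

Only the type annotations on fileReadOnly and wholeStream differ between the two graphs. The compiler therefore rejects any attempt to treat the result of .to(...) as the sink's Future[Done].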