Tags: scala, stream, akka, akka-stream, stateful

Akka Streams: State in a flow


I want to read multiple big files using Akka Streams and process each line. Imagine that each line consists of an (identifier -> value) pair. If a new identifier is found, I want to save it and its value in the database; otherwise, if the identifier has already been seen while processing the stream of lines, I want to save only the value. For that, I think I need some kind of recursive stateful flow that keeps the identifiers already found in a Map. I imagine this flow would receive a pair of (newLine, contextWithIdentifiers).

I've just started looking into Akka Streams. I think I can manage the stateless processing myself, but I have no clue how to keep the contextWithIdentifiers. I'd appreciate any pointers in the right direction.
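The idea of carrying the already-seen identifiers alongside each line can be sketched in plain Scala first, before involving Akka at all. This is a minimal sketch that assumes each line has the hypothetical form `id,value`; `Action`, `SaveBoth`, `SaveValue` and `Plan.plan` are illustrative names, not part of any library, and the two kinds of database write are modelled as returned values instead of real DB calls.

```scala
// Hypothetical actions standing in for the two kinds of database write
sealed trait Action
final case class SaveBoth(id: String, value: String) extends Action // first time this id is seen
final case class SaveValue(value: String) extends Action            // id was seen earlier

object Plan {
  // Assumes each line looks like "id,value" (hypothetical format);
  // a line without a comma gets an empty value
  private def parse(line: String): (String, String) = {
    val parts = line.split(",", 2)
    (parts(0).trim, if (parts.length > 1) parts(1).trim else "")
  }

  // Threads the set of seen identifiers from line to line, which is the
  // (newLine, contextWithIdentifiers) pair from the question
  def plan(lines: Seq[String]): Vector[Action] = {
    val (_, actions) = lines.foldLeft((Set.empty[String], Vector.empty[Action])) {
      case ((seen, acc), line) =>
        val (id, value) = parse(line)
        if (seen.contains(id)) (seen, acc :+ SaveValue(value))
        else (seen + id, acc :+ SaveBoth(id, value))
    }
    actions
  }
}
```

For example, `Plan.plan(Seq("a,1", "b,2", "a,3"))` yields `SaveBoth("a", "1")`, `SaveBoth("b", "2")` and then `SaveValue("3")`, because `a` is already in the set by the third line. A stream stage has to keep exactly this kind of state between elements.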


Solution

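  • Maybe something like statefulMapConcat can help you. It takes a factory function that is called once per materialization, so each run of the stream gets its own fresh state (here, the set of ids already seen):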

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Sink, Source}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.Random

    implicit val system: ActorSystem = ActorSystem()
    // Required on Akka 2.5; on Akka 2.6+ an implicit ActorSystem is enough
    // (ActorMaterializer still compiles there but is deprecated)
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // encapsulating your input
    case class IdentValue(id: Int, value: String)
    // some randomly generated input with ids in 0..4
    // (nextInt(5) avoids the negative result abs(Int.MinValue) % 5 can give)
    val identValues = List.fill(20)(IdentValue(Random.nextInt(5), "valueHere"))

    val stateFlow = Flow[IdentValue].statefulMapConcat { () =>
      // state with already processed ids, created anew for each materialization
      var ids = Set.empty[Int]
      identValue =>
        if (ids.contains(identValue.id)) {
          // id seen before: save only the value to the DB
          println(identValue.value)
          List(identValue)
        } else {
          // new id: save both id and value to the DB
          println(identValue)
          ids = ids + identValue.id
          List(identValue)
        }
    }

    Source(identValues)
      .via(stateFlow)
      .runWith(Sink.seq)
      // onSuccess is deprecated (removed in Scala 2.13); onComplete works everywhere
      .onComplete { result =>
        println(result)
        system.terminate()
      }
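If you would rather keep the state explicit and immutable, closer to the (newLine, contextWithIdentifiers) pair from the question, `Flow.scan` can carry the set of seen ids from one element to the next. This is an alternative sketch, not part of the answer above. It assumes Akka 2.6+ (where an implicit `ActorSystem` supplies the materializer), and `Step`, `tagNew` and `run` are hypothetical names chosen for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ScanExample {
  final case class IdentValue(id: Int, value: String)

  // Hypothetical state passed between elements: ids seen so far plus the
  // latest element, tagged with whether its id appeared for the first time
  final case class Step(seen: Set[Int], current: Option[(IdentValue, Boolean)])

  val tagNew: Flow[IdentValue, (IdentValue, Boolean), NotUsed] =
    Flow[IdentValue]
      .scan(Step(Set.empty, None)) { (step, iv) =>
        val isNew = !step.seen.contains(iv.id)
        Step(step.seen + iv.id, Some((iv, isNew)))
      }
      // scan emits the seed first; drop it and keep only tagged elements
      .collect { case Step(_, Some(tagged)) => tagged }

  // Runs the flow over a finite input and waits for the collected result
  def run(input: List[IdentValue])(implicit system: ActorSystem): Seq[(IdentValue, Boolean)] =
    Await.result(Source(input).via(tagNew).runWith(Sink.seq), 10.seconds)
}
```

A downstream stage (for example `mapAsync`) could then call your own database code, saving both id and value when the flag is `true` and only the value otherwise. The trade-off is mostly style: `statefulMapConcat` hides a local `var` inside its factory, while `scan` passes an immutable value along at the cost of the extra seed element that has to be filtered out.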