scala akka-stream

Maintaining state within a stream


I have a high-volume stream of user data. I want to determine whether each user is new, based on its ID. To reduce calls to the DB, I would rather keep the IDs of previously seen users in memory.

import scala.collection.mutable

val users = mutable.Set[String]()
// init the state from the db
users ++= db.getAllUsersIds()
val source: Source[User, NotUsed]
val dbSink: Sink[User, NotUsed] // goes to the db
// Set.add returns true only if the id was not already in the set
val usersFilter = Flow[User].filter(user => users.add(user.id))

Now I can create a graph:

source ~> usersFilter ~> dbSink

My problem is that the mutable state is shared and unsafe. Is there a way to maintain the state within the flow?


Solution

  • There are two ways of doing this.

    If you are receiving a stream of records and want to deduplicate it (because some IDs have already been processed), you can use the approach described here:

    http://janschulte.com/2016/03/08/deduplicate-akka-stream/

    The other way is to load the known IDs from the database up front and check whether each incoming ID already exists:

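    val alreadyExists: Flow[User, User, NotUsed] = {
      // build a cache of known ids (ideally a Set, for fast lookups)
      val knownIdList = ... // query database and get list of IDs
      // note: the cache is fixed, so IDs first seen in the stream are not added to it
      Flow[User].filterNot(user => knownIdList.contains(user.id))
    }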
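
To keep the set inside the flow, as the question asks, one option is Akka Streams' statefulMapConcat operator, which builds its state once per materialization so that only that stage touches it. The following is a minimal sketch, not a definitive implementation: it assumes Akka 2.6 or later (where an implicit ActorSystem provides the materializer), and the User case class, the NewUsersOnly object, and the sample IDs are hypothetical stand-ins for the types in the question.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for the question's User type.
final case class User(id: String)

object NewUsersOnly {
  // knownIds: IDs loaded from the database once, before the stream starts (assumption).
  def flow(knownIds: Set[String]): Flow[User, User, NotUsed] =
    Flow[User].statefulMapConcat { () =>
      // This set is created once per materialization and is only
      // accessed from inside this stage, so it is not shared state.
      val seen = mutable.Set[String]()
      seen ++= knownIds
      // add returns true only for IDs not seen before: emit those, drop the rest.
      user => if (seen.add(user.id)) List(user) else Nil
    }
}

object Demo extends App {
  implicit val system: ActorSystem = ActorSystem("demo")

  val users = List(User("a"), User("b"), User("a"), User("c"))
  val result = Source(users)
    .via(NewUsersOnly.flow(knownIds = Set("b")))
    .runWith(Sink.seq)

  // "b" is already known and the second "a" is a repeat, so only a and c pass.
  println(Await.result(result, 5.seconds))
  system.terminate()
}
```

Unlike the fixed cache in the filterNot version, the set here grows as new IDs arrive, so a user who first appears in the stream is passed downstream only once. The set is unbounded, so for very large ID spaces memory use would need to be considered.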