scala, akka, akka-stream

Async call in akka stream flow


I want to implement a Diff transformation stream with the following logic:

I have a Source[JsValue] and a database holding id->JsValue data. For each JsValue from the Source, I want to compute a Diff against the version stored in the db, pass the generated Diff downstream, and store the JsValue from the Source in the db.

I am considering Akka Persistence as the store implementation, but I only need the current state, so any key-value db would do.

As I am new to akka-stream, I can't work out the best way to implement this idea.


Solution

  • Assuming your JsValue objects have an "id" field, you could write a querying Flow that would take in the original JsValue and produce a tuple of the original and database version:

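    // Assumes Akka Streams and play-json (suggested by the `\` and `as[String]` calls).
    import akka.stream.scaladsl.Flow
    import play.api.libs.json.JsValue

    def dbQuery(id : String) : JsValue = ???
    
    val queryFlow : Flow[JsValue, (JsValue,JsValue), _] = 
      Flow[JsValue] map { originalJs =>
        // Pair each incoming value with its database counterpart.
        originalJs -> dbQuery((originalJs \ "id").as[String])
      }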
    

    These tuples could be passed into a diffing Flow:

    def diffJs(original : JsValue, dbVersion : JsValue) : JsValue = ???
    
    val diffFlow : Flow[(JsValue, JsValue), JsValue, _] = 
      Flow[(JsValue, JsValue)] map { case (original, dbVersion) => diffJs(original, dbVersion) }
    

    The final part you mentioned was a db that would persist the differences; this can be represented with a Sink:

    val dbSink : Sink[JsValue, _] = ???
    

    All of these components can then be combined to form the stream based on your source of values:

    val jsSource : Source[JsValue, _] = ???
    
    jsSource via queryFlow via diffFlow runWith dbSink
    

    For asynchronous db querying, where the lookup returns a Future rather than a plain JsValue, use mapAsync in place of map in queryFlow.
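
As a rough illustration of the asynchronous variant, here is a minimal sketch that swaps map for mapAsync in the query step. It assumes Akka 2.6+ (where an implicit ActorSystem supplies the materializer) and play-json. The in-memory `store`, `dbQueryAsync`, the placeholder `diffJs` that only reports whether the value changed, and the `run` helper are all hypothetical stand-ins, not part of the original answer or of any real database client.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import play.api.libs.json.{JsValue, Json}

import scala.collection.concurrent.TrieMap
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object AsyncDiffExample {

  // Hypothetical in-memory stand-in for a real key-value store.
  val store: TrieMap[String, JsValue] =
    TrieMap("1" -> Json.obj("id" -> "1", "v" -> 1))

  // Hypothetical async lookup; a real client would return its own Future.
  def dbQueryAsync(id: String)(implicit ec: ExecutionContext): Future[Option[JsValue]] =
    Future(store.get(id))

  // Placeholder diff (assumption): only reports whether the value differs
  // from the stored version, or is missing from the store.
  def diffJs(original: JsValue, dbVersion: Option[JsValue]): JsValue =
    Json.obj("changed" -> !dbVersion.contains(original))

  // mapAsync runs up to `parallelism` lookups concurrently and emits the
  // results in the same order as the incoming elements.
  def queryFlow(implicit ec: ExecutionContext): Flow[JsValue, (JsValue, Option[JsValue]), NotUsed] =
    Flow[JsValue].mapAsync(parallelism = 4) { originalJs =>
      val id = (originalJs \ "id").as[String]
      dbQueryAsync(id).map(dbVersion => originalJs -> dbVersion)
    }

  // Runs the stream over the given inputs and collects the diffs.
  // Blocking with Await is for demonstration only.
  def run(inputs: List[JsValue]): Seq[JsValue] = {
    implicit val system: ActorSystem = ActorSystem("diff-example")
    import system.dispatcher
    try {
      val diffs = Source(inputs)
        .via(queryFlow)
        .map { case (original, dbVersion) => diffJs(original, dbVersion) }
        .runWith(Sink.seq)
      Await.result(diffs, 5.seconds)
    } finally system.terminate()
  }
}
```

Calling `AsyncDiffExample.run` with a list of JsValue objects returns one diff per input, in input order. If the stream also writes back to the store, a parallelism above 1 may let a later lookup for the same id run before an earlier write has finished, so `parallelism = 1` is the cautious choice in that case.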