I want implement Diff transformation stream with such logic:
I have some Source[JsValue]
and some database with id->JsValue data. I want to get Diff from JsValue
in Source
and db, then pull generated Diff further and store JsValue
from source to db.
I think about Akka-persistence as store implementation, but I need only current state, so it can be any key-value db.
As I am new to akka-stream, I can't understand, what the best way is for implementing this idea.
Assuming your JsValue
objects have an "id" field, you could write a querying Flow
that would take in the original JsValue
and produce a tuple of the original and database version:
def dbQuery(id : String) : JsValue = ???
val queryFlow : Flow[JsValue, (JsValue,JsValue), _] =
Flow[JsValue] map { originalJs =>
originalJs -> dbQuery((originalJs \ "id").as[String])
}
These tuples could be passed into a diffing Flow
:
def diffJs(original : JsValue, dbVersion : JsValue) : JsValue = ???
val diffFlow : Flow[(JsValue, JsValue), JsValue, _] =
Flow[(JsValue, JsValue)] map diffJs.tupled
The final part you mentioned was a db that would persist the differences, this can represented with a Sink
:
val dbSink : Sink[JsValue, _] = ???
All of these components can then be combined to form the stream based on your source of values:
val jsSource : Source[JsValue, _] = ???
jsSource via queryFlow via diffFlow runWith dbSink
For an example of asynchronous db querying see this example demonstrating mapAsync
.