There are two tables TableA
and TableB
.
I need to copy some records from TableA
to TableB
. I use slick-3.0
and use the following way:
import akka.stream._
import akka.stream.scaladsl._
...
//{{ READ DATA FROM TABLE A
val q = TableA.filter(somePredicate).result
val source = Source.fromPublisher {
db.stream(q.result).mapResult { r =>
val record: RecordA = someTransformation(r)
record
}
}.grouped(50) // grouping because I want to write records in batch mode
//}}
//{{ WRITE DATA TO TABLE B
val f:Future[Done] = source.runWith(Sink.foreach { batch: Seq[RecordA] =>
//TODO how to write batch to TableB asynchronously?
val insertAction = TableB ++= batch // insert batch to table
val fInsert: Future[_] = db.run(insertAction)
Await.result(fInsert, ...) // #1 this works only with blocking
})
//}}
But I've faced with an issue - how to write batch to TableB
asynchronously (see TODO). Now the above code works with blocking to inner future only (see #1 comment). Is there a right way for implementing that task asynchronously?
use mapAsync
it expects a future to be returned and exposes the "unwrapped" result in the next stage.
source.mapAsync(4){batch: Seq[RecordA] =>
val insertAction = TableB ++= batch // insert batch to table
db.run(insertAction)
}).to(Sink.ignore).run