Tags: mysql, slick, akka-stream, slick-3.0, reactive-streams

How are reactive streams used in Slick for inserting data?


In Slick's documentation, the Reactive Streams examples only cover reading data by means of a DatabasePublisher. But what happens when you want to use your database as a Sink and apply backpressure based on your insertion rate?

I've looked for an equivalent DatabaseSubscriber, but it doesn't exist. So the question is: if I have a Source, say:

val source = Source(0 to 100)

how can I create a Sink with Slick that writes those values into a table with the schema:

create table NumberTable (value INT)


Solution

  • Serial Inserts

    The easiest way would be to do inserts within a Sink.foreach.

    Assuming you've used Slick's schema code generation and that your table is named "NumberTable":

    // The Tables object was auto-generated by Slick's schema code generation
    import Tables.{Numbertable, NumbertableRow}
    import Tables.profile.api._ // brings Database, += and ++= into scope

    val numberTableDB = Database.forConfig("NumberTableConfig")
    

    We can write a function that does the insertion

    // Runs a single-row insert; the returned Future completes with the affected row count
    def insertIntoDb(num: Int) =
      numberTableDB.run(Numbertable += NumbertableRow(num))
    

    And that function can be placed in the Sink

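    // Sink.foreach is a factory method on the Sink companion object
    val insertSink = Sink.foreach[Int](num => insertIntoDb(num))

    // Running the stream requires an implicit Materializer (or ActorSystem) in scope
    Source(0 to 100) runWith insertSink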
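
One caveat on backpressure, which the question asks about: Sink.foreach discards the Future returned by insertIntoDb, so the stream does not wait for an insert to finish before starting the next one. A minimal sketch of one alternative is to route the inserts through mapAsync, which only requests more elements as the pending Futures complete. The names BackpressuredInserts, insertAll and fakeInsert are hypothetical, the insert is stubbed so the example runs without a database, and an implicit ActorSystem serving as the materializer assumes Akka 2.6 or later.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BackpressuredInserts {
  // Streams 0 to 100 through `insert`, keeping at most one insert in flight:
  // with parallelism = 1, mapAsync waits for the current Future to complete
  // before it asks upstream for the next element.
  def insertAll(insert: Int => Future[Int])(implicit system: ActorSystem): Future[Done] =
    Source(0 to 100)
      .mapAsync(parallelism = 1)(insert)
      .runWith(Sink.ignore)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("inserts")
    import system.dispatcher

    // Hypothetical stand-in for numberTableDB.run(Numbertable += NumbertableRow(num))
    val inserted = new ConcurrentLinkedQueue[Int]()
    def fakeInsert(num: Int): Future[Int] = Future { inserted.add(num); 1 }

    Await.result(insertAll(fakeInsert), 10.seconds)
    println(s"inserted ${inserted.size} rows")
    system.terminate()
  }
}
```

With the real database, insertIntoDb from above could be passed in place of fakeInsert. Raising the parallelism value would allow several inserts to run concurrently while still bounding how many are outstanding.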
    

  • Batched Inserts

    You could further extend the Sink methodology by batching N inserts at a time:

    // ++= inserts all rows of the batch in a single database action
    def batchInsertIntoDb(nums: Seq[Int]) =
      numberTableDB.run(Numbertable ++= nums.map(NumbertableRow.apply))

    val batchInsertSink = Sink.foreach[Seq[Int]](nums => batchInsertIntoDb(nums))
    

    This batched Sink can be fed by a Flow which does the batch grouping:

    val batchSize = 10
    
    Source(0 to 100).via(Flow[Int].grouped(batchSize))
                    .runWith(batchInsertSink)
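
As with the single-row version, Sink.foreach does not wait for the Future of each batch insert. The following is a rough sketch, not a definitive implementation, of the same grouping combined with mapAsync so that a new batch is only started once the previous one has completed. BackpressuredBatchInserts, insertInBatches and fakeBatchInsert are hypothetical names, the batch insert is stubbed so the example runs without a database, and the implicit ActorSystem as materializer assumes Akka 2.6 or later.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BackpressuredBatchInserts {
  // Groups the source into batches of `batchSize` and runs one batch insert at a
  // time; the next batch is started only after the previous Future completes.
  def insertInBatches[R](batchSize: Int)(insertBatch: Seq[Int] => Future[R])(
      implicit system: ActorSystem): Future[Done] =
    Source(0 to 100)
      .grouped(batchSize)
      .mapAsync(parallelism = 1)(insertBatch)
      .runWith(Sink.ignore)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("batch-inserts")
    import system.dispatcher

    // Hypothetical stand-in for
    // numberTableDB.run(Numbertable ++= nums.map(NumbertableRow.apply))
    val batches = new ConcurrentLinkedQueue[Seq[Int]]()
    def fakeBatchInsert(nums: Seq[Int]): Future[Int] =
      Future { batches.add(nums); nums.size }

    Await.result(insertInBatches(batchSize = 10)(fakeBatchInsert), 10.seconds)
    println(s"ran ${batches.size} batch inserts")
    system.terminate()
  }
}
```

The result type R is left generic so that batchInsertIntoDb from above can be passed in directly, whatever its Future carries. Since 0 to 100 holds 101 values, a batch size of 10 yields ten full batches plus a final batch with a single element.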