akka, akka-stream, reactive-streams

Stream records from a database using Akka Streams


I have a system using Akka which currently handles incoming streaming data over message queues. When a record arrives, it is processed, the message queue is acked, and the record is passed on for further handling within the system.

Now I would like to add support for using databases as input.
What would be a good way to build an input source that can handle a database? It should stream in more than 100M records at the pace the receiver can handle, so I presume reactive streams / akka-streams?


Solution

  • Slick Library

    Slick streaming is how this is usually done.

    Extending the slick documentation a bit to include akka streams:

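    // `coffees` (a TableQuery) and `db` (a Database) are defined as in the Slick documentation
    import slick.basic.DatabasePublisher // Slick 3.2+; older releases use slick.backend.DatabasePublisher
    import akka.stream.scaladsl.Source
    
    //SELECT Name from Coffees
    val q = for (c <- coffees) yield c.name
    
    val action = q.result
    
    type Name = String
    
    val databasePublisher : DatabasePublisher[Name] = db stream action
    
    val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher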
    

    Now akkaSourceFromSlick is like any other akka stream Source.
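
    As a minimal sketch of what "like any other Source" means for the question's use case, the records can be pushed through an asynchronous handler so that the database is only asked for more rows as earlier ones finish. The names `NameConsumer`, `handle` and `consume` are hypothetical, the handler is a stand-in for the system's real processing, and Akka 2.6+ is assumed (the materializer is derived from the implicit `ActorSystem`).

    ```scala
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.Future

    object NameConsumer {
      type Name = String

      // Hypothetical per-record handler; replace with the real processing step.
      def handle(name: Name): Future[String] = Future.successful(name.toUpperCase)

      // mapAsync keeps at most `parallelism` records in flight and preserves order,
      // so upstream is only pulled as handlers complete.
      // Sink.seq collects everything in memory and is used here for demonstration only;
      // for 100M records something like Sink.ignore or Sink.foreach is more appropriate.
      def consume(names: Source[Name, _], parallelism: Int = 4)
                 (implicit system: ActorSystem): Future[Seq[String]] =
        names.mapAsync(parallelism)(handle).runWith(Sink.seq)
    }
    ```

    With the source from above this would be called as `NameConsumer.consume(akkaSourceFromSlick)`, given an implicit `ActorSystem` in scope.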

    "Old School" ResultSet

    It is also possible to use a plain ResultSet, without slick, as the "engine" for an akka stream. We will utilize the fact that a stream Source can be instantiated from an Iterator.

    First create the ResultSet using standard JDBC techniques:

    import java.sql._
    import scala.util.Try
    
    // A function, so that the query only runs when the generator is invoked
    val resultSetGenerator : () => Try[ResultSet] = () => Try {
      val statement : Statement = ???
      statement executeQuery "SELECT Name from Coffees"
    }
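
    The statement itself is left open (`???`) in the snippet above. One possible way to obtain a result set that is fetched in batches is sketched here; `StreamingQuery`, `open` and the default fetch size of 1000 are assumptions for illustration, not part of the original answer. Whether the fetch size is honored depends on the JDBC driver, and some drivers (PostgreSQL's among them) only fetch incrementally when auto-commit is off.

    ```scala
    import java.sql.{Connection, ResultSet}
    import scala.util.Try

    object StreamingQuery {
      // Opens a forward-only, read-only result set and hints the driver
      // to fetch `fetchSize` rows per round trip.
      def open(connection: Connection, sql: String, fetchSize: Int = 1000): Try[ResultSet] = Try {
        connection.setAutoCommit(false) // needed by some drivers for cursor-based fetching
        val statement = connection.createStatement(
          ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
        statement.setFetchSize(fetchSize) // a hint; the driver may ignore it
        statement.executeQuery(sql)
      }
    }
    ```

    A generator could then be written as `() => StreamingQuery.open(connection, "SELECT Name from Coffees")`. Note that `beforeFirst()` is specified to throw an `SQLException` on a forward-only result set, so a result set opened this way should be iterated directly, without repositioning the cursor.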
    

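    A new ResultSet starts with its cursor before the first row, but we can explicitly move it there to be safe. Note that, per the JDBC specification, beforeFirst() throws an SQLException on a TYPE_FORWARD_ONLY result set, so this step needs a scrollable result set and otherwise yields a Failure: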

    val adjustResultSetBeforeFirst : (ResultSet) => Try[ResultSet] = 
      (resultSet) => Try(resultSet.beforeFirst()) map (_ => resultSet)
    

    Once we start iterating through rows we'll have to pull the value from the correct column:

    val getNameFromResultSet : ResultSet => Name = _ getString "Name"
    

    And now we can implement the Iterator interface to create an Iterator[Name] from a ResultSet:

    val convertResultSetToNameIterator : ResultSet => Iterator[Name] = 
      (resultSet) => new Iterator[Try[Name]] {
        // ResultSet.next advances the cursor as a side effect of checking for a row
        override def hasNext : Boolean  = resultSet.next
        override def next() : Try[Name] = Try(getNameFromResultSet(resultSet))
      } flatMap (_.toOption) // rows whose column read fails are silently dropped
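
    An alternative sketch, which avoids hand-writing an Iterator whose hasNext has a side effect, builds the same kind of iterator from standard library combinators. `ResultSetRows` and `names` are hypothetical names. Unlike the version above, a failing column read propagates as an exception instead of being dropped.

    ```scala
    import java.sql.ResultSet

    object ResultSetRows {
      // Iterator.continually hands out the same ResultSet repeatedly;
      // takeWhile advances the cursor once per row and caches its look-ahead,
      // so calling hasNext several times does not skip rows.
      def names(resultSet: ResultSet, column: String = "Name"): Iterator[String] =
        Iterator
          .continually(resultSet)
          .takeWhile(_.next())
          .map(_.getString(column))
    }
    ```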
    

    And finally, glue all the pieces together to create the function we'll need to pass to Source.fromIterator:

    val resultSetGenToNameIterator : (() => Try[ResultSet]) => () => Iterator[Name] = 
      (resultSetGen) => () =>
        resultSetGen()
          .flatMap(adjustResultSetBeforeFirst)  // Failure if the cursor cannot be repositioned
          .map(convertResultSetToNameIterator)
          .getOrElse(Iterator.empty)            // any failure yields an empty stream
    

    This iterator-producing function can now feed a Source:

    val akkaSourceFromResultSet : Source[Name, _] = 
      Source fromIterator resultSetGenToNameIterator(resultSetGenerator)
    

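    This implementation is reactive all the way down to the database, provided the JDBC driver fetches rows incrementally. When the ResultSet pre-fetches only a limited number of rows at a time (which depends on the driver and on settings such as the statement's fetch size), data only comes off the hard drive through the database as the stream Sink signals demand. Keep in mind that ResultSet calls block the calling thread, and that nothing here closes the ResultSet when the stream ends.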
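
    A variation on the ResultSet approach, sketched here under assumptions, uses Akka's `Source.unfoldResource`, which is intended for blocking resources and takes an explicit close function. `JdbcNameSource`, `Cursor` and `openConnection` are hypothetical names, and the `Name` column is assumed to be non-null because Akka Streams does not allow null elements.

    ```scala
    import java.sql.{Connection, ResultSet, Statement}
    import akka.NotUsed
    import akka.stream.scaladsl.Source

    object JdbcNameSource {
      type Name = String

      // Everything that has to be released once the stream stops.
      final case class Cursor(connection: Connection, statement: Statement, rows: ResultSet)

      def apply(openConnection: () => Connection): Source[Name, NotUsed] =
        Source.unfoldResource[Name, Cursor](
          // create: runs when the stream is materialized
          () => {
            val connection = openConnection()
            val statement  = connection.createStatement()
            Cursor(connection, statement, statement.executeQuery("SELECT Name from Coffees"))
          },
          // read: called when downstream signals demand; None completes the stream
          cursor => if (cursor.rows.next()) Some(cursor.rows.getString("Name")) else None,
          // close: releases the JDBC resources when the stream stops
          cursor => {
            cursor.rows.close()
            cursor.statement.close()
            cursor.connection.close()
          }
        )
    }
    ```

    The result is used like the other sources, for example `JdbcNameSource(() => dataSource.getConnection())`, where `dataSource` is whatever connection provider the application already has. This sketch does not handle a failure halfway through the create step, which would leave the connection open.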