Search code examples
postgresqlscalaplayframeworkslickslick-3.0

Listen to PostgreSQL NOTIFY events with Slick


Can I use Slick / Play Framework (Scala) to listen to PostgreSQL NOTIFY statements?

I want to do something similar to this:

http://bjorngylling.com/2011-04-13/postgres-listen-notify-with-node-js.html


Solution

  • I don't think Slick supports PostgreSQL's NOTIFY, but the postgresql-async library does. One can use the latter to create an Akka Streams Source and incorporate it in a Play endpoint that streams the database notifications to a client:

    package controllers
    
    import javax.inject.{Inject, Singleton}
    
    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    
    import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
    import com.github.mauricio.async.db.postgresql.util.URLParser
    import com.github.mauricio.async.db.util.ExecutorServiceUtils.CachedExecutionContext
    
    import play.api.Logger
    import play.api.http.ContentTypes
    import play.api.libs.EventSource
    import play.api.mvc._
    
    import scala.concurrent.duration._
    import scala.concurrent.Await
    
    @Singleton
    class DbNotificationController @Inject()(cc: ControllerComponents,
                                             materializer: Materializer)
      extends AbstractController(cc) {
    
      implicit val mat = materializer
    
      val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
      val connection = new PostgreSQLConnection(configuration)
      Await.result(connection.connect, 5 seconds)
    
      val (actor, dbSource) =
        Source.actorRef[String](Int.MaxValue, OverflowStrategy.dropNew)
              .toMat(BroadcastHub.sink[String])(Keep.both)
              .run()
    
      connection.sendQuery("LISTEN my_channel")
      connection.registerNotifyListener { message =>
        val msg = message.payload
        Logger.debug(s"Sending the payload: $msg")
        actor ! msg
      }
    
      def index() = Action {
        Ok(views.html.scaladbnotification())
      }
    
      def streamDb() = Action {
        Ok.chunked(dbSource via EventSource.flow).as(ContentTypes.EVENT_STREAM)
      }
    }
    

    In the above controller, when the listener receives a notification from the database, the payload (a String) in the notification is logged and sent to an actor. The messages that are sent to this actor feed the Source that is used in the streamDb endpoint. Before the payload messages are sent to the client, they're converted to Play's EventSource class.


    I adapted DbNotificationController from the Play streaming example application, which you can use to experiment. If you would like to do so, obviously you need to integrate DbNotificationController into that project:

    1. Add "com.github.mauricio" %% "postgresql-async" % "0.2.21" to build.sbt.
    2. Set up PostgreSQL as needed, including the NOTIFY, and adjust the database URL in the controller according to your configuration.
    3. Copy and paste DbNotificationController into /app/controllers/.
    4. Copy the following file (call it scaladbnotification.scala.html) into app/views/:
    @main {
    
        <h1>Server Sent Event from DB</h1>
    
        <h1 id="db"></h1>
    
        <p>
            DB events are pushed from the Server using a Server Sent Event connection.
        </p>
    
        <script type="text/javascript" charset="utf-8">
            if (!!window.EventSource) {
                var stringSource = new EventSource("@routes.DbNotificationController.streamDb()");
                stringSource.addEventListener('message', function(e) {
                    $('#db').html(e.data.replace(/(\d)/g, '<span>$1</span>'))
                });
            } else {
                $("#db").html("Sorry. This browser doesn't seem to support Server sent event. Check <a href='http://html5test.com/compare/feature/communication-eventSource.html'>html5test</a> for browser compatibility."); 
            }
        </script>    
    }
    

    1. In the /conf/routes file, add the following lines:
        GET /scala/dbNotification           controllers.DbNotificationController.index()
        GET /scala/dbNotification/liveDb    controllers.DbNotificationController.streamDb()
    
    1. Start the application with sbt run and navigate to the following URL in your browser:

      http://localhost:9000/scala/dbNotification