http4s, Service Executor and MongoDB: How can I wait for insertOne to finish?


Apologies in advance for the basic question. I am starting to learn Scala with http4s, and in a route handler I am trying to insert a document into MongoDB. As far as I can tell, insertOne returns an Observable[Completed].

Any idea how I can wait for the Observable to complete before returning the response?

My code is:

class Routes {
    val service: HttpService = HttpService {
        case r @ GET -> Root / "hello" => {
            val mongoClient: MongoClient = MongoClient()
            val database: MongoDatabase = mongoClient.getDatabase("scala")
            val collection: MongoCollection[Document] = database.getCollection("tests")
            val doc: Document = Document("_id" -> 0, "name" -> "MongoDB", "type" -> "database",
                                 "count" -> 1, "info" -> Document("x" -> 203, "y" -> 102))
            collection.insertOne(doc)
            mongoClient.close()
            Ok("Hello.")
        }
    }
}

class GomadApp(host: String, port: Int) {
  private val pool = Executors.newCachedThreadPool()

  println(s"Starting server on '$host:$port'")

  val routes = new Routes().service

  // Add some logging to the service
  val service: HttpService = routes.local { req =>
    val path = req.uri
    val start = System.nanoTime()
    val result = req
    val time = ((System.nanoTime() - start) / 1000) / 1000.0
    println(s"${req.remoteAddr.getOrElse("null")} -> ${req.method}: $path in $time ms")
    result
  }

  // Construct the blaze pipeline.
  def build(): ServerBuilder =
    BlazeBuilder
      .bindHttp(port, host)
      .mountService(service)
      .withServiceExecutor(pool)
}

object GomadApp extends ServerApp {
  val ip   = "127.0.0.1"
  val port = envOrNone("HTTP_PORT") map (_.toInt) getOrElse (8787)

  override def server(args: List[String]): Task[Server] =
    new GomadApp(ip, port)
      .build()
      .start

}

Solution

  • I'd recommend https://github.com/haghard/mongo-query-streams - although you'll have to fork it and bump its dependencies a bit, since scalaz 7.1 and 7.2 aren't binary-compatible.

    A less stream-oriented (and less referentially transparent) way is https://github.com/Verizon/delorean, which converts a Scala Future into a scalaz Task:

    collection.insertOne(doc).toFuture().toTask.flatMap({res => Ok("Hello")})
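
    The shape of that one-liner can be seen without any of the libraries involved. What follows is a minimal, library-free sketch under stated assumptions: `fakeInsertOne` and `ok` are hypothetical stand-ins for the driver call and for http4s's `Ok(...)`, and a plain `Future` stands in for `Task`. The only point it makes is that the response is built inside `flatMap`, so it cannot be produced before the insert has completed.

    ```scala
    import scala.concurrent.{Await, Future, Promise}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object WaitForInsert {
      // Hypothetical stand-in for a callback-style driver call: the callback
      // fires on another thread once the simulated insert has finished.
      def fakeInsertOne(doc: String)(onDone: String => Unit): Unit = {
        val worker = new Thread(new Runnable {
          def run(): Unit = { Thread.sleep(50); onDone(s"inserted $doc") }
        })
        worker.start()
      }

      // Bridge the callback into a Future by completing a Promise from it.
      def insertAsFuture(doc: String): Future[String] = {
        val p = Promise[String]()
        fakeInsertOne(doc)(result => p.success(result))
        p.future
      }

      // Hypothetical stand-in for http4s's Ok(...): wraps a body in a Future.
      def ok(body: String): Future[String] = Future.successful(body)

      // The "handler": the response is only constructed after the insert completes.
      def handler(log: StringBuffer): Future[String] =
        insertAsFuture("doc-0").flatMap { res =>
          log.append(res)
          ok("Hello.")
        }

      def main(args: Array[String]): Unit = {
        val log = new StringBuffer
        // Blocking is only for this demo; a server would return the Future/Task itself.
        println(Await.result(handler(log), 2.seconds)) // Hello.
        println(log)                                   // inserted doc-0
      }
    }
    ```

    In a real handler there is no need to block: returning the chained value lets the server complete the response once the insert is done.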
    

    The delorean approach looks easier, but going through Future has some hidden pitfalls. See the discussion at https://www.reddit.com/r/scala/comments/3zofjl/why_is_future_totally_unusable/, quoted below.

    Question:

    This tweet made me wonder: https://twitter.com/timperrett/status/684584581048233984 Do you consider Futures "totally unusable", or is this just hyperbole? I've never had a major problem, but I'm willing to be enlightened.

    Doesn't the following code make Futures effectively "lazy"?

    def myFuture = Future { 42 }

    And, finally, I've also heard rumblings that scalaz's Tasks have some failings as well, but I haven't found much on it. Anybody have more details?

    Answer:

    The fundamental problem is that constructing a Future with a side-effecting expression is itself a side-effect. You can only reason about Future for pure computations, which unfortunately is not how they are commonly used. Here is a demonstration of this operation breaking referential transparency:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.Random
    
    val f1 = { 
      val r = new Random(0L)
      val x = Future(r.nextInt)
      for { 
        a <- x
        b <- x
      } yield (a, b) 
    }
    
    // Same as f1, but I inlined `x`
    val f2 = { 
      val r = new Random(0L)
      for { 
        a <- Future(r.nextInt)
        b <- Future(r.nextInt)
      } yield (a, b) 
    }
    
    f1.onComplete(println) // Success((-1155484576,-1155484576))
    f2.onComplete(println) // Success((-1155484576,-723955400))    <-- not the same
    

    However, this works fine with Task. Note that the interesting one is the non-inlined version (task1), which manages to produce two distinct Int values even though x is defined only once. This is the important bit: Task has a constructor that captures side-effects as values, and Future does not.

    import scalaz.concurrent.Task
    
    val task1 = { 
      val r = new Random(0L)
      val x = Task.delay(r.nextInt)
      for { 
        a <- x
        b <- x 
      } yield (a, b) 
    }
    
    // Same as task1, but I inlined `x`
    val task2 = { 
      val r = new Random(0L)
      for { 
        a <- Task.delay(r.nextInt)
        b <- Task.delay(r.nextInt)
      } yield (a, b) 
    }
    
    println(task1.run) // (-1155484576,-723955400)
    println(task2.run) // (-1155484576,-723955400)
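
    Why the non-inlined version behaves well can be sketched with a toy type. This is not scalaz's implementation: `Delay` is a hypothetical, synchronous stand-in that only stores a thunk and executes it on every `run`, which is the property the Task example above depends on.

    ```scala
    import scala.util.Random

    // Hypothetical toy type: a description of a computation, not its result.
    final class Delay[A](thunk: () => A) {
      // Executes the captured effect; every call runs it again.
      def run: A = thunk()
      def map[B](f: A => B): Delay[B] = new Delay(() => f(thunk()))
      def flatMap[B](f: A => Delay[B]): Delay[B] = new Delay(() => f(thunk()).run)
    }

    object Delay {
      // By-name parameter: `a` is not evaluated here, only captured.
      def delay[A](a: => A): Delay[A] = new Delay(() => a)
    }

    object DelayDemo {
      def pair: (Int, Int) = {
        val r = new Random(0L)
        val x = Delay.delay(r.nextInt()) // nothing has run yet
        val both = for {
          a <- x
          b <- x                         // re-runs the captured effect
        } yield (a, b)
        both.run
      }

      def main(args: Array[String]): Unit =
        println(pair) // (-1155484576,-723955400)
    }
    ```

    Because `x` is a value describing the effect rather than its result, referring to it twice runs the effect twice, so inlining `x` does not change the program.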
    

    Most of the commonly cited differences, like "a Task doesn't run until you ask it to" and "you can compose the same Task over and over", trace back to this fundamental distinction. So the reason Future is "totally unusable" is that once you're used to programming with pure values and relying on equational reasoning to understand and manipulate programs, it's hard to go back to a side-effecting world where things are much harder to understand.
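
    The "doesn't run until you ask it to" difference, and the `def myFuture` idea from the question, can be checked with the standard library alone. In this sketch the counters and names are illustrative and not taken from the original posts. A `val` Future schedules its body once, at construction, while a `def` constructs and starts a new Future on every reference.

    ```scala
    import java.util.concurrent.atomic.AtomicInteger
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object EagerFuture {
      // Returns how many times the side effect ran in each style.
      def counts(): (Int, Int) = {
        val viaVal = new AtomicInteger(0)
        val viaDef = new AtomicInteger(0)

        val fVal = Future(viaVal.incrementAndGet()) // body is scheduled right here
        def fDef = Future(viaDef.incrementAndGet()) // nothing runs until referenced

        val twiceVal = for { _ <- fVal; _ <- fVal } yield ()
        val twiceDef = for { _ <- fDef; _ <- fDef } yield ()

        Await.result(twiceVal, 2.seconds)
        Await.result(twiceDef, 2.seconds)
        (viaVal.get(), viaDef.get())
      }

      def main(args: Array[String]): Unit =
        println(counts()) // (1,2)
    }
    ```

    So `def` does restore re-execution, but only by convention: each reference still starts running as soon as it is evaluated, and binding the result to a `val` anywhere along the way brings the eager, run-once behaviour back.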