Search code examples
scalaasynchronousakkaconcurrent.futureslagom

How to save & return data from within Future callback


I've been facing an issue the past few days regarding saving & handling data from Futures in Scala. I'm new to the language and the concept of both. Lagom's documentation on Cassandra says to implement roughly 9 files of code and I want to ensure my database code works before spreading it out over that much code.

Specifically, I'm currently trying to implement a proof of concept to send data to/from the cassandra database that lagom implements for you. So far I'm able to send and retrieve data to/from the database, but I'm having trouble returning that data since this all runs asynchronously, and also returning that the data returned successfully.

I've been playing around for a while; The retrieval code looks like this:

override def getBucket(logicalBucket: String) = ServiceCall[NotUsed, String] {
request => {
  val returnList = ListBuffer[String]()

  println("Retrieving Bucket " + logicalBucket)
  val readingFromTable = "SELECT * FROM data_access_service_impl.s3buckets;"

  //DB query
  var rowsFuture: Future[Seq[Row]] = cassandraSession.selectAll(readingFromTable)
  println(rowsFuture)

  Await.result(rowsFuture, 10 seconds)

  rowsFuture onSuccess {
    case rows => {
      println(rows)
      for (row <- rows) println(row.getString("name"))
      for (row <- rows) returnList += row.getString("name")
      println("ReturnList: " + returnList.mkString)
    }
  }

  rowsFuture onFailure {
    case e => println("An error has occured: " + e.getMessage)
    Future {"An error has occured: " + e.getMessage}
  } 

  Future.successful("ReturnList: " + returnList.mkString)
 }      
}

When this runs, I get the expected database values to 'println' in the onSuccess callback. However, that same variable, which I use in the return statement, outside of the callback prints as empty (and returns empty data as well). This also happens in the 'insertion' function I use, where it doesn't always return variables I set within callback functions.

If I try to put the statement within the callback function, I'm given an error of 'returns Unit, expects Future[String]'. So I'm stuck where I can't return from within the callback functions, so I can't guarantee I'm returning data).

The goal for me is to return a string to the API so that it shows a list of all the s3 bucket names within the DB. That would mean iterating through the Future[Seq[Row]] datatype, and saving the data into a concatenated string. If somebody could help with that, they'll solve 2 weeks of problems I've had reading through Lagom, Akka, Datastax, and Cassandra documentation. I'm flabbergasted at this point (information overload) and there's no clearcut guide I've found on this.

For reference, here's the cassandraSession documentation:

LagomTutorial/Documentation Style Information with their only cassandra-query example CassandraSession.scala code


Solution

  • The key thing to understand about Future, (and Option, and Either, and Try) is that you do not (in general) get values out of them, you bring computations into them. The most common way to do that is with the map and flatMap methods.

    In your case, you want to take a Seq[Row] and transform it into a String. However, your Seq[Row] is wrapped in this opaque data structure called Future, so you can't just rows.mkString as you would if you actually had a Seq[Row]. So, instead of getting the value and performing computation on it, bring your computation rows.mkString to the data:

    //DB query
    val rowsFuture: Future[Seq[Row]] = cassandraSession.selectAll(readingFromTable)
    val rowToString = (row: Row) => row.getString("name")
    val computation = (rows: Seq[Row]) => rows.map(rowToString).mkString
    
    // Computation to the data, rather than the other way around
    val resultFuture = rowsFuture.map(computation)
    

    Now, when rowsFuture is completed, the new future that you created by calling rowsFuture.map will be fulfilled with the result of calling computation on the Seq[Row] that you actually care about.

    At that point you can just return resultFuture and everything will work as anticipated, because the code that calls getBucket is expecting a Future and will handle it as is appropriate.

    Why is Future opaque?

    The simple reason is because it represents a value that may not currently exist. You can only get the value when the value is there, but when you start your call it isn't there. Rather than have you poll some isComplete field yourself, the code lets you register computations (callbacks, like onSuccess and onFailure) or create new derived future values using map and flatMap.

    The deeper reason is because Future is a Monad and monads encompass computation, but do not have an operation to extract that computation out of them