Search code examples
scalaactorfuturetask

Scala - Return value from Callbacks


I am fairly new to scala programming. Can someone please help me with return value from callbacks. How could I return a callback value as JsObject from the calling method? I am using the Play2 framework with an actor system. Please let me know if my return type is wrong and I should return Future as compared to JsObject in SendToKafka method.

I have the following code

override def SendToKafka(data: JsValue): Option[JsObject] = {
  val props: Map[String, AnyRef] = Map(
    "bootstrap.servers" -> "localhost:9092",
    "group.id" -> "CountryCounter",
    "key.serializer" -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "value.serializer" -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url" -> "http://localhost:8081"
  )

  val schema: Schema = new Parser().parse(Source.fromURL(getClass.getResource("/test.avsc")).mkString)

  val gRecord: GenericRecord = new GenericData.Record(schema)
  gRecord.put("emp_id", request.emp_id)

  val producer = new KafkaProducer[Int, GenericRecord](props.asJava)
  val record = new ProducerRecord("Emp", 1, gRecord)

  val promise = Promise[RecordMetadata]()

  producer.send(record, producerCallback(promise))
  val f = promise.future
  val returnValue : Some[JsObject] =null
  val con = Future {
    f onComplete {
      case Success(r) => accessLogger.info("r" + r.offset())
      case Failure(e) => accessLogger.info("e "+ e)
    }

    // I would like to return offset as JsObject or exception ( if any )
  }

  private def producerCallback(promise: Promise[RecordMetadata]): Callback = {
    new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {

      val result = if (exception == null) {
        //accessLogger.info("offset - " + metadata.offset())
        // I would like to return this offset as JsObject 
        Success(metadata)
      }
      else {
        accessLogger.error(exception.printStackTrace().toString)
        Failure(exception)
        // I would like to return exception (if any ) as JsObject 
      }
      promise.complete(result)
    }
  }
}

Solution

  • Because promise is of type Promise[RecordMetadata] and f is promise.future, f is of type Future[RecordMetadata]. The future will hold whatever result is, in promise.complete(result).

    The future can end up containing a failure (i.e. Failure(exception) in your callback) so this needs to be handled (below it is accounted for using a match/case)

    Await.ready can be used to wait until the future has either a Success or Failure -- but without such a blocking call, inside the same method, the future probably won't be completed yet.

    import scala.concurrent.duration._
    import scala.concurrent._
    ...
    // arbitrary time -- set an appropriate wait time
    val fReady: Future[RecordMetadata] = Await.ready(f, 4.seconds)
    
    // After Await.ready is called, *up to* the duration (4s here) has elapsed and the future should have a result
    // you probably need to change the return type to Either if you use this approach,
    // or change this to Option type and ignore the failure, assuming that the exception is logged already 
    val result: Either[Throwable, Int] = fReady.value match {
      case Some(Success(a)) => Right(a) // you can edit this to compute a JsValue from `a` if you want
      case Some(Failure(b)) => Left(b)
      case None => Left(new RuntimeException("Unexpected"))
    }
    
    // can be return type or edit this
    result