
How to schedule a Scala Future from a persistent Quartz job


Suppose there is some Scala code that should be scheduled with the Java Quartz library, and we need to store the result of each execution in the job context so that the next execution can access it. As a synthetic example, take a CounterService whose inc function should be scheduled:

trait CounterService {
  def inc(): Int
}

The following quartz-job invokes inc and stores its result in the JobDataMap successfully:

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
  val counterService: CounterService = ...

  override def execute(context: JobExecutionContext): Unit = {

    val newCounterValue: Int = counterService.inc()

    val map = context.getJobDetail.getJobDataMap
    map.put("counter", newCounterValue)  
  }
}

We can then read the job result at any time from elsewhere, provided we have a reference to the scheduler:

val scheduler: Scheduler = ...
// gets details of our CounterJob which was created and registered in the scheduler
// by the name "counter-job" (it is not shown in our example)
val job = scheduler.getJobDetail(JobKey.jobKey("counter-job")) 
// this map will contain the job result which was stored by the key "counter"
val map = job.getJobDataMap.asScala 

But this approach does not work if the Quartz job has to run asynchronous code. For example, suppose our counter service looks like this:

trait AsyncCounterService {
  def asyncInc(): Future[Int]
}

We can try to implement our job as follows, but it does not work correctly: CounterJob.execute can return before the Future returned by counterService.asyncInc completes, so the result of asyncInc cannot be stored in the JobDataMap:

@PersistJobDataAfterExecution
@DisallowConcurrentExecution
class CounterJob extends Job {
  val counterService: AsyncCounterService = ...  
  val execContext: ExecutionContext = ...

  override def execute(context: JobExecutionContext): Unit = {

    // #1: the job scheduler cannot influence the execution flow
    //     of this future.
    val counterFuture: Future[Int] = counterService.asyncInc() 

    counterFuture.map { counterValue: Int =>

      val map = context.getJobDetail.getJobDataMap  
      // #2: this action won't have any effect
      map.put("counter", counterValue)              
    }
  }
}

There are at least two problems with this solution, marked in the code above with the #1 and #2 comments.

Is there a better practice for solving this? In other words, how can a Scala Future be scheduled from a persistent Quartz job so that the Future's result is stored in the JobDataMap?


Solution

  • If everything that runs after CounterJob needs the counter value, it is fine to simply block and await the Future inside CounterJob. Nothing that depends on it can run in the meantime anyway, because the value has not been computed yet.

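    import java.util.concurrent.TimeoutException
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    ...

    try {
      // Block the job's thread until the future completes (at most 5 seconds)
      val counterValue = Await.result(counterFuture, 5.seconds)
      map.put("counter", counterValue)
    } catch {
      case t: TimeoutException => ...
      case t: Exception => ...
    }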
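To make the blocking approach concrete without pulling in Quartz, here is a minimal sketch that uses only the Scala standard library. The names BlockingJobSketch, InMemoryAsyncCounter and the mutable jobData map are assumptions made for illustration: the map merely plays the role of the JobDataMap, and execute plays the role of Job.execute.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingJobSketch {
  // Hypothetical stand-in for the question's AsyncCounterService.
  final class InMemoryAsyncCounter(implicit ec: ExecutionContext) {
    private val value = new AtomicInteger(0)
    def asyncInc(): Future[Int] = Future(value.incrementAndGet())
  }

  // Stand-in for Job.execute; `jobData` plays the role of the JobDataMap.
  // The method returns only after the result has been stored, so whatever
  // persists the map afterwards sees the new value.
  def execute(counter: InMemoryAsyncCounter,
              jobData: mutable.Map[String, Any],
              timeout: FiniteDuration): Unit = {
    // Await.result throws a TimeoutException if `timeout` elapses first;
    // this sketch lets that exception propagate to the caller.
    val counterValue = Await.result(counter.asyncInc(), timeout)
    jobData.put("counter", counterValue)
    ()
  }
}
```

Calling BlockingJobSketch.execute(counter, jobData, 5.seconds) twice in a row on the same counter and map leaves jobData("counter") at 2, because each call waits for its own increment before returning.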
    
    

    If you have multiple async futures in that job, you can combine them with a monadic chain of flatMap and map operations, with a for comprehension, or with helper methods from the Future companion object, e.g. Future.sequence. The end result is one future combining all async operations, which you can wait on with Await.
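A rough sketch of what combining several futures into a single blocking point could look like, using only the standard library. fetchA, fetchB and the contents of the list are hypothetical placeholders for whatever async calls the job really makes.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CombineFuturesSketch {
  // Hypothetical async operations standing in for the job's real calls.
  def fetchA()(implicit ec: ExecutionContext): Future[Int] = Future(1)
  def fetchB()(implicit ec: ExecutionContext): Future[Int] = Future(2)

  // for comprehension: start both futures first so they can run
  // concurrently, then combine their results.
  def sumBoth()(implicit ec: ExecutionContext): Future[Int] = {
    val fa = fetchA()
    val fb = fetchB()
    for {
      a <- fa
      b <- fb
    } yield a + b
  }

  // Future.sequence: turn a List[Future[Int]] into one Future[List[Int]].
  def all(n: Int)(implicit ec: ExecutionContext): Future[List[Int]] =
    Future.sequence(List.tabulate(n)(i => Future(i * 10)))

  // The only blocking call: wait once for the combined future.
  def runBlocking(timeout: FiniteDuration)(implicit ec: ExecutionContext): (Int, List[Int]) =
    Await.result(sumBoth().zip(all(3)), timeout)
}
```

Creating the futures before the for comprehension matters: if fetchA() and fetchB() were called inside it, the second would only start after the first had completed.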

    Normally it is considered bad practice to await futures, because this blocks the executor thread from doing any other work while it waits for the future to complete.

    However, here you are mixing in a job scheduling framework with a different concurrency paradigm. As stated above, in this particular example it is fine to block, because everything that follows relies on the first computation.

    If other jobs could run at the same time, there are several ways to solve this, depending on what the scheduler offers:

    1. If there is a way to return the future from the job, you can wait for this future to complete before scheduling dependent jobs.
    2. If there is some sort of custom event listener mechanism that can be triggered from the job, you can fire it when the future completes, e.g. counterFuture.map {context.notify("computationReady")}
    3. If there is a specific AsyncJob supporting non-blocking IO that expects a Java Future as its return value, you can convert the Scala Future to a Java Future.
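The Scala side of options 2 and 3 might be sketched as follows. This assumes Scala 2.13 or later, where scala.jdk.FutureConverters is part of the standard library (older versions need the scala-java8-compat module instead). The onReady hook is a hypothetical listener, not a Quartz API, and whether the scheduler can consume either result is a separate question.

```scala
import java.util.concurrent.CompletableFuture
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters._
import scala.util.{Failure, Success}

object NonBlockingSketch {
  // Option 2 flavour: run a callback once the future completes, without
  // blocking the calling thread. `onReady` is a hypothetical listener hook.
  def notifyWhenReady(f: Future[Int])(onReady: Int => Unit)
                     (implicit ec: ExecutionContext): Unit =
    f.onComplete {
      case Success(value) => onReady(value)
      case Failure(_)     => () // a real listener would report the failure
    }

  // Option 3 flavour: expose the Scala Future as a Java CompletableFuture,
  // which implements java.util.concurrent.Future.
  def toJava(f: Future[Int]): CompletableFuture[Int] =
    f.asJava.toCompletableFuture
}
```

The CompletableFuture obtained this way is meant to be read, not completed by hand: its outcome is driven by the underlying Scala Future.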