How can I recur a Zio schedule until a particular value is produced?


Suppose I have an IO (and therefore unsafe) operation that returns either true or false. I want to use the ZIO scheduling mechanism to execute it until the value is true, but at most N times. To adapt the code from the documentation to what I'm trying to achieve...

import zio._
import zio.duration._
import zio.console._
import zio.clock._
import java.util.Random

object API {
  // our API method will return true about 20% of the time, but
  // return false the rest of the time (instead of throwing an
  // exception, as is shown in documentation)
  def makeRequest: Task[Boolean] = Task.effect {
    new Random().nextInt(10) > 7
  }
}

object ScheduleUtil {
  def schedule[A] = Schedule.spaced(1.second) && Schedule.recurs(4).onDecision({
    case Decision.Done(_)                 => putStrLn(s"done trying")
    case Decision.Continue(attempt, _, _) => putStrLn(s"attempt #$attempt")
  })
}

import ScheduleUtil._
import API._

object ScheduleApp extends scala.App {

  implicit val rt: Runtime[Clock with Console] = Runtime.default

  rt.unsafeRun(makeRequest.retry(schedule).foldM(
    ex => putStrLn("Exception Failed"),
    v => putStrLn(s"Succeeded with $v"))
  )
}

// run the app
ScheduleApp.main(Array())

This doesn't work, of course. The output is either Succeeded with false or (occasionally) Succeeded with true. I tried adding Schedule.recurUntilEquals to the Schedule definition, but to no avail.

object ScheduleUtil {
  def schedule[A] = Schedule.spaced(1.second) && Schedule.recurUntilEquals(true) && Schedule.recurs(4).onDecision({
    case Decision.Done(_)                 => putStrLn(s"done trying")
    case Decision.Continue(attempt, _, _) => putStrLn(s"attempt #$attempt")
  })
}

import ScheduleUtil._

// re-define ScheduleApp in the exact same way as above, and the following error results:

cmd93.sc:5: polymorphic expression cannot be instantiated to expected type;
 found   : [A]zio.Schedule[zio.console.Console,Boolean,((Long, Boolean), Long)]
    (which expands to)  [A]zio.Schedule[zio.Has[zio.console.Console.Service],Boolean,((Long, Boolean), Long)]
 required: zio.Schedule[?,Throwable,?]
  rt.unsafeRun(makeRequest.retry(schedule).foldM(

How can I accomplish this use case with the ZIO scheduler? Of course, I can redefine the makeRequest task to deliberately throw an exception instead of returning false, and this works just as in the documentation. But I was hoping to avoid unnecessary exception generation/handling.

object API {
  // our API method will return true about 20% of the time, but
  // throw an exception the rest of the time (as is shown in
  // documentation)
  def makeRequest = Task.effect {
    if (new Random().nextInt(10) > 7) true else throw new Exception("Not true")
  }
}

Solution

  • Your issue is that you are using retry on the effect instead of repeat. repeat is what you want, since you are explicitly bypassing the error channel, as you mentioned.

    So simply change makeRequest.retry(schedule) to makeRequest.repeat(schedule), keeping the schedule that includes recurUntilEquals(true), and it should work.

    For a more detailed description consider the signatures below:

    // Schedule.spaced
    def spaced(duration: Duration): Schedule[Any, Any, Long]
    
    // Schedule.recurs
    def recurs(n: Int): Schedule[Any, Any, Long]
    
    // Schedule.recurUntilEquals
    def recurUntilEquals[A](a: => A): Schedule[Any, A, A]
    

    Schedule has three type parameters: -Env, -In and +Out. Env is the same as the standard R type that is part of ZIO, but In and Out are different from the standard E and A on the other ZIO types. This is because a Schedule describes a "recurring schedule, which consumes values of type In, and which returns values of type Out", according to the docs. For spaced and recurs the input is Any, meaning that they accept any input value and therefore place no constraint on it. You can see this by composing the two together:

    val s: Schedule[Any, Any, (Long, Long)] = Schedule.spaced(1.second) && Schedule.recurs(1)
    

    This is also why they don't cause any compiler errors when used with retry: they make no use of the input, so they have no specific requirements on the error channel. But this also hides your problem. retry only consults the schedule when the effect fails, and since you were returning true or false instead of failing, your schedule was never invoked.
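
    To make the difference concrete, here is a minimal sketch that counts how often an always-succeeding effect runs under each combinator. It assumes ZIO 1.x; the names RetryVsRepeat, countRuns, runsWithRetry and runsWithRepeat are made up for this illustration.

    ```scala
    import zio._
    import zio.clock.Clock

    object RetryVsRepeat {
      // Hypothetical helper: builds an effect that bumps a counter and
      // succeeds with false, runs it through `run`, then reports the count.
      def countRuns(run: Task[Boolean] => ZIO[Clock, Throwable, Any]): ZIO[Clock, Throwable, Int] =
        for {
          calls <- Ref.make(0)
          _     <- run(calls.update(_ + 1).as(false))
          n     <- calls.get
        } yield n

      // No spacing here, so the sketch finishes immediately.
      val policy: Schedule[Any, Any, Long] = Schedule.recurs(4)

      // retry only looks at failures, so a successful effect runs exactly once.
      val runsWithRetry: ZIO[Clock, Throwable, Int] = countRuns(_.retry(policy))

      // repeat feeds each success to the schedule: the initial run plus 4 recurrences.
      val runsWithRepeat: ZIO[Clock, Throwable, Int] = countRuns(_.repeat(policy))

      def main(args: Array[String]): Unit = {
        println(Runtime.default.unsafeRun(runsWithRetry))  // prints 1
        println(Runtime.default.unsafeRun(runsWithRepeat)) // prints 5
      }
    }
    ```

    The effect never fails, so retry has nothing to do and the schedule is ignored, which is exactly what happened with the "Succeeded with false" output in the question.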

    Once you added recurUntilEquals, an input constraint was added to the schedule:

    val s: Schedule[Any, Boolean, ((Long, Long), Boolean)] = Schedule.spaced(1.second) && Schedule.recurs(1) && Schedule.recurUntilEquals(true)
    

    Now you are saying that the input fed into the Schedule must be a Boolean, but retry has the signature:

    def retry[R1 <: R, S](policy: Schedule[R1, E, S])(implicit ev: CanFail[E]): ZIO[R1 with Clock, E, A]
    

    Notice that the second type argument of the Schedule in the policy parameter is E, the error type. Since Throwable =!= Boolean, you get a compiler error.

    Correspondingly, this is the signature for repeat:

    def repeat[R1 <: R, B](schedule: Schedule[R1, A, B]): ZIO[R1 with Clock, E, B]
    

    Here we see that the schedule takes the A type as its input, which in this case is the response from your API, a Boolean. That matches what the schedule you provided expects.
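
    Putting it together, a sketch of the whole thing with repeat might look like the following. It assumes ZIO 1.x (the zio.console and zio.duration packages from the question); RepeatUntilTrue and policy are hypothetical names, and the spacing is a parameter only so the policy is easy to try with a shorter delay. The map at the end is optional: it drops the counters from the schedule output so that repeat returns just the last Boolean.

    ```scala
    import zio._
    import zio.console._
    import zio.duration._

    object RepeatUntilTrue {
      // Same stand-in as in the question: succeeds with true or false,
      // it never fails on purpose.
      val makeRequest: Task[Boolean] =
        Task.effect(new java.util.Random().nextInt(10) > 7)

      // Recur until the effect yields true, at most 4 more times,
      // waiting `spacing` between runs. Only the output of
      // recurUntilEquals (the last value seen) is kept.
      def policy(spacing: Duration): Schedule[Any, Boolean, Boolean] =
        (Schedule.spaced(spacing) && Schedule.recurs(4) && Schedule.recurUntilEquals(true))
          .map(_._2)

      def main(args: Array[String]): Unit =
        Runtime.default.unsafeRun(
          makeRequest
            .repeat(policy(1.second))
            .foldM(
              ex => putStrLn(s"Failed with $ex"),
              v  => putStrLn(s"Last result: $v")
            )
        )
    }
    ```

    Note that with repeat the effect runs once before the schedule is consulted, so recurs(4) allows up to five executions in total. If the last result is still false, the schedule simply ran out of recurrences; nothing goes through the error channel unless makeRequest itself throws.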