Search code examples
scalamonix

Monix Task.sleep and single thread execution


I am trying to comprehend Task scheduling principles in Monix. The following code (source: https://slides.com/avasil/fp-concurrency-scalamatsuri2019#/4/3) produces only '1's, as expected.

  val s1: Scheduler = Scheduler(
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()),
    ExecutionModel.SynchronousExecution)

  def repeat(id: Int): Task[Unit] =
    Task(println(s"$id ${Thread.currentThread().getName}")) >> repeat(id)

  val prog: Task[(Unit, Unit)] = (repeat(1), repeat(2)).parTupled

  prog.runToFuture(s1)

  // Output:
  // 1 pool-1-thread-1
  // 1 pool-1-thread-1
  // 1 pool-1-thread-1
  // ...

When we add Task.sleep to the repeat method

  def repeat(id: Int): Task[Unit] =
    Task(println(s"$id ${Thread.currentThread().getName}")) >>
      Task.sleep(1.millis) >> repeat(id)

the output changes to

// Output
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// 1 pool-1-thread-1
// 2 pool-1-thread-1
// ...

Both tasks are now executed concurently on a single thread! Nice :) Some cooperative yielding has kicked in. What happenend here exactly? Thanks :)

EDIT: same happens with Task.shift instead of Task.sleep.


Solution

  • I'm not sure if that's the answer you're looking for, but here it goes:

    Allthough naming suggests otherwise, Task.sleep cannot be compared to more conventional methods like Thread.sleep.

    Task.sleep does not actually run on a thread, but instead simply instructs the scheduler to run a callback after the elapsed time.

    Here's a little code snippet from monix/TaskSleep.scala for comparison:

    [...]
    
    implicit val s = ctx.scheduler
    val c = TaskConnectionRef()
    ctx.connection.push(c.cancel)
    
    c := ctx.scheduler.scheduleOnce(
      timespan.length,
      timespan.unit,
      new SleepRunnable(ctx, cb)
    )
    
    [...]
    
    private final class SleepRunnable(ctx: Context, cb: Callback[Throwable, Unit]) extends Runnable {
    
      def run(): Unit = {
        ctx.connection.pop()
        // We had an async boundary, as we must reset the frame
        ctx.frameRef.reset()
        cb.onSuccess(())
      }
    }
    
    [...]
    

    During the period before the callback (here: cb) is executed, your single-threaded scheduler (here: ctx.scheduler) can simply use his thread for whatever computation is queued next.

    This also explains why this approach is preferable, as we don't block threads during the sleep intervals - wasting less computation cycles.

    Hope this helps.