Understanding cats.effect.Concurrent with respect to Cancellation


Given:

build.sbt

scalaVersion := "2.13.2"
libraryDependencies += "org.typelevel" %% "cats-effect" % "2.1.3"

src/main/scala/net/Main.scala

package net

import cats.effect._
import cats.implicits._
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

object App extends IOApp { self: IOApp =>
  override def run(args: List[String]): IO[ExitCode] =
    for {
      _ <- uncancellable
      _ <- notUncancellable
    } yield ExitCode.Success

  private def uncancellable: IO[Unit] = {
    val tick: IO[Unit] = Concurrent[IO].uncancelable(self.timer.sleep(10.seconds))

    for {
      _ <- IO(println("uncancellable"))
      fiber <- Concurrent[IO].start(tick)
      _ <- IO(println("seconds begin: " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
      _ <- fiber.cancel
      _ <- fiber.join
      _ <- IO(println("seconds done : " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
    } yield ()
  }

  private def notUncancellable: IO[Unit] = {
    val tick: IO[Unit] = self.timer.sleep(10.seconds)

    for {
      _ <- IO(println("notUncancellable"))
      fiber <- Concurrent[IO].start(tick)
      _ <- IO(println("seconds begin: " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
      _ <- fiber.cancel
      _ <- fiber.join
      _ <- IO(println("seconds done : " + FiniteDuration.apply(System.nanoTime(), TimeUnit.NANOSECONDS).toSeconds))
    } yield ()
  }
}

Running it shows the following output:

sbt:cats-effect-cancellation-question> run
[info] Compiling 1 Scala source to /Users/kevinmeredith/Workspace/cats-effect-cancellation-questions/target/scala-2.13/classes ...
[info] Done compiling.
[info] Packaging /Users/kevinmeredith/Workspace/cats-effect-cancellation-questions/target/scala-2.13/cats-effect-cancellation-question_2.13-0.1.jar ...
[info] Done packaging.
[info] Running net.App 
uncancellable
seconds begin: 303045
seconds done : 303055
notUncancellable
seconds begin: 303055
^C$

Note that, after ~30 seconds, I cancelled it.

Why did "seconds done : " not print out for:

notUncancellable
seconds begin: 303055
^C$

?


Solution

  • uncancellable is self-explanatory, I believe.

    In notUncancellable you have a situation like the one in this GitHub issue.

    As Alexandru Nedelcu says:

    fiber.cancel makes fiber.join to be non-terminating in the case of IO. Therefore fiber.join will never complete and that guarantee never gets a chance to be evaluated.

    You can force an evaluation if you cancel that too, which in a real app you'd need to do if you cared for the result of that fiber.join.

    As far as I can tell, this is one possible interpretation of the contract of join:

      /**
       * Returns a new task that will await for the completion of the
       * underlying fiber, (asynchronously) blocking the current run-loop
       * until that result is available.
       */
      def join: F[A]
    

    A cancelled fiber cannot return a successful value; that much is obvious. But if join raised an exception or some other failure instead, that would still be a result, and it could be read as something the fiber computed, even though a cancelled fiber should not produce any result at all.

    For that reason, in this situation your program waits on fiber.join for a value that never arrives.
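
To make the quoted advice about cancelling the join concrete, here is a minimal sketch, assuming cats-effect 2.x as in the question's build.sbt. `JoinDemo`, `joinOrGiveUp` and `limit` are hypothetical names introduced only for illustration. `timeoutTo` races the join against a timer and cancels whichever side loses, so the program can move on even though the join itself never completes:

```scala
import cats.effect._
import scala.concurrent.duration._

object JoinDemo {
  // Hypothetical helper: cancels a sleeping fiber, then waits on join
  // for at most `limit` before falling back to a message.
  def joinOrGiveUp(limit: FiniteDuration)(
      implicit cs: ContextShift[IO], timer: Timer[IO]): IO[String] =
    for {
      fiber  <- IO.sleep(10.seconds).start
      _      <- fiber.cancel
      // timeoutTo cancels the pending join once `limit` has elapsed
      result <- fiber.join
                  .map(_ => "join completed")
                  .timeoutTo(limit, IO.pure("join did not complete; gave up waiting"))
    } yield result
}

object JoinDemoApp extends IOApp {
  // IOApp supplies the implicit ContextShift[IO] and Timer[IO]
  def run(args: List[String]): IO[ExitCode] =
    JoinDemo
      .joinOrGiveUp(1.second)
      .flatMap(msg => IO(println(msg)))
      .map(_ => ExitCode.Success)
}
```

With the semantics described above, I would expect the fallback branch to be taken here, since the join on a cancelled fiber has nothing to deliver.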

    To avoid such pitfalls, you could use something less low-level, such as racePair or similar, so that you do not have to deal with these issues yourself. You can also read a short post by Oleg Pyzhcov about fiber safety.
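
As a rough illustration of the higher-level approach, here is a sketch that uses `IO.race`, a simpler relative of racePair, again assuming cats-effect 2.x. `RaceDemo`, `tickOrLimit` and `limit` are hypothetical names. `race` starts both sides, returns the result of whichever finishes first and cancels the other, so no Fiber handle is ever joined by hand:

```scala
import cats.effect._
import scala.concurrent.duration._

object RaceDemo {
  // Hypothetical helper: runs a 10-second tick, but stops waiting for it
  // (and lets race cancel it) once `limit` has elapsed.
  def tickOrLimit(limit: FiniteDuration)(
      implicit cs: ContextShift[IO], timer: Timer[IO]): IO[String] =
    IO.race(IO.sleep(10.seconds), IO.sleep(limit)).map {
      case Left(_)  => "tick finished first"
      case Right(_) => "limit elapsed first; tick was cancelled by race"
    }
}

object RaceDemoApp extends IOApp {
  // IOApp supplies the implicit ContextShift[IO] and Timer[IO]
  def run(args: List[String]): IO[ExitCode] =
    RaceDemo
      .tickOrLimit(1.second)
      .flatMap(msg => IO(println(msg)))
      .map(_ => ExitCode.Success)
}
```

The design point is that the caller always gets an `Either` back, so there is no code path that waits on a result which can no longer arrive.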