Tags: scala, concurrency, atomic, scala-cats, cats-effect

Ref Updates And Fiber Triggers Using Cats Effect


Problem: I need to refresh a cache on a schedule (every x minutes) while concurrent reads of the cache remain possible.

Solutions tried:

  1. Using TrieMap and ScheduledThreadPool Executor With Cats Effects:

I started with a TrieMap, since it is thread-safe, and used a scheduled thread pool to schedule the updates:

import cats.Applicative.ops.toAllApplicativeOps
import cats.effect.concurrent.Ref
import cats.effect.{ExitCode, IO, IOApp}

import java.util.concurrent.{Executors, ScheduledExecutorService}
import scala.collection.concurrent.TrieMap
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

object ExploreTrieMap extends IOApp {
  def callForEvery[A](f: => Unit, d: FiniteDuration)
                     (implicit sc: ScheduledExecutorService): IO[Unit] = {
    IO.cancelable {
      cb =>
        val r = new Runnable {
          override def run(): Unit = cb(Right(f))
        }
        val scFut = sc.scheduleAtFixedRate(r, 0, d.length, d.unit)
        IO(scFut.cancel(false)).void
    }
  }

  val map = TrieMap.empty[String, String]
  override def run(args: List[String]): IO[ExitCode] = {
    implicit val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
    for {
      _ <- callForEvery(println(map.get("token")), 1 second)
      _ <- callForEvery(println(map.put("token", Random.nextString(10))), 3 second)
    } yield ExitCode.Success
  }
}

  2. Using Ref and Cats Effect Fibers:

I then wrote a pure cats-effect solution.

Will the code below end up in a StackOverflowError?

import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}

import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

object ExploreCatFiber extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO, String]("")
      s <- scheduleAndPopulate(ref, 1 minute)
      r <- keepPollingUsingFiber(ref)
      _ <- s.join
      _ <- r.join
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
    (for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      rS <- scheduleAndPopulate(r, duration)(cs)
      _ <- rS.join
    } yield ()).start(cs)
  }


  def keepPollingUsingFiber(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Fiber[IO, Unit]] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      w <- keepPollingUsingFiber(r)(cs)
      _ <- w.join
    } yield ()).start(cs)
  }
}

I'm trying to update a Ref and use it as a concurrent cache that another fiber keeps refreshing. I trigger the creation of each new fiber through recursion, and each recursive step joins on the fiber it just created. I know fibers can be used for stack-safe operations, but because of these joins I want to understand whether the code above is safe.

Update (Solution from answers provided below)

Third solution, based on input from one of the answers: rather than forking on each recursive call, fork once in the caller.

import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCode, Fiber, IO, IOApp}

import scala.concurrent.Future
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

object ExploreCatFiberWithIO extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      ref <- Ref.of[IO, String]("")
      s <- scheduleAndPopulateWithIO(ref, 1 second).start
      r <- keepPollingUsingIO(ref).start
      _ <- s.join
      _ <- r.join
    } yield ExitCode.Success
  }

  def populate(): Future[String] = Future.successful(Random.nextString(10))

  val futPop = IO.fromFuture(IO(populate()))

  def scheduleAndPopulateWithIO(r: Ref[IO, String], duration: FiniteDuration)(implicit cs: ContextShift[IO]): IO[Unit] = {
    for {
      _ <- IO(println("Scheduled For Populating Ref"))
      res <- futPop
      _ <- r.set(res)
      _ <- IO.sleep(duration)
      _ <- scheduleAndPopulateWithIO(r, duration)(cs)
    } yield ()
  }

  def keepPollingUsingIO(r: Ref[IO, String])(implicit cs: ContextShift[IO]): IO[Unit] = {
    (for {
      res <- r.get
      _ <- IO(println(res))
      _ <- IO.sleep(1 second)
      w <- keepPollingUsingIO(r)(cs)
    } yield ())
  }
}

I would love to know the pros and cons of the approaches discussed above.


Solution

  • For the second approach, you can make it simpler by not forking a Fiber in scheduleAndPopulate and keepPollingUsingFiber. Instead, keep the recursive call, and fork them in the caller. IO is stack-safe, so the recursive call won't blow up the stack.

    You could use start to fork each one, but it may be simpler to combine them with parTupled. It's a variation of parMapN that runs each effect in parallel and gathers their results in a tuple.

    (Also, you don't need to pass implicit values such as cs explicitly in your code. The compiler will resolve them for you.)

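    import cats.effect.concurrent.Ref
    import cats.effect.{ExitCode, IO, IOApp}
    import cats.implicits._ // brings parTupled into scope

    import scala.concurrent.Future
    import scala.concurrent.duration.{DurationInt, FiniteDuration}
    import scala.util.Random

    object ExploreCatFiber extends IOApp {
      override def run(args: List[String]): IO[ExitCode] = {
        for {
          ref <- Ref.of[IO, String]("")
          // Run both loops in parallel; neither loop completes on its own.
          _ <- (scheduleAndPopulate(ref, 1.minute), keepPollingUsingFiber(ref)).parTupled
        } yield ExitCode.Success
      }
    
      def populate(): Future[String] = Future.successful(Random.nextString(10))
    
      val futPop = IO.fromFuture(IO(populate()))
    
      // Refresh the Ref, sleep, then recurse. No fork here: the caller decides.
      def scheduleAndPopulate(r: Ref[IO, String], duration: FiniteDuration): IO[Unit] = {
        for {
          _ <- IO(println("Scheduled For Populating Ref"))
          res <- futPop
          _ <- r.set(res)
          _ <- IO.sleep(duration)
          _ <- scheduleAndPopulate(r, duration)
        } yield ()
      }
    
      // Read and print the current value once per second, then recurse.
      def keepPollingUsingFiber(r: Ref[IO, String]): IO[Unit] = {
        for {
          res <- r.get
          _ <- IO(println(res))
          _ <- IO.sleep(1.second)
          _ <- keepPollingUsingFiber(r)
        } yield ()
      }
    }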
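
To see the stack-safety claim in isolation, here is a minimal sketch assuming cats-effect 2.x (the version the imports above suggest). The object StackSafetySketch and the helper loop are hypothetical names made up for this illustration; they are not part of the question or the answer. The helper recurses through flatMap in the same shape as the polling loops, but stops after n steps so that the program terminates.

```scala
import cats.effect.IO

object StackSafetySketch {
  // Hypothetical helper: adds n + (n - 1) + ... + 1 to acc.
  // The recursive call sits inside the flatMap callback, so it only builds
  // the next IO value when the run loop reaches that step.
  def loop(n: Int, acc: Long): IO[Long] =
    if (n == 0) IO.pure(acc)
    else IO(acc + n).flatMap(next => loop(n - 1, next))

  def main(args: Array[String]): Unit = {
    // One million recursive steps, interpreted by the IO run loop rather than
    // by nested JVM stack frames.
    val total = loop(1000000, 0L).unsafeRunSync()
    println(total) // 500000500000
  }
}
```

The same reasoning should apply to the for-comprehensions above, since they desugar to flatMap and map calls.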