Search code examples
scalaconcurrencyfunctional-programmingscala-cats

Effectful update of Ref/MVar


I'd like to apply effectual computation to the value inside MVar or Ref and atomically update it in case the computation succeeds or put back the initial value (in case of MVar)/simply do nothing(in case of Ref) in case the operation fails.

I. Ref-case

val ref = Ref.of[IO, Int](0)

def foo(i: Int): IO[Int] = //... some effectual computation

Since atomicity matters and Ref does not provide compareAndSet operation unfortunately so it must be implemnted explicitly which does not look attracting.

II. MVar-case

MVar provides mutually-exclusion semantic, but the problem is that bracket does not allow us to put the computed value. Here is an example:

val mvar = MVar.of[IO, Int](0)

def foo(i: Int): IO[Int] = IO(i + 1)

for {
  mvar <- mvar
  i <- mvar.take.bracket(foo)(mvar.put) //puts back 0, not 1
} yield ()

Is there a way to implement such behavior at least for either MVar or Ref?

UPD:

I implemented it with MVar, but it looks rather ugly:

def updateAtomically(mvar: MVar[IO, Int], foo: Int => IO[Int]): IO[Int] = for {
  i <- mvar.take
  ii <- foo(i).onError{
    case t => mvar.put(i)
  }
  _ <- mvar.put(ii)
} yield ii

Solution

  • You can use MonadError.redeemWith for this:

    def updateAtomically(mvar: MVar[IO, Int], foo: Int => IO[Int]): IO[Int] =
      for {
        i  <- mvar.take
        ii <- foo(0).redeemWith(_ => IO(i), ii => mvar.put(ii) *> IO(ii))
      } yield ii
    

    And then:

    import cats.Applicative.ops.toAllApplicativeOps
    import cats.effect.{ ExitCode, IO, IOApp }
    import cats.effect.concurrent.MVar
    
    object Foo extends IOApp {
    
      def foo(i: Int): IO[Int] = IO(i + 1)
      def fooBar(i: Int): IO[Int] = IO.raiseError(new RuntimeException("BOOM"))
    
      def run(args: List[String]): IO[ExitCode] =
        (for {
          mvar <- MVar.of[IO, Int](0)
          res  <- updateAtomically(mvar, foo)
          _    <- IO(println(res))
        } yield res).map(_ => ExitCode.Success)
    }
    

    Yields:

    1
    

    And:

    def run(args: List[String]): IO[ExitCode] =
      (for {
         mvar <- MVar.of[IO, Int](0)
         res  <- updateAtomically(mvar, fooBar)
         _    <- IO(println(res))
       } yield res).map(_ => ExitCode.Success)
    

    Yields:

    0