Search code examples
scalaconcurrencyscala-catsside-effects

MVar tryPut returns true and isEmpty also returns true


I wrote simple callback(handler) function which i pass to async api and i want to wait for result:

object Handlers {

  val logger: Logger = Logger("Handlers")
  implicit val cs: ContextShift[IO] =
  IO.contextShift(ExecutionContext.Implicits.global)

  class DefaultHandler[A] {

  val response: IO[MVar[IO, A]] = MVar.empty[IO, A]

  def onResult(obj: Any): Unit = {
    obj match {
      case obj: A =>
        println(response.flatMap(_.tryPut(obj)).unsafeRunSync())
        println(response.flatMap(_.isEmpty).unsafeRunSync())
      case _ => logger.error("Wrong expected type")
    }
  }

  def getResponse: A = {
    response.flatMap(_.take).unsafeRunSync()
  }
}

But for some reason both tryPut and isEmpty(when i'd manually call onResult method) returns true, therefore when i calling getResponse it sleeps forever. This is the my test:

class HandlersTest extends FunSuite {
    test("DefaultHandler.test") {
    val handler = new DefaultHandler[Int]
    handler.onResult(3)
    val response = handler.getResponse
    assert(response != 0)
    }
  }

Can somebody explain why tryPut returns true, but nothing puts. And what is the right way to use Mvar/channels in scala?


Solution

  • IO[X] means that you have the recipe to create some X. So on your example, yuo are putting in one MVar and then asking in another.

    Here is how I would do it.

    object Handlers {
      trait DefaultHandler[A] {
        def onResult(obj: Any): IO[Unit]
    
        def getResponse: IO[A]
      }
    
      object DefaultHandler {
        def apply[A : ClassTag]: IO[DefaultHandler[A]] = 
          MVar.empty[IO, A].map { response =>
            new DefaultHandler[A] {
              override def onResult(obj: Any): IO[Unit] = obj match {
                  case obj: A =>
                    for {
                      r1 <- response.tryPut(obj)
                      _  <- IO(println(r1))
                      r2 <- response.isEmpty
                      _  <- IO(println(r2))
                    } yield ()
    
                  case _ =>
                    IO(logger.error("Wrong expected type"))
                }
    
              override def getResponse: IO[A] =
                response.take
            }
          }
      }
    }