haskell concurrency conduit

Conduit Source depending on MVar


I am implementing a Conduit Source for some client that subscribes to a queue and puts all the arriving messages into an MVar.

The problem is that I cannot read from that MVar to yield those messages through the Conduit Source; at runtime it fails with the exception: thread blocked indefinitely in an MVar operation

mqttSource :: (Monad m, MonadIO m, MonadResource m) => MqttOptions -> Source m String
mqttSource MqttOptions {..} = do
  bracketP mkConsumer cleanConsumer runHandler
 where
  mkConsumer = do
    chan <- liftIO $ newEmptyMVar
    client <- liftIO.hookToChan $ chan
    return (chan, client)

  cleanConsumer (_, client) =
    liftIO.disconnectClient $ client

  runHandler (chan, client) = do
    newMsg <- liftIO $ readMVar chan
    yield newMsg
    runHandler (chan, client)

(hookToChan just tells the client to subscribe to the queue using this function: \topic msg -> putMVar chan (show msg))


Solution

  • Thanks to the comments that Cirdec made, I've managed to fix the issue.

    The problem was that I was running the client on the same thread as the source.

    hookToChan was responsible for this: it subscribed to the queue on the calling thread, so the subscription blocked that thread and runHandler never got to read from the MVar. Adding a forkIO inside hookToChan, so that the subscription runs on its own thread, made the issue go away.
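
    For anyone hitting the same error, here is a minimal sketch of the fix using only base, not the actual MQTT client. subscribeBlocking is a hypothetical stand-in for the real subscribe call, assumed to run the callback for every message without returning, and collectMessages is a hypothetical helper that plays the role of runHandler. The sketch also uses takeMVar rather than readMVar, on the assumption that each message should be consumed exactly once: readMVar leaves the MVar full, so the next putMVar in the callback would block.

```haskell
import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM)

-- Hypothetical stand-in for the real client's subscribe call: it invokes the
-- callback for every message and never returns to its caller.
subscribeBlocking :: (String -> String -> IO ()) -> IO ()
subscribeBlocking callback =
  forM_ [1 :: Int ..] $ \n -> callback "demo/topic" ("message " ++ show n)

-- Run the blocking subscription on its own thread, so the caller gets
-- control back and can start reading from the MVar.
hookToChan :: MVar String -> IO ThreadId
hookToChan chan = forkIO $ subscribeBlocking $ \_topic msg -> putMVar chan msg

-- Consume n messages. takeMVar empties the MVar each time, which is what
-- unblocks the next putMVar in the subscription thread.
collectMessages :: Int -> IO [String]
collectMessages n = do
  chan   <- newEmptyMVar
  client <- hookToChan chan
  msgs   <- replicateM n (takeMVar chan)
  killThread client -- stands in for disconnectClient
  return msgs

main :: IO ()
main = collectMessages 3 >>= mapM_ putStrLn
```

    Without the forkIO, hookToChan would never return in this sketch, which is the same shape as the original problem: the only thread that could empty the MVar is the one stuck filling it.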