I am implementing a Conduit Source for some client that subscribes to a queue and puts all the arriving messages into an MVar.
The problem is that I cannot read from that MVar to yield those messages through the Conduit Source: at runtime it fails with the exception "thread blocked indefinitely in an MVar operation".
    mqttSource :: (Monad m, MonadIO m, MonadResource m) => MqttOptions -> Source m String
    mqttSource MqttOptions {..} = do
        bracketP mkConsumer cleanConsumer runHandler
      where
        mkConsumer = do
          chan <- liftIO $ newEmptyMVar
          client <- liftIO.hookToChan $ chan
          return (chan, client)
        cleanConsumer (_, client) =
          liftIO.disconnectClient $ client
        runHandler (chan, client) = do
          newMsg <- liftIO $ readMVar chan
          yield newMsg
          runHandler (chan, client)
(hookToChan just tells the client to subscribe to the queue using this function: \topic msg -> putMVar chan (show msg))
Thanks to the comments that Cirdec made, I've managed to fix the issue.
The problem was that I was spawning the client in the same thread. hookToChan was responsible for doing so, and it subscribed to the queue on that same thread. I've just added a forkIO to the hookToChan function, and the issue went away.
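
For anyone hitting the same error, here is a minimal sketch of the fix using only base, with no MQTT or conduit involved. fakeSubscribe is a hypothetical stand-in for the client's blocking subscribe loop, and the hookToChan shown here is only my assumption about the shape of the fixed function, not the real client code. Without the forkIO, the second putMVar would block in the only running thread, which is the kind of situation that produces "thread blocked indefinitely in an MVar operation". The sketch also uses takeMVar instead of readMVar, because readMVar leaves the value in the MVar, so the next putMVar could never proceed.

```haskell
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM)

-- Hypothetical stand-in for the client's blocking subscribe loop:
-- it invokes the callback once per message and only returns when done.
fakeSubscribe :: (String -> Int -> IO ()) -> IO ()
fakeSubscribe callback = forM_ [1 .. 3] (callback "some/topic")

-- Assumed shape of the fixed hookToChan: the blocking loop runs on its
-- own thread, so the caller gets control back immediately.
hookToChan :: MVar String -> IO ThreadId
hookToChan chan =
  forkIO (fakeSubscribe (\_topic msg -> putMVar chan (show msg)))

-- Consumer side: takeMVar empties the MVar, which lets the producer's
-- next putMVar go through.
takeMessages :: Int -> MVar String -> IO [String]
takeMessages n chan = replicateM n (takeMVar chan)
```

To try it, create the MVar with newEmptyMVar, call hookToChan on it, and then run takeMessages 3 on the same MVar from the main thread. In the conduit version, the same takeMVar call would sit inside runHandler.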