There is a simple tutorial on using RabbitMQ from Haskell, from which I took this piece of code:
    main :: IO ()
    main = do
        conn <- openConnection "127.0.0.1" "/" "guest" "guest"
        ch <- openChannel conn
        declareQueue ch newQueue {queueName = "hello",
                                  queueAutoDelete = False,
                                  queueDurable = False}
        putStrLn " [*] Waiting for messages. To exit press CTRL+C"
        consumeMsgs ch "hello" NoAck deliveryHandler
        -- waits for keypresses
        getLine
        closeConnection conn

    deliveryHandler :: (Message, Envelope) -> IO ()
    deliveryHandler (msg, metadata) =
        BL.putStrLn $ " [x] Received " <> msgBody msg
It simply explains how to get a message from the queue and process it with a callback.
This may be simple to solve, but I'm struggling to understand how to add some mutable context to the callback, so that each time the function runs it can update that context. Put simply: how do I keep a running count of the messages in the order they arrive from the queue? I found that a possible solution is the State monad. Is it?
And the second question: are all these callbacks processed in parallel or not? If not, how can I process them in parallel and keep the mutable context without a data race?
Building on @bergey's answer - you can create a mutable reference such as an IORef or MVar. These references can be passed to your handler using partial function application. Typed but not tested code follows.
    main :: IO ()
    main = do
        conn <- openConnection "127.0.0.1" "/" "guest" "guest"
        ch <- openChannel conn
        ref <- newMVar 0
Notice the ref in the above and the generating function newMVar from Control.Concurrent.MVar.
        declareQueue ch newQueue {queueName = "hello",
                                  queueAutoDelete = False,
                                  queueDurable = False}
        putStrLn " [*] Waiting for messages. To exit press CTRL+C"
        consumeMsgs ch "hello" NoAck (deliveryHandler ref)
See how we pass the ref to deliveryHandler by function application.
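        -- waits for keypresses
        getLine
        closeConnection conn

    deliveryHandler :: MVar Int -> (Message, Envelope) -> IO ()
    deliveryHandler ref (msg, metadata) = do
        BL.putStrLn $ " [x] Received " <> msgBody msg
        -- modifyMVar_ takes the MVar, runs the action on the old value,
        -- and stores the returned value back (withMVar' does not exist).
        modifyMVar_ ref $ \val -> do
            print val
            pure (val + 1)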
And finally we can work with ref using a function from Control.Concurrent.MVar, getting the old value and replacing it with a new value as desired.
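On the data race question, here is a minimal sketch that needs no RabbitMQ at all. It assumes the handlers might run on several threads at once and shows that an MVar-guarded counter still ends up with the right total. The names handleOne and runWorkers are hypothetical stand-ins for illustration only (handleOne plays the role of deliveryHandler); they are not part of the amqp library.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad (forM_, replicateM_)

-- Hypothetical stand-in for deliveryHandler: atomically bumps the shared
-- counter and returns the number assigned to this "message".
handleOne :: MVar Int -> IO Int
handleOne ref = modifyMVar ref $ \n -> pure (n + 1, n)

-- Hypothetical driver: starts `workers` threads that each call the handler
-- `perWorker` times, waits for all of them, then reads the final count.
runWorkers :: Int -> Int -> IO Int
runWorkers workers perWorker = do
    ref  <- newMVar 0
    done <- newEmptyMVar
    forM_ [1 .. workers] $ \_ -> forkIO $ do
        replicateM_ perWorker (handleOne ref)
        putMVar done ()                   -- signal that this worker finished
    replicateM_ workers (takeMVar done)   -- block until every worker signalled
    readMVar ref
```

For example, runWorkers 4 1000 in GHCi should return 4000, because modifyMVar holds the MVar for the whole read-modify-write step, so no increment is lost. If all you need is a plain counter, atomicModifyIORef' on an IORef is a lighter alternative; an MVar is the safer choice when the update itself has to perform IO, such as the print in the handler above.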