Search code examples
haskellrabbitmqamqp

Sharing a context between AMQP callbacks


There is a simple tutorial about RabbitMQ usage for Haskell where I took this piece of code

main :: IO ()
main = do
     conn <- openConnection "127.0.0.1" "/" "guest" "guest"
     ch   <- openChannel conn

     declareQueue ch newQueue {queueName       = "hello",
                               queueAutoDelete = False,
                               queueDurable    = False}

     putStrLn " [*] Waiting for messages. To exit press CTRL+C"
     consumeMsgs ch "hello" NoAck deliveryHandler

     -- waits for keypresses
     getLine
     closeConnection conn

deliveryHandler :: (Message, Envelope) -> IO ()
deliveryHandler (msg, metadata) =
  BL.putStrLn $ " [x] Received " <> msgBody msg

It simply explains how to get a message from the queue and process it with the callback.

The one thing might be simple to solve, but I'm struggling to understand how to add some mutable context inside the callback, so each time a function runs it can change it. Simply, how to calculate the message number in the queue order. I found that a possible solution is a State monad, is it?

And the second question - all these callbacks are processed in a parallel or not? If not, how to process them in parallel and keep the mutable context without a data race?


Solution

  • Building on @bergey's answer - you can create a mutable reference such as an IORef or MVar. These references can be passed to your handler using partial function application. Typed but not tested code follows.

    main :: IO ()
    main = do
         conn <- openConnection "127.0.0.1" "/" "guest" "guest"
         ch   <- openChannel conn
         ref  <- newMVar 0
    

    Notice the ref in the above and the generating function newMVar from Control.Concurrent.MVar.

         declareQueue ch newQueue {queueName       = "hello",
                                   queueAutoDelete = False,
                                   queueDurable    = False}
    
         putStrLn " [*] Waiting for messages. To exit press CTRL+C"
         consumeMsgs ch "hello" NoAck (deliveryHandler ref)
    

    See how we pass the ref to deliveryHandler by function application.

         -- waits for keypresses
         getLine
         closeConnection conn
    
    deliveryHandler :: MVar Int -> (Message, Envelope) -> IO ()
    deliveryHandler ref (msg, metadata) =
      BL.putStrLn $ " [x] Received " <> msgBody msg
      withMVar' ref $ \val ->
           do print val
              pure (val + 1)
    

    And finally we can work with ref using function from Control.Concurrent.MVar, getting the old value and replacing it with a new value as desired.