I'm trying to run a rabbitmq background process on heroku to pick tasks off a queue and process them. I'm working with the AMQP haskell library and they give the following example (parts omitted or brevity.
main = do
--setup connection omitted
--connect to queue, wait for messages
consumeMsgs chan "myQueue" Ack myCallback
--halts process so messages are taken off the queue until a key is presseed
getLine -- wait for keypress
closeConnection conn -- close connection after key
putStrLn "connection closed"
This works fine locally because getLine
keeps the process running until you press a key. However, when I deploy this to heroku the process exits with
2016-04-19T08:37:23.373087+00:00 app[worker.1]: worker: <stdin>: hGetChar: end of file
I figured out from the accepted answer to this question that this is because in order to deploy a background process via ssh you need to redirect /dev/null/
to stdin
which sends an EOF
signal to the process.
In our case the getLine
function exits because of this signal and the entire process stops, preventing our worker from staying up.
How can I keep this worker running when I deploy?
EDIT: Final Solution Using @carstons comments I ended up with the following implementation that worked:
main :: IO ()
main = do
mvar <- newEmptyMVar
conn <- setupConnection
queueName <- pack <$> getEnv "QUEUE_NAME"
chan <- openChannel conn
consumeMsgs chan queueName Ack processMessage
installHandler sigINT (Catch (cleanupConnection conn mvar)) Nothing
putStrLn "Running forever, press ctrl c to exit"
-- this blocks until sigint is recieved and the handler for SIGINT
-- "fills" the mvar. once that is filled the process exits
run <- takeMVar mvar
case run of
_ -> return ()
mixpanelConfig :: IO Config
mixpanelConfig = liftM2 Config (ApiToken . pack <$> getEnv "MIXPANEL_API_TOKEN") (newManager tlsManagerSettings)
cleanupConnection :: Connection -> MVar () -> IO ()
cleanupConnection conn mvar = do
closeConnection conn
putStrLn "SIGINT received.. closing rabbitmq connection"
putMVar mvar ()
processMessage :: (Message, Envelope) -> IO ()
as I pointed out in the comment if you just want to keep it running forever you can use forever
with - for example - threadDelay
:
import Control.Concurrent (threadDelay)
import Control.Monad (forever)
main = do
--setup connection omitted
--connect to queue, wait for messages
consumeMsgs chan "myQueue" Ack myCallback
--halts process so messages are taken off the queue forever
forever $ threadDelay 10000
-- so this will never happen and you could remove it
closeConnection conn -- close connection after key
putStrLn "connection closed"
note that this will of course never really close the connection or exit the application - you'll have to kill the process.
the alternative would be a bit more involved as you need some message/way to send your program a termination signal.
An easy way is to use MVar
s which you could set in your myCallback
when a certain stop-message was received on your queue:
import Control.Concurrent.MVar
main = do
-- MVar to receve the quit-signal
quitSignal <- newEmptyMVar
--setup connection omitted
--connect to queue, wait for messages - the callback should
--set the quitSignal with putMVar quitSignal ()
consumeMsgs chan "myQueue" Ack (myCallback quitSignal)
--halts process so messages are taken off the queue till quitSignal
takeMVar quitSignal
-- so this will happen after quitSignal was set
closeConnection conn -- close connection after key
putStrLn "connection closed"