I have a Rails app that sometimes publishes messages to a RabbitMQ queue, using the gem "Bunny". Here's the setup:
# config/initializers/bunny.rb
$mq_connection = Bunny.new
$mq_connection.start
$mq_channel = $mq_connection.create_channel
Anywhere in the app I can then call:
exchange = $mq_channel.default_exchange
exchange.publish(msg.to_json, persistent: true, routing_key: '...')
This works great if I call it from the app, or from the console, but it doesen't work if it's called from a DelayedJob job. No exception is raised, but the message is just not sent.
Trying with a singleton:
It looked like global variables like $mq_channel
couldn't be found by DelayedJob, so I created a singleton model to store it:
class RabbitMq
include Singleton
attr_accessor :connection, :channel
def exchange
channel.default_exchange
end
def setup
self.connection = Bunny.new
self.connection.start
self.channel = connection.create_channel
end
end
And I call the setup from my initializer:
# config/initializers/bunny.rb
RabbitMq.instance.setup
But that doesn't work as well. The job terminates without error, but nothing is published.
Any idea how to do that? It should be quite common to publish messages to RabbitMQ from a background worker like DJ.
Here's how I do it:
class Messaging::Publisher
class << self
def publish(message)
new(message).publish
end
end # Class Methods
#=========================================================================
# Instance Methods
#=========================================================================
def initialize(message)
@message = message
end
def publish
connection = Bunny.new(ENV['CLOUDAMQP_URL'])
connection.start
channel = connection.create_channel
queue_name = "#{ENV['app_name']}.#{message.keys.first.to_s.pluralize}_queue"
queue = channel.queue(queue_name, durable: true)
channel.default_exchange.publish(message.to_json, :routing_key => queue.name)
channel.close
connection.stop
true
end
private
def message() @message end
end
I call this both from within my app (synchronous) and from background jobs (asynchronous). Something like this:
class ServiceRequests::CreateManager < ServiceRequests::ManagerBase
class << self
private
end # Class Methods
#=========================================================================
# Instance Methods
#=========================================================================
def manage
Messaging::Publisher.publish service_request_message
end
private
def service_request_message
{
service_request: {
provider: {
name: "Foo::Bar"
},
params: {
baz: 'qux'
}
}
}
end
end