Is it possible to call Bunny::Exchange publish from Delayed Job?


I have a Rails app that sometimes publishes messages to a RabbitMQ queue, using the gem "Bunny". Here's the setup:

# config/initializers/bunny.rb
$mq_connection = Bunny.new
$mq_connection.start
$mq_channel = $mq_connection.create_channel

Anywhere in the app I can then call:

exchange = $mq_channel.default_exchange
exchange.publish(msg.to_json, persistent: true, routing_key: '...')

This works great if I call it from the app or from the console, but it doesn't work when it's called from a DelayedJob job. No exception is raised, but the message is never sent.

Trying with a singleton:

It looked like global variables like $mq_channel couldn't be found by DelayedJob, so I created a singleton model to store it:

class RabbitMq
  include Singleton

  attr_accessor :connection, :channel

  def exchange
    channel.default_exchange
  end

  def setup
    self.connection = Bunny.new
    self.connection.start
    self.channel = connection.create_channel
  end

end

And I call the setup from my initializer:

# config/initializers/bunny.rb
RabbitMq.instance.setup

But that doesn't work either. The job terminates without error, but nothing is published.

Any idea how to do that? It should be quite common to publish messages to RabbitMQ from a background worker like DJ.


Solution

  • Here's how I do it:

    class Messaging::Publisher
    
      class << self
    
        def publish(message)
          new(message).publish
        end
    
      end # Class Methods
    
      #=========================================================================
      # Instance Methods      
      #=========================================================================
    
        def initialize(message)
          @message = message
        end
    
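        def publish
          # Open a fresh connection on every call, so nothing depends on a
          # connection created earlier in another process (e.g. in an initializer).
          connection = Bunny.new(ENV['CLOUDAMQP_URL'])
          connection.start
          channel = connection.create_channel
          queue_name = "#{ENV['app_name']}.#{message.keys.first.to_s.pluralize}_queue"
          queue = channel.queue(queue_name, durable: true)
          channel.default_exchange.publish(message.to_json, :routing_key => queue.name)
          channel.close
          true
        ensure
          # Release the connection even if publishing raised.
          connection.stop if connection && connection.open?
        end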
    
      private
    
        attr_reader :message
    
    end
    

    I call this both from within my app (synchronous) and from background jobs (asynchronous). Something like this:

    class ServiceRequests::CreateManager < ServiceRequests::ManagerBase
    
      class << self
    
      private
    
      end # Class Methods
    
      #=========================================================================
      # Instance Methods
      #=========================================================================
    
        def manage
          Messaging::Publisher.publish service_request_message
        end
    
      private
    
        def service_request_message
          {
            service_request: {
              provider: {
                name: "Foo::Bar"
              },
              params: {
                baz: 'qux'
              }
            }
          }
        end
    
    end
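
A likely explanation, which the question does not confirm, is that the Delayed Job worker is daemonized (forked) after the initializers have run, so the connection opened in `config/initializers/bunny.rb` is no longer usable inside the worker process. Opening a connection on every publish, as above, sidesteps that. If reconnecting for every message is too costly, a minimal sketch of an alternative is to keep one connection per process and rebuild it when the process id changes. `PerProcessConnection` and `MQ` are hypothetical names that belong to neither Bunny nor Delayed Job, and the connection factory is passed in as a block so that the helper itself has no dependencies:

```ruby
# Hypothetical helper (not part of Bunny or Delayed Job): memoizes whatever the
# factory block returns, and calls the block again when the current process id
# differs from the one that built the cached object (for example after a fork).
class PerProcessConnection
  def initialize(&factory)
    @factory = factory
    @mutex = Mutex.new
    @pid = nil
    @connection = nil
  end

  # Returns the cached object, rebuilding it if it was created in another process.
  def get
    @mutex.synchronize do
      if @connection.nil? || @pid != Process.pid
        @connection = @factory.call
        @pid = Process.pid
      end
      @connection
    end
  end
end

# Assumed usage with Bunny (requires the bunny gem, so it is left commented out):
#   MQ = PerProcessConnection.new { Bunny.new(ENV['CLOUDAMQP_URL']).tap(&:start) }
#   channel = MQ.get.create_channel
#   channel.default_exchange.publish(payload, routing_key: queue_name)
#   channel.close
```

With this sketch a channel is still created per publish, while the underlying connection is reused within each worker process.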