Search code examples
rubymultithreadingrabbitmqbunny

RabbitMQ/bunny: subscribe block not called if within a thread


I'm having trouble getting a queue subscribe block to execute if in a thread.

The example from rubybunny/exchanges works, as expected. However, if adapted with the consumer portion in a thread, the subscriber block appears not to execute.

I've tried several simple variations including setting a shared variable flag, all without success.

What am I missing?

Code
#!/usr/bin/env ruby
require "bunny"

quit = false

consumer = Thread.new do
  puts "consumer start"

  cnx = Bunny.new
  cnx.start
  cn  = cnx.create_channel
  ex = cn.topic("weathr", :auto_delete => true)

  q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
  q.subscribe do |delivery_info, properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
  end

  loop {
    sleep 1
    break if quit
  }

  cnx.close
  puts "consumer done"
end

connection = Bunny.new
connection.start
connection  = connection.create_channel
exchange = connection.topic("weathr", :auto_delete => true)
exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
  publish("Berkeley update",         :routing_key => "americas.north.us.ca.berkeley").
  publish("San Francisco update",    :routing_key => "americas.north.us.ca.sanfrancisco").
  publish("New York update",         :routing_key => "americas.north.us.ny.newyork").
  publish("São Paolo update",        :routing_key => "americas.south.brazil.saopaolo").
  publish("Hong Kong update",        :routing_key => "asia.southeast.hk.hongkong").
  publish("Kyoto update",            :routing_key => "asia.southeast.japan.kyoto").
  publish("Shanghai update",         :routing_key => "asia.southeast.prc.shanghai").
  publish("Rome update",             :routing_key => "europe.italy.roma").
  publish("Paris update",            :routing_key => "europe.france.paris")

sleep 5
connection.close

quit = true
consumer.join
Actual Output
consumer start
consumer done
Expected Output
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
An update for North America: New York update, routing key is americas.north.us.ny.newyork
consumer done

Solution

  • The thread's subscribe block is not executing because the queue simply doesn't receive any messages. To elaborate, in this case, the queue ends up being created after the messages are published.

    This can be visualized by switching the messages to :mandatory => true and using Bunny::Exchange#on_return:

    Code
    #!/usr/bin/env ruby
    require "bunny"
    
    quit = false
    
    connection = Bunny.new
    connection.start
    
    consumer = Thread.new do
      puts "consumer start"
      cn  = connection.create_channel
      ex = cn.topic("weathr", :auto_delete => true)
    
      q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
      q.subscribe do |delivery_info, properties, payload|
        puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
      end
    
      sleep 1 while !quit
    
      cn.close
      puts "consumer done"
    end
    
    channel = connection.create_channel
    exchange = channel.topic("weathr", :auto_delete => true)
    exchange.on_return do |basic_return, properties, payload|
      puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
    end
    
    exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
      publish("Berkeley update",         :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
      publish("San Francisco update",    :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
      publish("New York update",         :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
      publish("São Paolo update",        :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
      publish("Hong Kong update",        :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
      publish("Kyoto update",            :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
      publish("Shanghai update",         :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
      publish("Rome update",             :mandatory => true, :routing_key => "europe.italy.roma").
      publish("Paris update",            :mandatory => true, :routing_key => "europe.france.paris")
    
    channel.close
    sleep 5
    
    quit = true
    consumer.join
    connection.close
    
    Output
    consumer start
    San Diego update was returned! reply_code = 312, reply_text = NO_ROUTE
    Berkeley update was returned! reply_code = 312, reply_text = NO_ROUTE
    San Francisco update was returned! reply_code = 312, reply_text = NO_ROUTE
    New York update was returned! reply_code = 312, reply_text = NO_ROUTE
    São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
    Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
    Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
    Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
    Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
    Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
    consumer done
    

    As we can see, all messages ended up getting returned with NO_ROUTE.

    A simple solution to force the queue (and route) to exist before the messages are published:

    #!/usr/bin/env ruby
    require "bunny"
    
    quit = false
    consumer_queued = false
    
    connection = Bunny.new
    connection.start
    
    consumer = Thread.new do
      puts "consumer start"
      cn  = connection.create_channel
      ex = cn.topic("weathr", :auto_delete => true)
    
      q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
      consumer_queued = true
      q.subscribe do |delivery_info, properties, payload|
        puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
        $stdout.flush
      end
    
      sleep 1 while !quit
    
      cn.close
      puts "consumer done"
    end
    
    # ensure queue is ready
    sleep 0.125  while !consumer_queued
    
    channel = connection.create_channel
    exchange = channel.topic("weathr", :auto_delete => true)
    exchange.on_return do |basic_return, properties, payload|
      puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
      $stdout.flush
    end
    
    exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
      publish("Berkeley update",         :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
      publish("San Francisco update",    :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
      publish("New York update",         :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
      publish("São Paolo update",        :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
      publish("Hong Kong update",        :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
      publish("Kyoto update",            :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
      publish("Shanghai update",         :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
      publish("Rome update",             :mandatory => true, :routing_key => "europe.italy.roma").
      publish("Paris update",            :mandatory => true, :routing_key => "europe.france.paris")
    
    channel.close
    sleep 5
    
    quit = true
    consumer.join
    connection.close
    
    Output (with return notices)
    consumer start
    An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
    São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
    An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
    Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
    An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
    Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
    An update for North America: New York update, routing key is americas.north.us.ny.newyork
    Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
    Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
    Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
    consumer done
    

    The expected messages are received and the rest are returned.