I'm having trouble getting a queue subscribe block to execute when it runs inside a thread.
The example from rubybunny/exchanges works as expected. However, once the consumer portion is moved into a thread, the subscribe block appears not to execute.
I've tried several simple variations including setting a shared variable flag, all without success.
What am I missing?
Code:
#!/usr/bin/env ruby
require "bunny"
# Reproduction script: a consumer thread subscribes to the "weathr" topic
# exchange while the main thread publishes ten updates to it.

# Flag the main thread sets (after publishing) to tell the consumer to stop.
quit = false
# Consumer thread: opens its own connection and channel, binds an exclusive,
# server-named queue to the exchange with the pattern "americas.north.#", and
# prints every message delivered to that queue.
# NOTE(review): nothing below makes the main thread wait until this binding
# exists before it starts publishing.
consumer = Thread.new do
puts "consumer start"
cnx = Bunny.new
cnx.start
cn = cnx.create_channel
ex = cn.topic("weathr", :auto_delete => true)
q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
q.subscribe do |delivery_info, properties, payload|
puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
end
# Poll the shared flag once per second until the main thread asks us to quit.
loop {
sleep 1
break if quit
}
cnx.close
puts "consumer done"
end
# Publisher side, running on the main thread with a second connection.
connection = Bunny.new
connection.start
# From here on `connection` refers to the object returned by create_channel,
# not to the session started on the previous line.
connection = connection.create_channel
exchange = connection.topic("weathr", :auto_delete => true)
# Publish ten updates in one call chain; four of the routing keys start with
# "americas.north.", which is what the consumer's binding pattern asks for.
exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
publish("Berkeley update", :routing_key => "americas.north.us.ca.berkeley").
publish("San Francisco update", :routing_key => "americas.north.us.ca.sanfrancisco").
publish("New York update", :routing_key => "americas.north.us.ny.newyork").
publish("São Paolo update", :routing_key => "americas.south.brazil.saopaolo").
publish("Hong Kong update", :routing_key => "asia.southeast.hk.hongkong").
publish("Kyoto update", :routing_key => "asia.southeast.japan.kyoto").
publish("Shanghai update", :routing_key => "asia.southeast.prc.shanghai").
publish("Rome update", :routing_key => "europe.italy.roma").
publish("Paris update", :routing_key => "europe.france.paris")
# Give the consumer time to print anything it receives.
sleep 5
# Closes the object assigned by the create_channel line above (see the note
# there); the session object started earlier is no longer referenced.
connection.close
# Ask the consumer thread to leave its polling loop, then wait for it.
quit = true
consumer.join
Actual Output
consumer start
consumer done
Expected Output
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
An update for North America: New York update, routing key is americas.north.us.ny.newyork
consumer done
The thread's subscribe block is not executing because the queue simply doesn't receive any messages. To elaborate: in this case the queue (and its binding) ends up being created after the messages are published, and an exchange discards any message that has no matching binding at the moment it is published.
This can be visualized by publishing the messages with :mandatory => true and using Bunny::Exchange#on_return:
#!/usr/bin/env ruby
require "bunny"
# Same scenario as before, but every update is published with the mandatory
# flag and the exchange's on_return callback prints each message the broker
# hands back, making unroutable messages visible.
stop_requested = false

# One session shared by the consumer thread and the publisher; each side
# opens its own channel on it.
session = Bunny.new
session.start

listener = Thread.new do
  puts "consumer start"
  listener_channel = session.create_channel
  weather = listener_channel.topic("weathr", auto_delete: true)
  inbox = listener_channel.queue("", exclusive: true)
  inbox.bind(weather, routing_key: "americas.north.#")
  inbox.subscribe do |delivery_info, _properties, payload|
    puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
  end
  # Idle, checking once per second, until the main thread requests a stop.
  sleep 1 until stop_requested
  listener_channel.close
  puts "consumer done"
end

publish_channel = session.create_channel
publisher = publish_channel.topic("weathr", auto_delete: true)
publisher.on_return do |basic_return, _properties, payload|
  puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
end

# [payload, routing key] pairs, published in this order.
updates = [
  ["San Diego update", "americas.north.us.ca.sandiego"],
  ["Berkeley update", "americas.north.us.ca.berkeley"],
  ["San Francisco update", "americas.north.us.ca.sanfrancisco"],
  ["New York update", "americas.north.us.ny.newyork"],
  ["São Paolo update", "americas.south.brazil.saopaolo"],
  ["Hong Kong update", "asia.southeast.hk.hongkong"],
  ["Kyoto update", "asia.southeast.japan.kyoto"],
  ["Shanghai update", "asia.southeast.prc.shanghai"],
  ["Rome update", "europe.italy.roma"],
  ["Paris update", "europe.france.paris"]
]
updates.each do |text, key|
  publisher.publish(text, mandatory: true, routing_key: key)
end
publish_channel.close

# Leave time for deliveries and returns to be printed before shutting down.
sleep 5
stop_requested = true
listener.join
session.close
Output
consumer start
San Diego update was returned! reply_code = 312, reply_text = NO_ROUTE
Berkeley update was returned! reply_code = 312, reply_text = NO_ROUTE
San Francisco update was returned! reply_code = 312, reply_text = NO_ROUTE
New York update was returned! reply_code = 312, reply_text = NO_ROUTE
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done
As we can see, all messages ended up getting returned with NO_ROUTE.
A simple solution to force the queue (and route) to exist before the messages are published:
#!/usr/bin/env ruby
require "bunny"
# Consumer/publisher demo in which the publisher waits for the consumer's
# queue binding before publishing, so matching messages are routed to it.
#
# Cross-thread signalling uses Queue objects instead of polling shared locals:
# a blocking pop wakes as soon as the other side pushes, and closing the queue
# unblocks the waiter if the consumer thread dies before it gets that far.

# Write output immediately; replaces a $stdout.flush after every puts.
$stdout.sync = true

queue_ready = Queue.new # consumer -> main: the queue is declared and bound
shutdown = Queue.new    # main -> consumer: stop consuming

connection = Bunny.new
connection.start
begin
  consumer = Thread.new do
    begin
      puts "consumer start"
      cn = connection.create_channel
      ex = cn.topic("weathr", :auto_delete => true)
      q = cn.queue("", :exclusive => true).bind(ex, :routing_key => "americas.north.#")
      # Signal as soon as the binding is in place: messages published from
      # now on are queued even though the subscription is set up just below.
      queue_ready << true
      q.subscribe do |delivery_info, _properties, payload|
        puts "An update for North America: #{payload}, routing key is #{delivery_info.routing_key}"
      end
      # Park here until the main thread asks us to stop.
      shutdown.pop
      cn.close
      puts "consumer done"
    ensure
      # If setup failed before the push above, this makes the main thread's
      # pop return nil instead of blocking forever.
      queue_ready.close
    end
  end

  # ensure queue is ready; a nil pop means the consumer died during setup,
  # in which case join re-raises its exception here.
  consumer.join unless queue_ready.pop

  channel = connection.create_channel
  exchange = channel.topic("weathr", :auto_delete => true)
  # Report every message the broker could not route to any queue.
  exchange.on_return do |basic_return, _properties, payload|
    puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
  end
  exchange.publish("San Diego update", :mandatory => true, :routing_key => "americas.north.us.ca.sandiego").
    publish("Berkeley update", :mandatory => true, :routing_key => "americas.north.us.ca.berkeley").
    publish("San Francisco update", :mandatory => true, :routing_key => "americas.north.us.ca.sanfrancisco").
    publish("New York update", :mandatory => true, :routing_key => "americas.north.us.ny.newyork").
    publish("São Paolo update", :mandatory => true, :routing_key => "americas.south.brazil.saopaolo").
    publish("Hong Kong update", :mandatory => true, :routing_key => "asia.southeast.hk.hongkong").
    publish("Kyoto update", :mandatory => true, :routing_key => "asia.southeast.japan.kyoto").
    publish("Shanghai update", :mandatory => true, :routing_key => "asia.southeast.prc.shanghai").
    publish("Rome update", :mandatory => true, :routing_key => "europe.italy.roma").
    publish("Paris update", :mandatory => true, :routing_key => "europe.france.paris")
  channel.close

  # Leave time for deliveries and returns to be printed before shutting down.
  sleep 5
  shutdown << true
  consumer.join
ensure
  # Close the connection even if publishing or the consumer thread failed.
  connection.close
end
Output (with return notices)
consumer start
An update for North America: San Diego update, routing key is americas.north.us.ca.sandiego
São Paolo update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: Berkeley update, routing key is americas.north.us.ca.berkeley
Hong Kong update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: San Francisco update, routing key is americas.north.us.ca.sanfrancisco
Kyoto update was returned! reply_code = 312, reply_text = NO_ROUTE
An update for North America: New York update, routing key is americas.north.us.ny.newyork
Shanghai update was returned! reply_code = 312, reply_text = NO_ROUTE
Rome update was returned! reply_code = 312, reply_text = NO_ROUTE
Paris update was returned! reply_code = 312, reply_text = NO_ROUTE
consumer done
The expected messages are received and the rest are returned.