NOTE: I'm choosing to use threading to resolve DNS names, but the same behavior can likely be reproduced with any type of similar operation.
I am getting unexpected results after moving my (previously working) code from standard single-threaded execution to multi-threading. Specifically, my code iterates over an array of hashes and adds a key/value pair to each hash in the array.
The problem appears to come from the dns_cname.map loop where the new key/value pair is created. The "external_dns_entry" key should hold result.name.to_s, which is the name resolved by DNS. Instead, it holds the name of one of the many other servers in url_nameserver_mapping.
I suspect that the DNS resolutions are happening as threads become available and that the hash is being updated out of order, but I don't know how to begin tracking down an issue like this.
Problematic results: the DNS resolution run against server1 is being mapped to server17. Likewise, server17 is mapped to server99, and so on. The rest of the hash remains intact.
Any help is GREATLY appreciated. Thanks very much in advance!
Here is my code when multi-threading is NOT enabled (WORKS PROPERLY):
require 'json'
require 'resolv'

url_nameserver_mapping = { "server1" => "dallasdns.dns.com",
                           "server2" => "portlanddns.dns.com",
                           "server3" => "losangelesdns.dns.com" }

# Parse the JSON string response from the API into a valid Ruby Hash
# The net/http GET request is not shown here for brevity but it was stored in 'response'
unsorted_urls = JSON.parse(response.body)

# Sort (not sure this is relevant)
# I left it since my data is being populated to the Hash incorrectly (w/ threading enabled)
url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"] }

url_nameserver_mapping.each do |server, location|
  dns = Resolv::DNS.new(:nameserver => ['8.8.8.8'])
  dns_cname = dns.getresources(server, Resolv::DNS::Resource::IN::CNAME)
  dns_cname.map do |result|
    # Create a new key/value for each Hash in url_properties Array
    # Occurs if the server compared matches the value of url['server'] key
    url_properties.each do |url|
      url["external_dns_entry"] = result.name.to_s if url['server'] == server
    end
  end
end
I followed the guide at https://blog.engineyard.cm/2013/ruby-concurrency to implement the producer/consumer threading model.
Here is my adapted code when multi-threading IS enabled (NOT WORKING):
require 'thread'
require 'monitor'
require 'json'
require 'resolv'

thread_count = 8
threads = Array.new(thread_count)
producer_queue = SizedQueue.new(thread_count)
threads.extend(MonitorMixin)
threads_available = threads.new_cond
sysexit = false

url_nameserver_mapping = { "server1" => "dallasdns.dns.com",
                           "server2" => "portlanddns.dns.com",
                           "server3" => "losangelesdns.dns.com" }

unsorted_urls = JSON.parse(response.body)
url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"] }
####################
##### Consumer #####
####################
consumer_thread = Thread.new do
  loop do
    break if sysexit && producer_queue.length == 0
    found_index = nil
    threads.synchronize do
      threads_available.wait_while do
        threads.select { |thread| thread.nil? ||
                                  thread.status == false ||
                                  thread["finished"].nil? == false }.length == 0
      end
      # Get the index of the available thread
      found_index = threads.rindex { |thread| thread.nil? ||
                                              thread.status == false ||
                                              thread["finished"].nil? == false }
    end
    @domain = producer_queue.pop
    threads[found_index] = Thread.new(@domain) do
      dns = Resolv::DNS.new(:nameserver => ['8.8.8.8'])
      dns_cname = dns.getresources(@domain, Resolv::DNS::Resource::IN::CNAME)
      dns_cname.map do |result|
        url_properties.each do |url|
          url["external_dns_entry"] = result.name.to_s if url['server'] == @domain
        end
      end
      Thread.current["finished"] = true
      # Notify the consumer that another batch of work has been completed
      threads.synchronize { threads_available.signal }
    end
  end
end
####################
##### Producer #####
####################
producer_thread = Thread.new do
  url_nameserver_mapping.each do |server, location|
    producer_queue << server
    threads.synchronize do
      threads_available.signal
    end
  end
  sysexit = true
end
# Join on both the producer and consumer threads so the main thread doesn't exit
producer_thread.join
consumer_thread.join

# Join on the worker threads to allow them to finish
threads.each do |thread|
  thread.join unless thread.nil?
end
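@domain is shared by all the threads, and this sharing is the root of your problem: when it is updated by popping the next unit of work from the queue, every thread sees that change. Your code already passes @domain to Thread.new, but the block never declares a parameter to receive it, so the thread body still reads the shared instance variable. You can avoid this problem by doing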
Thread.new(producer_queue.pop) do |domain|
  # domain isn't shared with anyone (as long as there
  # is no local variable called domain in the enclosing scope)
end
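To see the difference in isolation, here is a minimal, self-contained sketch with no DNS involved. A short sleep stands in for the lookup (an assumption made only so the timing is visible), and the server names are placeholders. The first method reproduces the shared @domain pattern from the question; the second passes the value to Thread.new and binds it to a block parameter.

```ruby
require 'thread'

SERVERS = %w[server1 server2 server3].freeze

# Buggy pattern: every thread reads the same @domain, so by the time a
# thread gets around to reading it, it may already hold a later value.
def shared_ivar_version
  results = Queue.new
  threads = SERVERS.map do |server|
    @domain = server
    Thread.new do
      sleep 0.05            # stand-in for a slow DNS lookup (assumption)
      results << @domain    # reads whatever @domain holds at this moment
    end
  end
  threads.each(&:join)
  Array.new(results.size) { results.pop }
end

# Fixed pattern: the value is handed to Thread.new and bound to the block
# parameter `domain`, so each thread keeps its own reference.
def block_param_version
  results = Queue.new
  threads = SERVERS.map do |server|
    Thread.new(server) do |domain|
      sleep 0.05
      results << domain
    end
  end
  threads.each(&:join)
  Array.new(results.size) { results.pop }
end

p shared_ivar_version.sort
p block_param_version.sort   # ["server1", "server2", "server3"]
```

On a typical run the first method reports the last server several times, because all the threads read @domain only after the loop has finished reassigning it; the exact output depends on thread scheduling.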
Tangential to your question, but this seems like a really overengineered approach. It is much easier to spin up a fixed number of consumer threads ahead of time and have them read directly from the queue of work.
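A minimal sketch of that fixed-pool approach follows. It assumes Ruby 2.3 or later for Queue#close and the &. operator. Every server name is queued up front, and a fixed number of workers pop names until the closed queue is drained. Results go into a Mutex-protected hash keyed by server, which is merged into url_properties afterwards. The names resolve_all, annotate! and DNS_RESOLVER are hypothetical. The resolver is injected as a lambda so the pool can be exercised offline. DNS_RESOLVER takes the first CNAME record, whereas the question's loop effectively keeps the last one it sees.

```ruby
require 'thread'
require 'resolv'

# Hypothetical resolver: returns the first CNAME target for `server`, or nil.
DNS_RESOLVER = lambda do |server|
  Resolv::DNS.open(nameserver: ['8.8.8.8']) do |dns|
    dns.getresources(server, Resolv::DNS::Resource::IN::CNAME).first&.name&.to_s
  end
end

# Resolve every server with a fixed pool of worker threads created up front.
# Returns a Hash of server => resolved name (nil when there is no CNAME).
def resolve_all(servers, resolver, pool_size: 8)
  queue = Queue.new
  servers.each { |s| queue << s }
  queue.close # once drained, pop returns nil and the workers exit

  results = {}
  lock = Mutex.new

  workers = Array.new(pool_size) do
    Thread.new do
      # `server` is local to this block invocation, so each worker has its own
      while (server = queue.pop)
        name = resolver.call(server)
        lock.synchronize { results[server] = name }
      end
    end
  end
  workers.each(&:join)
  results
end

# Copy the resolved names into the parsed API items (single-threaded step).
def annotate!(url_properties, resolved)
  url_properties.each do |url|
    name = resolved[url['server']]
    url['external_dns_entry'] = name if name
  end
end

# Offline usage with a fake resolver (placeholder data, not real DNS results)
fake_resolver = ->(server) { "#{server}.example.net" }
url_properties = [{ 'server' => 'server1' }, { 'server' => 'server3' }]
resolved = resolve_all(%w[server1 server2 server3], fake_resolver, pool_size: 2)
annotate!(url_properties, resolved)
p url_properties
```

With real lookups you would call something like resolve_all(url_nameserver_mapping.keys, DNS_RESOLVER). Because the workers only write to the results hash, and only while holding the mutex, no thread ever touches url_properties. The merge happens afterwards on a single thread.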