
Ruby - Unexpected results using multi-threading (producer/consumer model) when iterating


NOTE: I'm choosing to use threading to resolve DNS names, but the same behavior can likely be reproduced with any type of similar operation.


I am getting unexpected results after moving my (previously working) code from standard single-threaded execution to multi-threading. Specifically, my code iterates over an array of hashes and adds a key/value pair to each hash in the array.

The problem appears to come from the dns_cname.map loop where the new key/value pair is created. Instead of the "external_dns_entry" key holding the correct value (that is, result.name.to_s, which contains the name resolved by DNS), it ends up holding the name of one of the many other servers in url_nameserver_mapping.

I have a feeling that the DNS resolutions are happening as threads become available and the hash is getting updated out of order, but I don't even know how to begin to track an issue like this down.

Problematic results: the DNS resolution run against server1 is mapped to server 17. Likewise, server 17 is mapped to server 99, etc. The rest of the Hash remains intact.

Any help is GREATLY appreciated. Thanks very much in advance!

Here is my code when multi-threading is NOT enabled (WORKS PROPERLY):

require 'json'
require 'resolv'

url_nameserver_mapping = { "server1" => "dallasdns.dns.com",
                           "server2" => "portlanddns.dns.com",
                           "server3" => "losangelesdns.dns.com" }


# Parse the JSON string response from the API into a valid Ruby Hash
# The net/http GET request is not shown here for brevity but it was stored in 'response'
unsorted_urls = JSON.parse(response.body)

# Sort (not sure this is relevant)
# I left it since my data is being populated to the Hash incorrectly (w/ threading enabled)
url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"]}

url_nameserver_mapping.each do |server,location|
  dns = Resolv::DNS.new(:nameserver => ['8.8.8.8'])
  dns_cname = dns.getresources(server, Resolv::DNS::Resource::IN::CNAME)

  dns_cname.map do |result|
    # Create a new key/value for each Hash in url_properties Array
    # Occurs if the server compared matches the value of url['server'] key
    url_properties.each do |url|
      url["external_dns_entry"] = result.name.to_s if url['server'] == server
    end
  end
end

I followed the guide at https://blog.engineyard.cm/2013/ruby-concurrency to implement the producer/consumer threading model.

Here is my adapted code when multi-threading IS enabled (NOT WORKING):

require 'json'
require 'resolv'
require 'thread'
require 'monitor'

thread_count = 8
threads = Array.new(thread_count)
producer_queue = SizedQueue.new(thread_count)
threads.extend(MonitorMixin)
threads_available = threads.new_cond
sysexit = false

url_nameserver_mapping = { "server1" => "dallasdns.dns.com",
                           "server2" => "portlanddns.dns.com",
                           "server3" => "losangelesdns.dns.com" }


unsorted_urls = JSON.parse(response.body)

url_properties = unsorted_urls['hostnames']['items'].sort_by { |k| k["server"]}

####################
##### Consumer #####
####################

consumer_thread = Thread.new do

  loop do

    break if sysexit && producer_queue.length == 0
    found_index = nil

    threads.synchronize do
      threads_available.wait_while do
        threads.select { |thread| thread.nil? ||
                                  thread.status == false ||
                                  thread["finished"].nil? == false}.length == 0
      end
      # Get the index of the available thread
      found_index = threads.rindex { |thread| thread.nil? ||
                                              thread.status == false ||
                                              thread["finished"].nil? == false }
    end

    @domain = producer_queue.pop

    threads[found_index] = Thread.new(@domain) do

      dns = Resolv::DNS.new(:nameserver => ['8.8.8.8'])
      dns_cname = dns.getresources(@domain, Resolv::DNS::Resource::IN::CNAME)

      dns_cname.map do |result|
        url_properties.each do |url|
          url["external_dns_entry"] = result.name.to_s if url['server'] == @domain
        end
      end

      Thread.current["finished"] = true

      # Notify the consumer that another batch of work has been completed
      threads.synchronize { threads_available.signal }
    end
  end
end

####################
##### Producer #####
####################

producer_thread = Thread.new do

  url_nameserver_mapping.each do |server,location|

    producer_queue << server

    threads.synchronize do
      threads_available.signal
    end
  end
  sysexit = true
end

# Join on both the producer and consumer threads so the main thread doesn't exit
producer_thread.join
consumer_thread.join

# Join on the child processes to allow them to finish
threads.each do |thread|
  thread.join unless thread.nil?
end
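One way to start tracking a problem like this down, in line with the note at the top that DNS itself is incidental, is to replace the lookup with something deterministic and compare the value a worker thread was handed with the value it actually reads. The sketch below is a hypothetical, DNS-free reduction of the consumer above: the thread is started with Thread.new(@domain), but, as in the question's code, its body reads @domain again later. The `gate` queue is an addition used only to force the thread to resume after @domain has been reassigned.

```ruby
require 'thread'

# DNS-free reduction of the consumer's pattern (hypothetical, for diagnosis).
gate = Queue.new   # hypothetical: only here to force a deterministic interleaving

@domain = "server1"
worker = Thread.new(@domain) do |passed_in|
  gate.pop                     # wait until the main thread has moved on
  [passed_in, @domain]         # value handed to Thread.new vs. the ivar right now
end

@domain = "server2"            # stands in for the next `@domain = producer_queue.pop`
gate << :go

passed_in, seen = worker.value
puts "argument: #{passed_in}, @domain: #{seen}"
# prints "argument: server1, @domain: server2"
```

The argument passed to Thread.new is captured when the thread is created, while @domain is looked up whenever the block gets around to reading it, which is the kind of drift described above (server1 picking up another server's result).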

Solution

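  • @domain is an instance variable, so it belongs to the enclosing object (here the top-level main object) and is shared by all the threads. This sharing is the root of your problem: when it is updated by popping the next unit of work from the queue, any thread that reads @domain afterwards sees the new value. Your code already passes @domain to Thread.new, but the block never declares a parameter to receive it, so it keeps reading the shared variable. You can avoid this problem by doing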

    Thread.new(producer_queue.pop) do |domain|
       # domain isn't shared with anyone (as long as there
       # is no local variable called domain in the enclosing scope)
    end
    

    Tangential to your question, but this seems like a really over-engineered approach. It is much easier to spin up a bunch of consumer threads ahead of time and have them read directly from the queue of work.
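A minimal sketch of that simpler design, assuming Ruby 2.3+ (for Queue#close): a fixed pool of worker threads all pop from one queue, and each worker keeps the server it is resolving in a block-local variable, so only the queue and a Mutex-guarded results hash are shared. The method names `resolve_cnames` and `add_external_dns_entries`, and the `resolver` parameter (a hook so the lookup can be swapped out in tests), are hypothetical; by default the lookup is the same CNAME query against 8.8.8.8 as in the question.

```ruby
require 'resolv'
require 'thread'

# Resolve every server's CNAME with a fixed pool of workers reading one queue.
# `resolver` is a hypothetical test hook; the default mirrors the question's lookup.
def resolve_cnames(servers, thread_count: 8, resolver: nil)
  resolver ||= lambda do |server|
    Resolv::DNS.open(:nameserver => ['8.8.8.8']) do |dns|
      dns.getresources(server, Resolv::DNS::Resource::IN::CNAME).map { |r| r.name.to_s }
    end
  end

  work = Queue.new
  servers.each { |server| work << server }
  work.close                      # pop returns nil once the closed queue is drained

  results = {}
  lock = Mutex.new

  workers = Array.new(thread_count) do
    Thread.new do
      while (server = work.pop)   # `server` is local to this worker thread
        names = resolver.call(server)
        # Keep the last CNAME, matching the original loop's "last write wins"
        lock.synchronize { results[server] = names.last } unless names.empty?
      end
    end
  end
  workers.each(&:join)
  results
end

# Back on the main thread, copy the resolved names into the array of hashes.
def add_external_dns_entries(url_properties, cnames)
  url_properties.each do |url|
    name = cnames[url['server']]
    url['external_dns_entry'] = name if name
  end
end
```

With the question's data this would be used as `cnames = resolve_cnames(url_nameserver_mapping.keys)` followed by `add_external_dns_entries(url_properties, cnames)`. Writing into url_properties only after every worker has been joined keeps that array single-threaded, so no condition variables or "finished" flags are needed.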