ruby, multithreading, process, thread-safety, spawn

How to asynchronously collect results from new threads created in real time in Ruby


I would like to continuously check a table in the DB for commands to run. Some commands might take 4 minutes to complete, others 10 seconds.

Hence I would like to run them in threads: every record creates a new thread, and once the thread is created, the record gets removed.

Because the DB lookup and thread creation will run in an endless loop, how do I get the 'response' from each thread? (Each thread will issue a shell command and get a response code, which I would like to read.)

I thought about creating two threads, each running an endless loop:
- the first for DB lookups and creating new threads
- the second for somehow reading the threads' results and acting on each response

Or maybe I should use fork, or spawn a new OS process?


Solution

  • You can have each thread push its results onto a Queue, and have your main thread read from that Queue. Reading from a Queue blocks by default, so if no results are available yet, the reading thread simply waits until one arrives.

    http://ruby-doc.org/stdlib-2.0.0/libdoc/thread/rdoc/Queue.html

    Here is an example:

    require 'thread'
    
    jobs = Queue.new
    results = Queue.new
    
    thread_pool = []
    pool_size = 5
    
    (1..pool_size).each do |i|
      thread_pool << Thread.new do 
        loop do 
          job = jobs.shift #blocks waiting for a task
          break if job == "!NO-MORE-JOBS!"
    
          #Otherwise, do job...
          puts "#{i}...."
          sleep rand(1..5) #Simulate the time it takes to do a job
          results << "thread#{i} finished #{job}"  #Push some result from the job onto the Queue
          #Go back and get another task from the Queue
        end
      end
    end
    
    
    #All threads are now blocking waiting for a job...
    puts 'db_stuff'
    db_stuff = [
      'job1', 
      'job2', 
      'job3', 
      'job4', 
      'job5',
      'job6',
      'job7',
    ]
    
    db_stuff.each do |job|
      jobs << job
    end
    
    #Threads are now attacking the Queue like hungry dogs.
    
    pool_size.times do
      jobs << "!NO-MORE-JOBS!"
    end
    
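    result_count = 0
    
    loop do
      result = results.shift  #blocks until some worker pushes a result
      puts "result: #{result}"
      result_count += 1
      break if result_count == db_stuff.size  #one result per job, instead of a hard-coded 7
    end
    
    thread_pool.each(&:join)  #each worker has read its "!NO-MORE-JOBS!" sentinel, so this returns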
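The answer relies on `Queue#pop` (also available as `shift`) blocking when the queue is empty. A minimal sketch of both modes: `pop(true)` is the non-blocking variant and raises `ThreadError` on an empty queue, while a plain `pop` waits for a worker to push. The `worker` thread and the 0.1 s sleep are only illustrative.

```ruby
require 'thread'

results = Queue.new

# Non-blocking read: pop(true) raises ThreadError instead of waiting
# when the queue is empty.
nonblocking_raised =
  begin
    results.pop(true)
    false
  rescue ThreadError
    true
  end
puts "non-blocking pop on empty queue raised ThreadError: #{nonblocking_raised}"

# A worker thread that needs some time before it has a result.
worker = Thread.new do
  sleep 0.1               # simulate a slow job
  results << "job done"   # hand the result back to the main thread
end

# Blocking read: the main thread waits here until the worker pushes.
got = results.pop
puts "got: #{got}"
worker.join
```

The non-blocking form is handy inside a polling loop that has other work to do between checks; the blocking form suits a dedicated reader thread.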
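The example above uses a fixed list of jobs. For the setup described in the question (continuous polling, one thread per command, and a second thread that reacts to each exit code), the same Queue idea can be arranged roughly as follows. This is a sketch under assumptions: the `polls` array is a hypothetical stand-in for repeatedly querying the DB table, the commands are made up, and the finite loop plus the final joins exist only so the demo terminates. Each command runs in a child process via `Open3.capture2e`, so a thread spends its time waiting on that process rather than competing for Ruby's interpreter lock, and forking the Ruby process yourself is usually unnecessary here.

```ruby
require 'open3'
require 'thread'

results = Queue.new   # [command, output, Process::Status] from the workers
handled = []          # what the consumer has processed (demo bookkeeping only)

# Consumer thread: acts on each result as soon as any worker finishes,
# in completion order rather than start order.
consumer = Thread.new do
  loop do
    record = results.pop            # blocks until a worker pushes something
    break if record == :stop        # shutdown sentinel
    cmd, output, status = record
    handled << [cmd, status.exitstatus]
    puts "#{cmd.inspect} exited with #{status.exitstatus}: #{output.strip}"
  end
end

# Hypothetical data: each element is what one poll of the table returns.
polls = [["sleep 0.2; echo slow", "exit 3"], [], ["echo fast"]]
workers = []

polls.each do |batch|               # real code: `loop do` with an actual query
  batch.each do |cmd|
    workers << Thread.new(cmd) do |c|
      # Run the command through sh; capture2e merges stdout and stderr.
      output, status = Open3.capture2e("sh", "-c", c)
      results << [c, output, status]
    end
    # Real code would delete the row for `cmd` from the table here.
  end
  sleep 0.05                        # polling interval
end

workers.each(&:join)   # demo only: make sure every result has been pushed
results << :stop       # then tell the consumer to finish
consumer.join
```

Because results arrive in completion order, a 10-second command can report back while a 4-minute one is still running. In a long-running poller, keeping every `Thread` in `workers` would grow without bound; either drop finished ones on each poll (for example `workers.select!(&:alive?)`) or cap concurrency with a fixed pool like the one in the answer.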