In a Ruby application I have a bunch of tasks that share no state, and I want to launch many of them at a time. Crucially, I don't care about the order in which they start, nor about their return values (each one will incur database transactions before it completes). I'm aware that, depending on my Ruby implementation, the GIL may prevent these tasks from actually running at the same time, but that's OK because I don't need true parallelism: these worker threads will be IO-bound on network requests anyway.
What I've got so far is this:
def asyncDispatcher(numConcurrent, stateQueue, &workerBlock)
  workerThreads = []
  while not stateQueue.empty?
    while workerThreads.length < numConcurrent
      nextState = stateQueue.pop
      nextWorker = Thread.new(nextState) do |st|
        workerBlock.call(st)
      end
      workerThreads.push(nextWorker)
    end # inner while
    workerThreads.delete_if { |th| not th.alive? } # clean up dead threads
  end # outer while
  workerThreads.each { |th| th.join } # join any remaining workers
end # asyncDispatcher
And I invoke it like this:
asyncDispatcher(2, (1..10).to_a ) {|x| x + 1}
Are there any lurking bugs or concurrency pitfalls here? Or perhaps something in the runtime which would simplify this task?
Use a Queue:
require 'thread'
def asyncDispatcher(numWorkers, stateArray, &processor)
  q = Queue.new
  threads = []
  (1..numWorkers).each do |worker_id|
    threads << Thread.new(processor, worker_id) do |processor, worker_id|
      while true
        next_state = q.shift # shift() blocks while q is empty, which is the case now
        break if next_state.equal?(q) # The queue itself is the sentinel; it won't appear in your data
        processor.call(next_state, worker_id)
      end
    end
  end
  stateArray.each { |state| q.push state }
  # One sentinel per worker, so every thread exits even when there are
  # fewer states than workers (otherwise join would wait forever).
  numWorkers.times { q.push q }
  threads.each(&:join)
end
asyncDispatcher(2, (1..10).to_a) do |state, worker_id|
  time = sleep(Random.rand 10) # How long it took to process state
  puts "#{state} is finished being processed: worker ##{worker_id} took #{time} secs."
end
--output:--
2 is finished being processed: worker #1 took 4 secs.
3 is finished being processed: worker #1 took 1 secs.
1 is finished being processed: worker #2 took 7 secs.
5 is finished being processed: worker #2 took 1 secs.
6 is finished being processed: worker #2 took 4 secs.
7 is finished being processed: worker #2 took 1 secs.
4 is finished being processed: worker #1 took 8 secs.
8 is finished being processed: worker #2 took 1 secs.
10 is finished being processed: worker #2 took 3 secs.
9 is finished being processed: worker #1 took 9 secs.
Okay, okay, someone is going to look at that output and cry out,
"Hey, #2 took a total of 13 seconds to do four jobs in a row, while #1 took only 8 secs. for a job, so #1's output for the 8 sec. job should have come earlier. There's no thread switching in Ruby! Ruby is broken!"
Well, while #1 was sleeping through its first two jobs for a total of 5 seconds, #2 was sleeping at the same time, so #2 had only 2 more seconds left to sleep on its 7-second job when #1 finished its first two jobs. So replace #2's 7 secs with 2 secs, and you'll see that after #1 finished its first two jobs, #2 took a total of 8 seconds (2 + 1 + 4 + 1) for its run of four jobs in a row, which tied #1's 8-second job.
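If you'd rather check that arithmetic than take my word for it, here is a rough sketch that rebuilds the timeline from the sleep times printed above. The `JOBS` table and the `finish_times` helper are names I made up for this illustration, and the sketch assumes each worker starts at t=0 and runs its jobs back to back with no overhead:

```ruby
# Per-worker job lists copied from the sample output: [state, seconds slept],
# in the order each worker printed them. (Hypothetical table for illustration.)
JOBS = {
  1 => [[2, 4], [3, 1], [4, 8], [9, 9]],
  2 => [[1, 7], [5, 1], [6, 4], [7, 1], [8, 1], [10, 3]]
}.freeze

# A worker's jobs run one after another, so a job's finish time is the
# running sum of that worker's sleep durations.
def finish_times(jobs)
  jobs.flat_map do |worker_id, list|
    elapsed = 0
    list.map do |state, secs|
      elapsed += secs
      { state: state, worker: worker_id, finished_at: elapsed }
    end
  end
end

# Sort by finish time; ties are broken by state number only to keep the
# listing deterministic.
timeline = finish_times(JOBS).sort_by { |job| [job[:finished_at], job[:state]] }

timeline.each do |job|
  puts "t=#{job[:finished_at]}s: state #{job[:state]} done (worker ##{job[:worker]})"
end
```

States 4 and 7 both come out at t=13, which is the tie described above: which of the two lines gets printed first in a real run is up to the thread scheduler.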