I am implementing a simple program in Celluloid that ideally will run a few actors in parallel, each of which will compute something, and then send its result back to a main actor, whose job is simply to aggregate results.
Following this FAQ, I introduced a SupervisionGroup
, like this:
module Shuffling
class AggregatorActor
include Celluloid
def initialize(shufflers)
@shufflerset = shufflers
@results = {}
end
def add_result(result)
@results.merge! result
@shufflerset = @shufflerset - result.keys
if @shufflerset.empty?
self.output
self.terminate
end
end
def output
puts @results
end
end
class EvalActor
include Celluloid
def initialize(shufflerClass)
@shuffler = shufflerClass.new
self.async.runEvaluation
end
def runEvaluation
# computation here, which yields result
Celluloid::Actor[:aggregator].async.add_result(result)
self.terminate
end
end
class ShufflerSupervisionGroup < Celluloid::SupervisionGroup
shufflers = [RubyShuffler, PileShuffle, VariablePileShuffle, VariablePileShuffleHuman].to_set
supervise AggregatorActor, as: :aggregator, args: [shufflers.map { |sh| sh.new.name }]
shufflers.each do |shuffler|
supervise EvalActor, as: shuffler.name.to_sym, args: [shuffler]
end
end
ShufflerSupervisionGroup.run
end
I terminate the EvalActor
s after they're done, and I also terminate the AggregatorActor
when all of the workers are done.
However, the supervision thread stays alive and keeps the main thread alive. The program never terminates.
If I send .run!
to the group, then the main thread terminates right after it, and nothing works.
What can I do to terminate the group (or, in group terminology, finalize
, I suppose) after the AggregatorActor
terminates?
What I did after all, is change the AggregatorActor
to have a wait_for_results
:
class AggregatorActor
include Celluloid
def initialize(shufflers)
@shufflerset = shufflers
@results = {}
end
def wait_for_results
sleep 5 while not @shufflerset.empty?
self.output
self.terminate
end
def add_result(result)
@results.merge! result
@shufflerset = @shufflerset - result.keys
puts "Results for #{result.keys.inspect} recorded, remaining: #{@shufflerset.inspect}"
end
def output
puts @results
end
end
And then I got rid of the SupervisionGroup
(since I didn't need supervision, ie rerunning of actors that failed), and I used it like this:
shufflers = [RubyShuffler, PileShuffle, VariablePileShuffle, VariablePileShuffleHuman, RiffleShuffle].to_set
Celluloid::Actor[:aggregator] = AggregatorActor.new(shufflers.map { |sh| sh.new.name })
shufflers.each do |shuffler|
Celluloid::Actor[shuffler.name.to_sym] = EvalActor.new shuffler
end
Celluloid::Actor[:aggregator].wait_for_results
That doesn't feel very clean, it would be nice if there was a cleaner way, but at least this works.