Tags: ruby, multithreading, rubinius, celluloid

How can I terminate a SupervisionGroup?


I am implementing a simple program in Celluloid that ideally will run a few actors in parallel, each of which will compute something, and then send its result back to a main actor, whose job is simply to aggregate results.

Following this FAQ, I introduced a SupervisionGroup, like this:

module Shuffling           
  class AggregatorActor
    include Celluloid      

    def initialize(shufflers)
      @shufflerset = shufflers
      @results = {}        
    end

    def add_result(result)
      @results.merge! result

      @shufflerset = @shufflerset - result.keys

      if @shufflerset.empty?
        self.output
        self.terminate
      end
    end

    def output
      puts @results
    end
  end

  class EvalActor
    include Celluloid

    def initialize(shufflerClass)
      @shuffler = shufflerClass.new
      self.async.runEvaluation
    end

    def runEvaluation
      # computation here, which yields result
      Celluloid::Actor[:aggregator].async.add_result(result)
      self.terminate
    end
  end

  class ShufflerSupervisionGroup < Celluloid::SupervisionGroup
    shufflers = [RubyShuffler, PileShuffle, VariablePileShuffle, VariablePileShuffleHuman].to_set

    supervise AggregatorActor, as: :aggregator, args: [shufflers.map { |sh| sh.new.name }]

    shufflers.each do |shuffler|
      supervise EvalActor, as: shuffler.name.to_sym, args: [shuffler]
    end
  end

  ShufflerSupervisionGroup.run
end

I terminate the EvalActors after they're done, and I also terminate the AggregatorActor when all of the workers are done.

However, the supervision thread stays alive and keeps the main thread alive. The program never terminates.

If I call .run! on the group instead, the main thread exits right after the call, and the program ends before the actors get any work done.

What can I do to terminate the group (or, in group terminology, finalize, I suppose) after the AggregatorActor terminates?


Solution

  • What I ended up doing was changing the AggregatorActor to have a wait_for_results method:

    class AggregatorActor
      include Celluloid
    
      def initialize(shufflers)
        @shufflerset = shufflers
        @results = {}
      end
    
      def wait_for_results
        # Poll until every shuffler has reported (add_result calls are handled between sleeps).
        sleep 5 until @shufflerset.empty?
    
        self.output
        self.terminate
      end
    
      def add_result(result)
        @results.merge! result
    
        @shufflerset = @shufflerset - result.keys
    
        puts "Results for #{result.keys.inspect} recorded, remaining: #{@shufflerset.inspect}"
      end
    
      def output
        puts @results
      end
    end
    

    Then I got rid of the SupervisionGroup (since I didn't need supervision, i.e. restarting actors that failed) and used the actors like this:

    shufflers = [RubyShuffler, PileShuffle, VariablePileShuffle, VariablePileShuffleHuman, RiffleShuffle].to_set
    
    Celluloid::Actor[:aggregator] = AggregatorActor.new(shufflers.map { |sh| sh.new.name })
    
    shufflers.each do |shuffler|
      Celluloid::Actor[shuffler.name.to_sym] = EvalActor.new shuffler
    end
    
    Celluloid::Actor[:aggregator].wait_for_results
    

    That doesn't feel very clean, and it would be nice if there were a cleaner way, but at least this works.
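
One way to avoid the sleep-based polling is to have the aggregating side block on a queue until every worker has reported. The following is only a minimal sketch of that idea in plain Ruby (stdlib Thread and Queue), not Celluloid code; the aggregate method, the shuffler names, and the stand-in "evaluation" are all hypothetical and just illustrate the shape.

```ruby
# Plain-Ruby sketch (stdlib Thread and Queue, not Celluloid).
# The names passed in and the fake "evaluation" are hypothetical stand-ins.

# Runs one job per name in its own thread and collects every result.
def aggregate(names)
  results_queue = Queue.new

  workers = names.map do |name|
    Thread.new do
      # Stand-in for the real computation: here, just the name's length.
      results_queue << { name => name.length }
    end
  end

  # Queue#pop blocks until a result is available, so no sleep loop is needed.
  results = {}
  names.size.times { results.merge!(results_queue.pop) }

  workers.each(&:join)
  results
end

puts aggregate(%w[RubyShuffler PileShuffle RiffleShuffle])
```

Because the main thread waits for exactly one result per worker, the program ends as soon as the last result arrives. In this sketch a worker that raises before pushing its result would leave the pop waiting forever, so real code would need to handle failures (for example by pushing an error marker).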