Search code examples
rubyeventmachine

Ruby Event Machine stop or kill deffered operation


I was wondering if I could stop execution of an operation that has been deffered.

require 'rubygems'
require 'em-websocket'

EM.run do
  EM::WebSocket.start(:host => '0.0.0.0', :port => 8080) do |ws|
     ws.onmessage do |msg|
       op = proc do
         sleep 5 # Thread safe IO here that is safely killed
         true
       end

      callback = proc do |result|
         puts "Done!"
      end

      EM.defer(op, callback)
    end
  end
end

This is an example web socket server. Sometimes when I get a message I want to do some IO, later on another message might come in that needs to read the same thing, the next thing always has precedence over the previous thing. So I want to cancel the first op and do the second.


Solution

  • Here is my solution. It is similar to the EM.queue solution, but just uses a hash.

    require 'rubygems'
    require 'em-websocket'
    require 'json'
    
    EM.run do
      EM::WebSocket.start(:host => '0.0.0.0', :port => 3333) do |ws|
        mutex = Mutex.new # to make thread safe. See https://github.com/eventmachine/eventmachine/blob/master/lib/eventmachine.rb#L981
        queue = EM::Queue.new
        ws.onmessage do |msg|
          message_type = JSON.parse(msg)["type"]
          op = proc do
            mutex.synchronize do
              if message_type == "preferred"
                puts "killing non preferred\n"
                queue.size.times { queue.pop {|thread| thread.kill } }
              end
              queue << Thread.current
            end
    
            puts "doing the long running process"
            sleep 15 # Thread safe IO here that is safely killed
            true
          end
    
          callback = proc do |result|
            puts "Finished #{message_type} #{msg}"
          end
    
          EM.defer(op, callback)
        end
      end
    end