
Insert another URL into WebSocket::EventMachine Iterator


Let's say I have the following list of URLs:

urls = ['socket1.com', 'socket2.com']

I set up an EventMachine Iterator to open connections to those sockets:

require 'websocket-eventmachine-client'

EM.run do
  EM::Iterator.new(urls, urls.size).each do |url, iterator|
    socket = WebSocket::EventMachine::Client.connect(uri: url)

    socket.onopen {puts "open #{url}"}
    socket.onmessage {|msg, type| puts msg}
    socket.onclose {|code, reason| puts "closed #{url}"}
  end
end

With that code I don't think I can add a connection to another URL if need be. What I need to be able to do is add another connection, to socket3.com for instance, without affecting the other connections.

Any thoughts?


Solution

  • I'm not sure an EM Iterator is the best tool for this, since you want to potentially add to the array while it is being iterated over, which doesn't sound very safe. From your description, it sounds more like you need a pub/sub-style queue that can respond when new URLs are added. Something like this might work (warning: 100% untested!):

    class EMCallbackQueue
    
      attr_accessor :callback
    
      def push(item)
        callback.call(item)
      end
    
    end
    
    
    require 'websocket-eventmachine-client'
    
    EM.run do
    
      callback_queue = EMCallbackQueue.new
    
      # Assign a proc to the callback; it will be called whenever a new URL is pushed onto the queue
      callback_queue.callback = ->(url){ 
        socket = WebSocket::EventMachine::Client.connect(uri: url)
        socket.onopen {puts "open #{url}"}
        socket.onmessage do |msg, type|
          puts msg
    
          # Maybe add to the callback queue from within a callback,
          # based on the msg from the current connection, e.g.:
          #   callback_queue.push "additionsocket.com"
          # (pushing unconditionally from the proc itself would recurse without end)
        end
    
        socket.onclose {|code, reason| puts "closed #{url}"}
      }
    
      callback_queue.push "socket1.com"
      callback_queue.push "socket2.com"
    
    end
    

    EMCallbackQueue is simply a wrapper around the callback proc: when a new URL is pushed, the callback proc is called with it. Because everything runs inside EventMachine, WebSocket::EventMachine::Client.connect does not block, and the connection itself is handled on later ticks of the reactor loop. That lets the rest of the code keep running, which in turn can queue up more URLs.
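
One gap in the wrapper above is that `push` fails if it is called before a callback has been assigned. The following is a minimal sketch of one way to handle that, not part of the original answer: `BufferedCallbackQueue` and `connected` are hypothetical names, and the lambda only records URLs as a stand-in for the `WebSocket::EventMachine::Client.connect` call, so the snippet runs without a reactor.

```ruby
# Hypothetical buffered variant of the answer's EMCallbackQueue (plain Ruby, no gems).
class BufferedCallbackQueue
  def initialize
    @callback = nil
    @pending = [] # items pushed before a callback was assigned
  end

  # Assigning the callback flushes anything pushed earlier, in push order.
  def callback=(handler)
    @callback = handler
    @callback.call(@pending.shift) until @pending.empty?
  end

  # Hand the item straight to the callback, or hold it until one exists.
  def push(item)
    if @callback
      @callback.call(item)
    else
      @pending << item
    end
    self
  end
end

# Stand-in for the connect lambda: record URLs instead of opening sockets.
connected = []
queue = BufferedCallbackQueue.new
queue.push "socket1.com"                      # buffered, no callback yet
queue.callback = ->(url) { connected << url } # flushes socket1.com
queue.push "socket2.com"
queue.push "socket3.com"                      # added later, earlier entries untouched
```

Inside `EM.run`, the lambda would presumably be the one from the answer that opens the socket and registers the `onopen`, `onmessage` and `onclose` handlers; the buffering only changes what happens to URLs that arrive before that lambda is in place.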