ruby udp eventmachine em-http-request

Using EventMachine to make multiple requests using the same connection?


I have an EventMachine reactor running that listens for UDP packets. Every time a UDP packet is received, I would like to forward the data to a database using a REST call. I have created an EM::Connection subclass whose receive_data method forwards the data through a REST call.

Since the data arrives at a high rate, I would like to reuse the connection between requests (using "keep-alive") and, if possible, also use pipelining. What would be a good way to share the connection between different calls?

Currently my UDPHandler looks something like the following:

module Udp
  module Collector
    class UDPHandler < EM::Connection
      def receive_data(data)
        http = EventMachine::HttpRequest.new('http://databaseurl.com/').post body: data
      end
    end
  end
end

This class is called as follows:

EM.run do
  EM.open_datagram_socket('0.0.0.0', 9000, Udp::Collector::UDPHandler)
end

I thought of making the request object a class variable, but I don't think that is a good idea in the context of EventMachine. Or is it?


Solution

  • I believe something like this should work:

    module Udp
      module Collector
        class UDPHandler < EM::Connection
          def http_connection
            @http_connection ||= EventMachine::HttpRequest.new('http://databaseurl.com/')
          end
    
          def receive_data(data)
            # keepalive: true asks the server to keep the connection open for the next request.
            http_connection.post body: data, keepalive: true
          end
        end
      end
    end
    

    However, you can't run requests in parallel this way, because they all share a single connection, so you need some kind of connection pool.

    The simplest version, without queueing or other extras, is:

    module Udp
      module Collector
        class UDPHandler < EM::Connection
          def connection_pool
            @connection_pool ||= []
          end
    
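          def get_connection
            # Reuse an idle connection if one exists, otherwise open a new one.
            connection_pool.shift || EventMachine::HttpRequest.new('http://databaseurl.com/')
          end
    
          def receive_data(data)
            conn = get_connection
            request = conn.post body: data, keepalive: true
            # Return the connection to the pool only after a successful request;
            # a failed one is simply dropped and replaced on demand.
            request.callback do
              connection_pool << conn
            end
          end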
        end
      end
    end
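
The pool above grows without limit when packets arrive faster than requests complete. If that is a concern, a bounded pool with a small wait queue might look roughly like the sketch below. The class name `BoundedPool` and its methods are hypothetical, not part of EventMachine or em-http-request, and the sketch assumes everything runs on the single reactor thread, so it does no locking.

```ruby
# Hypothetical helper: at most `size` connections exist at once;
# extra jobs wait in a queue until a connection is released.
class BoundedPool
  # `factory` is any block that builds a connection, for example
  #   BoundedPool.new(5) { EventMachine::HttpRequest.new('http://databaseurl.com/') }
  def initialize(size, &factory)
    @size    = size
    @factory = factory
    @idle    = []  # connections ready for reuse
    @created = 0   # connections currently alive (idle or in use)
    @waiting = []  # jobs waiting for a free connection
  end

  # Run the block with a connection now if possible, otherwise queue it.
  def with_connection(&job)
    if (conn = @idle.shift)
      job.call(conn)
    elsif @created < @size
      @created += 1
      job.call(@factory.call)
    else
      @waiting << job
    end
  end

  # Hand a connection back; a queued job, if any, gets it immediately.
  def release(conn)
    if (job = @waiting.shift)
      job.call(conn)
    else
      @idle << conn
    end
  end

  # Forget a broken connection so a fresh one can take its slot.
  def discard
    @created -= 1
    job = @waiting.shift
    with_connection(&job) if job
  end
end

# Possible use inside receive_data (assumes a `pool` method returning a BoundedPool):
#   pool.with_connection do |conn|
#     request = conn.post body: data, keepalive: true
#     request.callback { pool.release(conn) }
#     request.errback  { pool.discard }
#   end
```

With a limit in place, a burst of packets queues work instead of opening a new connection per packet. The queue itself is still unbounded here, so a real deployment would likely also want to cap or drop queued jobs.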