
Asynchronous Pool of Connections in Tornado with multiple processes


I'm using the Tornado 4.2.1 and tornadoes 2.4.1 libraries to query my Elasticsearch database, and I'm looking for a way to initialize a pool of connections to be shared among several RequestHandler instances in a multi-process service.

Is it possible to do that? Are there specific libraries for Tornado to do that?

Thanks in advance


Solution

  • Since tornado-es is just an HTTP client, it uses AsyncHTTPClient in ESConnection. A new TCP connection is made for every request, unless the Connection: keep-alive header is specified:

    conn = ESConnection()
    conn.httprequest_kwargs['headers'] = {'Connection': 'keep-alive'}
    

    I haven't tested this, but it should work. I used a similar setup in Ruby (with the Patron HTTP client), and it worked well.

    Next thing

    AsyncHTTPClient has a limit on the maximum number of simultaneous requests (fetches) per IOLoop. Every request that hits the limit is simply queued internally.

    You may want to increase the global limit:

    AsyncHTTPClient.configure(None, max_clients=50)
    

    or give the client its own separate limit (force_instance):

    from tornadoes import ESConnection
    from tornado.httpclient import AsyncHTTPClient
    
    class CustomESConnection(ESConnection):

        def __init__(self, host='localhost', port='9200', io_loop=None, protocol='http', max_clients=20):
            super(CustomESConnection, self).__init__(host, port, io_loop, protocol)
            # a dedicated client instance with its own max_clients limit
            self.client = AsyncHTTPClient(force_instance=True, max_clients=max_clients)
    
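    As an aside, the queueing behaviour behind max_clients can be pictured with a plain asyncio semaphore. This is only an illustrative stdlib sketch of the idea (requests beyond the limit wait their turn), not tornado's actual implementation:

    ```python
    import asyncio

    async def fetch(sem, i, active, peak):
        # Acquire a slot; requests beyond the limit wait here, i.e. are queued.
        async with sem:
            active[0] += 1
            peak[0] = max(peak[0], active[0])
            await asyncio.sleep(0.01)   # simulated HTTP round-trip
            active[0] -= 1
        return i

    async def main(max_clients=3, total=10):
        sem = asyncio.Semaphore(max_clients)
        active, peak = [0], [0]
        results = await asyncio.gather(
            *(fetch(sem, i, active, peak) for i in range(total)))
        return results, peak[0]

    if __name__ == '__main__':
        results, peak = asyncio.run(main())
        print(peak)   # peak concurrency never exceeds max_clients
    ```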

    And finally

    To reuse the same ESConnection, you can create it on the Application, since the application is accessible from every RequestHandler:

    import tornado.ioloop
    from tornado import gen
    from tornado.web import Application, RequestHandler
    from tornadoes import ESConnection

    class MainHandler(RequestHandler):

        @gen.coroutine
        def get(self):
            response = yield self.application.es.search('something')
            self.write(response.body)


    class MyApp(Application):

        def __init__(self, *args, **kwargs):
            super(MyApp, self).__init__(*args, **kwargs)
            self.es = ESConnection()

    if __name__ == "__main__":
        application = MyApp([
            (r"/", MainHandler),
        ])
        application.listen(8888)
        tornado.ioloop.IOLoop.current().start()
    

    Multiprocess

    Actually, there is no easy way. The common approach is a pooler, which is mostly used when persistent connections are needed, as with databases (pgbouncer for PostgreSQL), or as an optimization in high-load services.

    So you would have to write a pooler: a gateway application in front of Elasticsearch.

    subprocess1
               \  (http, zmq, ...)
                \
                  > pooler (some queue and the tornadoes API) - http -> elasticsearch
                /
               /
    subprocess2
    

    The subprocesses could communicate with the pooler via HTTP, ØMQ (there are many examples, even of poolers) or some implementation of IPC (sockets, ...).
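    A minimal sketch of that pooler layout, using stdlib multiprocessing queues as the IPC channel. Here handle_request() is a hypothetical stand-in for the real Elasticsearch round-trip, which in practice would go through tornadoes with a bounded AsyncHTTPClient:

    ```python
    import multiprocessing as mp

    def handle_request(query):
        """Stand-in for the real Elasticsearch call the pooler would make."""
        return 'hits for %s' % query

    def pooler(request_q, response_qs):
        """Gateway process: drains one shared request queue on behalf of
        all subprocesses, so only this process talks to Elasticsearch."""
        while True:
            item = request_q.get()
            if item is None:          # shutdown sentinel
                break
            worker_id, query = item
            response_qs[worker_id].put(handle_request(query))

    def worker(worker_id, request_q, response_q):
        # Each subprocess sends its query to the pooler and waits for the answer.
        request_q.put((worker_id, 'query-%d' % worker_id))
        return response_q.get()

    if __name__ == '__main__':
        request_q = mp.Queue()
        response_qs = [mp.Queue(), mp.Queue()]
        gw = mp.Process(target=pooler, args=(request_q, response_qs))
        gw.start()
        print(worker(0, request_q, response_qs[0]))   # prints "hits for query-0"
        print(worker(1, request_q, response_qs[1]))   # prints "hits for query-1"
        request_q.put(None)       # stop the pooler
        gw.join()
    ```

    The single request queue is what makes this a pool: no matter how many subprocesses there are, Elasticsearch only ever sees connections from the gateway process.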