Tags: python, elasticsearch, elasticsearch-py

ElasticSearch ConnectionPool using elasticsearch-py library


I am a novice with Elasticsearch, and I am trying to add entries to an index using concurrent connections from the Elasticsearch ConnectionPool [via the Transport class].

Here is my code:

import datetime  # needed for datetime.datetime.now() below

import elasticsearch
from elasticsearch.transport import Transport

def init_connection():
    transport = Transport([{'host':SERVER_URL}], port=SERVER_PORT, randomize_hosts=False)
    transport.add_connection(host=SERVER_URL+SERVER_PORT)
    return transport

def add_entries_to_es(id, name):
    transport = init_connection()
    doc = {
           'name': name,
           'postDate': datetime.datetime.now(),
           'valid': "true",
           'suggest': {
               "input": name,
               'output': name,
               'payload': {'domain_id': id}
               }
           }
    conn = transport.getConnection()
    es = elasticsearch.Elasticsearch(connection_class=conn)
    res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc)
    ...

And I get the below error:

File "/my_project/elastichelper.py", line 23, in init_connection
transport.add_connection(host=SERVER_URL+SERVER_PORT)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 139, in add_connection
self.set_connections(self.hosts)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 169, in set_connections
connections = map(_create_connection, hosts)
File "/Library/Python/2.7/site-packages/elasticsearch/transport.py", line 161, in _create_connection
kwargs.update(host)
ValueError: dictionary update sequence element #0 has length 1; 2 is required

I'm not sure whether the Transport class is the right way to instantiate a ConnectionPool in Elasticsearch. However, I read in the docs that the Transport class handles instantiation of the individual connections, as well as creating a connection pool to hold them.

I can't figure out the correct way to instantiate a ConnectionPool and use the connections from the pool efficiently. Reading the docs and googling haven't helped.
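Incidentally, the ValueError in the traceback can be reproduced without Elasticsearch at all. `_create_connection` calls `kwargs.update(host)`, and `dict.update()` treats a string as a sequence of key/value pairs, so each host must be a dict like `{'host': ..., 'port': ...}`, not a concatenated string (hostnames below are made up):

```python
kwargs = {}

# Passing a plain string, as add_connection(host=SERVER_URL + SERVER_PORT) does,
# makes dict.update() iterate over the characters -- each "element" has length 1:
try:
    kwargs.update("localhost9200")
except ValueError as exc:
    # "dictionary update sequence element #0 has length 1; 2 is required"
    error_message = str(exc)

# The shape Transport actually expects for each host entry:
kwargs.update({"host": "localhost", "port": 9200})
```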

I am also aware of the helpers.bulk() API, but I'm confused about using it, since along with adding entries to the index I'm deleting the invalid entries as well.
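(For reference: helpers.bulk() can mix operations in a single call, because each action dict can carry an "_op_type" of "index" or "delete". A minimal sketch of building such a mixed action list, with made-up index/type names and a hypothetical build_bulk_actions helper:)

```python
import datetime

def build_bulk_actions(index, doc_type, to_add, to_delete):
    """Build a single action list mixing index and delete operations,
    suitable for passing to elasticsearch.helpers.bulk()."""
    actions = []
    for doc_id, name in to_add:
        actions.append({
            "_op_type": "index",          # "index" is also the default
            "_index": index,
            "_type": doc_type,
            "_id": doc_id,
            "_source": {
                "name": name,
                "postDate": datetime.datetime.now().isoformat(),
                "valid": "true",
            },
        })
    for doc_id in to_delete:
        actions.append({
            "_op_type": "delete",         # deletions ride in the same list
            "_index": index,
            "_type": doc_type,
            "_id": doc_id,
        })
    return actions

actions = build_bulk_actions("my_index", "my_type", [(1, "foo")], [2])
# Feeding the list to the helper requires a live cluster:
# from elasticsearch import Elasticsearch, helpers
# helpers.bulk(Elasticsearch(), actions)
```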


Solution

  • I found out that simply using an Elasticsearch class instance, with an appropriate timeout value set for the index method [for me, timeout=30 was good enough], worked. Like this:

    doc = {
           'name': name,
           'postDate': datetime.datetime.now(),
           'valid': "true",
           'suggest': {
               "input": name,
               'output': name,
               'payload': {'domain_id': id}
               }
           }
    es = elasticsearch.Elasticsearch()
    res = es.index(index=ES_INDEX_NAME, doc_type=ES_DOC_TYPE, id=id, body=doc, timeout=30)
    

    I had initially faced timeout issues with a plain Elasticsearch class instance, which were fixed by the changes above.

    I didn't have to use Transport or Connection class instances explicitly at all.
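    For completeness: when you pass several hosts to Elasticsearch(), the client builds the Transport and ConnectionPool itself and round-robins requests over the live connections, which is why no explicit pooling was needed. A toy illustration of that selection behaviour (pure Python, not the library's actual class; host names are made up):

```python
import itertools

class RoundRobinPool:
    """Toy stand-in for elasticsearch-py's ConnectionPool: hand out
    connections (here, plain host dicts) in round-robin order."""

    def __init__(self, hosts):
        self._cycle = itertools.cycle(hosts)

    def get_connection(self):
        return next(self._cycle)

pool = RoundRobinPool([{"host": "es1", "port": 9200},
                       {"host": "es2", "port": 9200}])
picks = [pool.get_connection()["host"] for _ in range(4)]
# picks == ['es1', 'es2', 'es1', 'es2']
```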