
Bulk index / create documents with elasticsearch for python

I am generating a large number of Elasticsearch documents with random content in Python and indexing them with elasticsearch-py.

Simplified working example (document with just one field):

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('')

for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    es_client.index(index='my_index', document=document)

Since this makes one request per document, I tried to speed it up by sending chunks of 1000 documents each using the _bulk API. However, my attempts so far have been unsuccessful.

My understanding from the docs is that you can pass an iterable to bulk(), so I tried:

from elasticsearch import Elasticsearch
from random import getrandbits

es_client = Elasticsearch('')

document_list = []
for i in range(1,10000000):
    document = {'my_field': getrandbits(64)}
    document_list.append(document)
    if i % 1000 == 0:
        es_client.bulk(operations=document_list, index='my_index')
        document_list = []

but this results in a

elasticsearch.BadRequestError: BadRequestError(400, 'illegal_argument_exception', 'Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [VALUE_STRING]')
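
For context, the _bulk endpoint does not take a list of plain documents. It expects an action/metadata entry (index, create, update or delete) followed by a source entry, except for delete, which has no source. The hypothetical find_malformed_line helper below is a simplified offline check, not part of elasticsearch-py; it reports the first position where an action is expected but something else is found. For a list of plain documents that is position 1, which matches the kind of problem the error message above describes.

```python
from typing import Any, Optional

# Operation names that are valid as the single key of an action entry
ACTIONS = {"index", "create", "update", "delete"}


def find_malformed_line(operations: list[dict[str, Any]]) -> Optional[int]:
    """Return the 1-based position of the first entry that should be an
    action/metadata dict but is not, or None if the layout looks valid.

    Simplified sketch: it only checks the action/source alternation."""
    i = 0
    while i < len(operations):
        entry = operations[i]
        # an action entry is a dict with exactly one key naming the operation
        if len(entry) != 1 or next(iter(entry)) not in ACTIONS:
            return i + 1
        op_type = next(iter(entry))
        # every action except delete is followed by a source entry
        i += 1 if op_type == "delete" else 2
    return None


# plain documents: position 1 is a document, not an action
print(find_malformed_line([{"my_field": 1}, {"my_field": 2}]))  # 1
# action/source pairs, as _bulk expects
print(find_malformed_line([{"index": {}}, {"my_field": 1},
                           {"delete": {"_id": "42"}}]))  # None
```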


  • OK, it seems I had mixed up two different functions: helpers.bulk() and Elasticsearch.bulk(). Either can be used to achieve what I intended, but their signatures are slightly different.

    The helpers.bulk() function takes an Elasticsearch client instance and an iterable of actions as parameters. The operation for each action can be set with the _op_type key and can be one of index, create, delete, or update. Since _op_type defaults to index, we can omit it here and simply pass the plain documents:

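    from elasticsearch import Elasticsearch, helpers
    from random import getrandbits

    es_client = Elasticsearch('')
    document_list = []
    for i in range(1, 10000000):
        document = {'my_field': getrandbits(32)}
        document_list.append(document)
        if i % 1000 == 0:
            # index= is passed on to the _bulk request as the default index
            helpers.bulk(es_client, document_list, index='my_index')
            document_list = []
    # send the documents left over after the last full chunk
    if document_list:
        helpers.bulk(es_client, document_list, index='my_index')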

    This works fine.
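
    To make the _op_type behaviour concrete, here is a simplified sketch of how a helper-style action can be split into the action/source pair that _bulk needs. expand is a hypothetical name; the real conversion happens inside elasticsearch.helpers and handles more metadata keys than the two shown here.

```python
from typing import Any, Optional

# Small subset of metadata keys, for illustration only
META_KEYS = ("_index", "_id")


def expand(action: dict[str, Any]) -> tuple[dict[str, Any], Optional[dict[str, Any]]]:
    """Split a helper-style action into (action entry, source entry)."""
    data = dict(action)  # copy so the caller's dict is not mutated
    op_type = data.pop("_op_type", "index")  # index is the default operation
    meta = {key: data.pop(key) for key in META_KEYS if key in data}
    if op_type == "delete":
        return {op_type: meta}, None  # delete has no source entry
    if "_source" in data:
        return {op_type: meta}, data["_source"]  # explicit source wins
    return {op_type: meta}, data  # otherwise the remaining keys are the document


print(expand({"my_field": 7}))
# ({'index': {}}, {'my_field': 7})
print(expand({"_op_type": "create", "_index": "my_index", "_source": {"my_field": 7}}))
# ({'create': {'_index': 'my_index'}}, {'my_field': 7})
print(expand({"_op_type": "delete", "_index": "my_index", "_id": "1"}))
# ({'delete': {'_index': 'my_index', '_id': '1'}}, None)
```

    Note that for update the source entry is a partial-update body such as {"doc": {...}}; that is where a "doc" key belongs in a bulk request.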

    Alternatively, the Elasticsearch.bulk() function can be used, but here the actions are a mandatory part of the iterable and the syntax is slightly different. Instead of just a dict with the document contents, each document must be preceded by its own action dict (in this case {"index": {}}), so the list alternates between action and document and holds two entries per document. See also the _bulk documentation:

    from elasticsearch import Elasticsearch
    from random import getrandbits

    es_client = Elasticsearch('')
    actions_list = []
    for i in range(1, 10000000):
        document = {'my_field': getrandbits(32)}
        # action/metadata entry followed by the document source entry
        actions_list.append({"index": {}})
        actions_list.append(document)
        if i % 1000 == 0:
            es_client.bulk(operations=actions_list, index='my_index')
            actions_list = []
    if actions_list:
        es_client.bulk(operations=actions_list, index='my_index')

    This works fine as well.
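
    On the wire, the operations list becomes a newline-delimited JSON (NDJSON) body: one JSON object per line, with a final newline that _bulk requires. The client performs this serialization itself; the hypothetical to_ndjson function below only mimics it to show the two-lines-per-document layout (the compact separators are a choice made here, the client's exact formatting may differ).

```python
import json
from typing import Any


def to_ndjson(operations: list[dict[str, Any]]) -> str:
    """Serialize bulk operations as newline-delimited JSON, one object per line."""
    lines = [json.dumps(op, separators=(",", ":")) for op in operations]
    return "\n".join(lines) + "\n"  # the body must end with a newline


operations = []
for value in (1, 2):
    operations.append({"index": {}})        # action/metadata line
    operations.append({"my_field": value})  # document source line

print(to_ndjson(operations), end="")
# {"index":{}}
# {"my_field":1}
# {"index":{}}
# {"my_field":2}
```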

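    Both of the above send their data to the _bulk REST endpoint, so the end result should be the same. One difference is that helpers.bulk() splits its input into chunks of 500 actions by default, so each list of 1000 documents is actually sent as two requests.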
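
    helpers.bulk() and helpers.streaming_bulk() cut the incoming action stream into chunks before sending them. The hypothetical batched_actions generator below is a simplified model of that idea: it groups a lazy stream into lists of at most chunk_size items with itertools.islice, so 10 million documents never have to sit in memory at once. The real helpers also cap each chunk by size in bytes (max_chunk_bytes), which this sketch ignores.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched_actions(actions: Iterable[T], chunk_size: int = 500) -> Iterator[list[T]]:
    """Yield lists of at most chunk_size actions without materializing the stream."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(actions)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def doc_stream(n: int) -> Iterator[dict]:
    # deterministic stand-in for the random documents used in this thread
    for i in range(n):
        yield {"_index": "my_index", "_source": {"my_field": i}}


sizes = [len(chunk) for chunk in batched_actions(doc_stream(1234), chunk_size=500)]
print(sizes)  # [500, 500, 234]
```

    Because the generator simply stops when the stream runs dry, the final partial chunk is emitted without any separate flush step.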


    As pointed out by Johan, helpers.bulk() takes care of the chunking itself (internally it calls helpers.streaming_bulk()), so there is no need to split the actions into lists of 1000 manually. For my final solution, I wrote a generator function that yields one action at a time. It can be passed directly to helpers.streaming_bulk(), along with a chunk_size of your choosing (the default is 500):

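    import logging
    from elasticsearch import Elasticsearch, helpers
    from random import getrandbits

    es_client = Elasticsearch('')

    def doc_stream():
        '''Generator function yielding a stream of index actions.'''
        for i in range(1, 10000000):
            yield {'_index': 'my_index',
                   '_source': {'my_field': getrandbits(32)}}

    # chunk_size=1000 is just an example value; raise_on_error=False makes
    # failed items come back as (False, info) instead of raising BulkIndexError
    for status_ok, response in helpers.streaming_bulk(es_client, doc_stream(),
                                                      chunk_size=1000,
                                                      raise_on_error=False):
        if not status_ok:
            # if failure inserting, log response
            logging.error(response)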
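
    When helpers.streaming_bulk() is called with raise_on_error=False, failed items are yielded as (False, info) tuples instead of raising an exception, where info has the shape {op_type: item}. A hypothetical summarize helper could tally them by error type; the sample tuples below are hand-written and only mimic that shape, they are not real server output.

```python
from collections import Counter
from typing import Any, Iterable


def summarize(results: Iterable[tuple[bool, dict[str, Any]]]) -> Counter:
    """Count successes and failures, grouping failures by error type."""
    counts: Counter = Counter()
    for ok, info in results:
        if ok:
            counts["ok"] += 1
            continue
        # info looks like {op_type: item}; unpack the single item
        (item,) = info.values()
        error = item.get("error")
        if isinstance(error, dict):
            counts[error.get("type", "unknown_error")] += 1
        else:
            counts["unknown_error"] += 1
    return counts


# hand-written sample data, shaped like streaming_bulk() results
sample = [
    (True, {"index": {"_index": "my_index", "status": 201}}),
    (False, {"index": {"_index": "my_index", "status": 400,
                       "error": {"type": "mapper_parsing_exception"}}}),
    (True, {"index": {"_index": "my_index", "status": 201}}),
]
print(summarize(sample))  # Counter({'ok': 2, 'mapper_parsing_exception': 1})
```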