Tags: python, elasticsearch, cron, insert-update, elasticsearch-py

Periodically process and update documents in an Elasticsearch index


I need to come up with a strategy to process and update documents in an Elasticsearch index periodically and efficiently. Documents I have already processed do not need to be looked at again.

My setup is a long-running process that continuously inserts documents into an index, at roughly 500 documents per hour (think of the common logging example).

I need a way to update some number of documents periodically (e.g., via a cron job) by running my own code on a specific field (a text field, say) to enrich each document with a number of new fields. I want to do this to offer more fine-grained aggregations on the index. In the logging analogy, this could mean: take the User-Agent string from a log entry (document), parse it, write some new fields back to that document, and re-index it.
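
To make the enrichment step concrete, here is a minimal sketch of such a post-processing function. The parse_user_agent helper and the field names (useragent, ua_browser, ua_os) are hypothetical placeholders for whatever parsing logic or external library is actually used:

    def parse_user_agent(ua_string):
        # Placeholder: real code would call an external UA-parsing library here.
        browser, _, os_part = ua_string.partition(" ")
        return {"browser": browser, "os": os_part}

    def enrich(doc):
        """Return the new fields for one document (hypothetical field names)."""
        ua = doc["_source"].get("useragent", "")
        parsed = parse_user_agent(ua)
        return {"ua_browser": parsed["browser"], "ua_os": parsed["os"]}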

So my approach would be:

  1. Get some number of documents (or even all) that I haven't looked at before. I could query them by combining must_not and exists, for instance (see the sketch after this list).
  2. Run my code on these documents (run the parser, compute some new stuff, whatever).
  3. Update the documents obtained previously (preferably via the Bulk API).
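
For step 1, the must_not/exists combination could look like the following query body; ua_browser again stands in for one of the new fields added during processing:

    # Matches only documents that do not yet carry the enrichment field.
    query = {
        "query": {
            "bool": {
                "must_not": {"exists": {"field": "ua_browser"}}
            }
        }
    }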

I know there is the Update by query API, but it does not seem to fit here: I need to run my own code (which, among other things, depends on external libraries) on my server, not as a Painless script, which would not support the kind of processing I need.
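
For contrast, a hedged sketch of what the Update by query route would look like: the transformation has to be expressed as a Painless script running inside the cluster, which is exactly why it cannot call external Python libraries (the script body here is a trivial stand-in):

    # elasticsearch-py 7.x style; newer clients take query=/script=
    # keyword arguments instead of body=.
    es.update_by_query(
        index="myindex",
        body={
            "script": {
                "source": "ctx._source.ua_browser = 'unknown'",
                "lang": "painless",
            },
            "query": {"bool": {"must_not": {"exists": {"field": "ua_browser"}}}},
        },
    )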

I am accessing Elasticsearch via Python.

The problem is that I don't know how to implement the above approach. For example, what if the number of documents obtained in step 1 is larger than the index's index.max_result_window setting?
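
For reference, the configured limit can be inspected with the Python client (a sketch using the index name myindex; an absent value means the default of 10,000). The scroll-based scan helper used in the solution below is not subject to this limit:

    settings = es.indices.get_settings(index="myindex")
    window = settings["myindex"]["settings"]["index"].get(
        "max_result_window", "10000 (default)"
    )
    print(window)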

Any ideas?


Solution

  • I considered @Jay's comment and ended up with this pattern, for the moment:

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk, scan
    
    from my_module.postprocessing import post_process_doc
    
    NEW_FIELD = "<<the new field>>"  # name of the field added by post-processing
    
    es = Elasticsearch(...)
    es.ping()  # sanity check that the cluster is reachable
    
    def update_docs(docs):
        """Turn a stream of scanned documents into bulk update actions."""
        for idx, doc in enumerate(docs):
            # Progress marker every 10k documents.
            if idx % 10000 == 0:
                print('next 10k')
    
            new_field_value = post_process_doc(doc)
    
            yield {
                "_index": doc["_index"],
                "_id": doc["_id"],
                "_op_type": "update",
                "doc": {NEW_FIELD: new_field_value},
            }
    
    # Only fetch documents that have not been enriched yet. scan() wraps the
    # scroll API, so the result set may exceed index.max_result_window.
    query = {"query": {"bool": {"must_not": {"exists": {"field": NEW_FIELD}}}}}
    
    # preserve_order=True forces a sorted scroll, which is more expensive; it
    # can be dropped if processing order does not matter.
    docs = scan(es, query=query, index=index, scroll="1m", preserve_order=True)
    
    bulk(es, update_docs(docs))
    
