I need to come up with a strategy to process and update documents in an elasticsearch index periodically and efficiently. I do not have to look at documents that I processed before.
My setting is that I have a long running process, which continuously inserts documents to an index, say approx. 500 documents per hour (think about the common logging example).
I need to find a solution to update some amount of documents periodically (via cron job, e.g) to run some code on a specific field (text field, eg.) to enhance that document with a number of new fields. I want to do this to offer more fine grained aggregations on the index. In the logging analogy, this could be, e.g., I get the UserAgent-string from a log entry (document), do some parsing on that, and add some new fields back to that document and index it.
So my approach would be:
must_not
and exists
, for instance.I know there is the Update by query API. But this does not seem to be right here, since I need to run my own code (which btw depends on external libraries), on my server and not as a painless script, which would not offer that comprehensive tasks I need.
I am accessing elasticsearch via python.
The problem is now that I don't know how to implement the above approach. E.g. what if the amount of document obtained in step 1. is larger than myindex.settings.index.max_result_window
?
Any ideas?
I considered @Jay's comment and ended up with this pattern, for the moment:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import scan
from my_module.postprocessing import post_process_doc
es = Elasticsearch(...)
es.ping()
def update_docs( docs ):
""""""
for idx,doc in enumerate(docs):
if idx % 10000 == 0:
print( 'next 10k' )
new_field_value = post_process_doc( doc )
doc_update = {
"_index": doc["_index"],
"_id" : doc["_id"],
"_op_type" : "update",
"doc" : { <<the new field>> : new_field_value }
}
yield doc_update
docs = scan( es, query='{ "query" : { "bool": { "must_not": { "exists": { "field": <<the new field>> }} } }}', index=index, scroll="1m", preserve_order=True )
bulk( es, update_docs( docs ) )
Comments:
preserve_over=True
, otherwise an error was thrown.