Search code examples
elasticsearch reindex

How to create index with new fields, parsing values from field in existing index?


I have an index that contains 1 document. This document has a field1 with value "B" and a field2 with value "C, D, E" (that is, the value in field2 is comma-separated and can have variable length). I want to create a new index that contains the following 3 documents: (1) field1: "B" and field2: "C"; (2) field1: "B" and field2: "D"; (3) field1: "B" and field2: "E". I was thinking about using a watcher to reindex the already existing documents and create the new field at the same time, but I'm not sure how to do this, nor whether this is the correct approach.


Solution

  • I managed to do this with a version of the following python script:

    import logging
    from elasticsearch import Elasticsearch, helpers
    import schedule
    import time
    import sys
    
    # Append INFO-and-above records to parse.log, each prefixed with time and level
    logging.basicConfig(
        filename='parse.log',
        filemode='a',
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
    )
    logger = logging.getLogger(__name__)

    # Elasticsearch client shared by the search and the bulk write
    es = Elasticsearch(
        hosts=["https://elasticsearch.com:111"],
        basic_auth=("elasticsearch_user", "elasticsearch_password"),
    )

    # Index that is read from, and index that receives the split documents
    source_index, dest_index = "source_index", "dest_index"

    # Search body: at most 5000 hits whose @timestamp lies between the start of
    # yesterday and the end of today (tune "size" to your data volume)
    query = dict(
        size=5000,
        query={
            "range": {
                "@timestamp": {"gte": "now-1d/d", "lte": "now/d"},
            },
        },
    )
    
    # Use the helpers.bulk API for efficient indexing
    def job():
        """Copy matching source documents into the destination index, one per field2 value.

        Runs a single search on ``source_index`` with ``query`` and, for every
        hit, splits the comma-separated ``field2`` value. Each non-empty,
        whitespace-trimmed piece produces one destination document that is a
        shallow copy of the hit's ``_source`` with the piece stored in ``host``
        and ``_id`` set to ``str(field1) + piece``. A hit whose ``field2`` holds
        no usable piece is copied once under its original ``_id`` with an empty
        ``host``. All documents are sent with one ``helpers.bulk`` call.

        Only the hits returned by that single search call are processed; there
        is no paging. Errors raised by the search, or by a hit lacking
        ``field1``/``field2``, propagate to the caller; errors raised by the
        bulk request are logged and swallowed.
        """
        response = es.search(index=source_index, body=query)

        actions = []
        for hit in response["hits"]["hits"]:
            source = hit["_source"]

            field1_string = str(source["field1"])
            field2_string = str(source["field2"])

            # str.split(",") never returns an empty list ("" gives [""]) and it
            # keeps the blanks of "C, D, E", so trim every piece and discard the
            # ones that end up empty.
            pieces = (part.strip() for part in field2_string.split(","))
            nodes = [piece for piece in pieces if piece]

            if not nodes:
                # Nothing to split: keep the document under its original id,
                # with the same (empty) field the split documents carry.
                source_copy = source.copy()
                source_copy["host"] = ""
                actions.append({
                    "_index": dest_index,
                    "_id": hit["_id"],
                    "_source": source_copy,
                })
                continue

            for node in nodes:
                # Copy per value so the documents do not share one dict
                source_copy = source.copy()
                source_copy["host"] = node
                actions.append({
                    "_index": dest_index,
                    # Deterministic id: re-running overwrites instead of duplicating
                    "_id": field1_string + node,
                    "_source": source_copy,
                })

        # Send all the documents to the destination index using the bulk API
        try:
            helpers.bulk(es, actions)
        except Exception as e:
            # Boundary of the scheduled job: record the failure with its
            # traceback instead of letting it stop the scheduler loop.
            logger.exception("Error during bulk indexing: %s", e)
        else:
            logger.info("Bulk indexing successful. Processed %d documents.", len(actions))
    
    def main():
        """Register the daily 07:00 run of ``job`` and drive the scheduler until Ctrl+C."""
        daily_at_seven = schedule.every().day.at("07:00")
        daily_at_seven.do(job)

        # Poll once per second; leave with exit status 0 when interrupted
        while True:
            try:
                schedule.run_pending()
                time.sleep(1)
            except KeyboardInterrupt:
                logger.info("Received KeyboardInterrupt. Exiting gracefully.")
                sys.exit(0)
    
    # Start the scheduler only when this file is executed as a script, not when it is imported
    if __name__ == "__main__":
        main()