I have an index that contains 1 document. This document has a field1 with value "B" and a field2 with value "C, D, E" (that is, the value in field2 is comma separated and can have variable lenght). I want to create a new index, that contains the following 3 documents: field1: "B" and field2:"C" field1:"B" and field2:"D" field1:"B" and fielde:"E" I was thinking about using a watcher to reindex the already existing documents and creating the new field at the same time. But I'm not sure how to do this nor if this is the correct approach.
I managed to do this with a version of the following python script:
import logging
from elasticsearch import Elasticsearch, helpers
import schedule
import time
import sys
# Configure logging to save logs in parse.log file
logging.basicConfig(level=logging.INFO, filename='parse.log', filemode='a', format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Configure Elasticsearch connection
es = Elasticsearch(hosts=["https://elasticsearch.com:111"], basic_auth=("elasticsearch_user", "elasticsearch_password"))
# Define source and destination index names
source_index = "source_index"
dest_index = "dest_index"
# Optional query to filter documents from source index
query = {
"size": 5000, # Adjust the size based on your requirements
"query": {
"range": {
"@timestamp": {
"gte": "now-1d/d",
"lte": "now/d"
}
}
}
}
# Use the helpers.bulk API for efficient indexing
def job():
actions = []
for hit in es.search(index=source_index, body=query)["hits"]["hits"]:
# Modify document if needed before adding to actions
source = hit["_source"]
# Extract value of fields field1 and field2
field1 = source["field1"]
field2 = source["field2"]
# Extract specific terminals
field1_string = str(field1)
field2_string = str(field2)
nodes = field2_string.split(",")
# Define de id of the document, according to the number of terminals
if len(nodes) == 0:
source["node"] = ""
actions.append({
"_index": dest_index,
"_id": hit["_id"],
"_source": source.copy() # Create a copy of the source dictionary
})
else:
for node in nodes:
new_id = field1_string + node
source_copy = source.copy() # Create a copy of the source dictionary
source_copy['host'] = node
actions.append({
"_index": dest_index,
"_id": new_id,
"_source": source_copy
})
# Send all the documents to the destination index using bulk API
try:
helpers.bulk(es, actions)
logger.info(f"Bulk indexing successful. Processed {len(actions)} documents.")
except Exception as e:
logger.error(f"Error during bulk indexing: {e}")
def main():
# Schedule the job to run every day at 7 am
schedule.every().day.at("07:00").do(job)
try:
# Run the scheduler in an infinite loop
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
# Handle Keyboard Interrupt (Ctrl+C)
logger.info("Received KeyboardInterrupt. Exiting gracefully.")
sys.exit(0)
if __name__ == "__main__":
main()