PUT _enrich/policy/merge
{
  "match": {
    "indices": "es.event-154.23-09-01-6-27-22",
    "match_field": "ancestor_id",
    "enrich_fields": ["status","status_id","type","sub_type","primary_assigned","vehicle_of_interest","dms_ro_number","dms_deal_number","primary_assigned_user_name","dealership_id", "ancestor_id","event_id","deleted","service_date","update_date","secondary_assigned_user_name","bdc_assigned_user_name"]
  }
}

PUT _ingest/pipeline/enrich
{
  "processors": [
    {
      "enrich": {
        "description": "Add Events to customer",
        "policy_name": "merge",
        "field": "contact_id",
        "target_field": "events",
        "max_matches": "1"
      }
    }
  ]
}

POST _reindex?wait_for_completion=false
{
  "source": {
    "index": "es.customer-154.24-08-09-6-27-22"
  },
  "dest": {
    "index": "es.customerevent1-154.25-08-09-6-27-22",
    "pipeline": "enrich"
  }
}
Here, I am trying to enrich the customer index with values from the event index. If there is more than one match, which record does the enrich processor take? It looks like it picks an arbitrary record for the match field. Is it possible to tell it to match in a specific order (asc/desc) by a field? Please help me fix this.
Great progress so far!!
One way to achieve this is to store all matches in a temporary array (here, up to 10 matches stored in tmpArray) and then pick the ones that suit you best using a script processor. Here tmpArray is sorted by event_id in descending order and the first 5 entries are stored in the events field (use limit(1) to keep only a single match), like this:
PUT _ingest/pipeline/enrich
{
  "processors": [
    {
      "enrich": {
        "description": "Add Events to customer",
        "policy_name": "merge",
        "field": "contact_id",
        "target_field": "tmpArray",
        "max_matches": "10"
      }
    },
    {
      "script": {
        "if": "ctx.tmpArray != null",
        "source": """
          Integer sortByEventId(Map o1, Map o2) {
            return o1.event_id != null && o2?.event_id != null ? o2.event_id.compareTo(o1.event_id) : 0;
          }
          ctx.events = ctx.tmpArray.stream()
            .sorted(this::sortByEventId)
            .limit(5)
            .collect(Collectors.toList());
          ctx.remove('tmpArray');
        """
      }
    }
  ]
}
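To check the ordering before running the full reindex, the pipeline can be tried on a sample document with the simulate API. This is only a sketch: the contact_id value is a made-up placeholder, so replace it with an id that has several events in your event index. The first request (re)builds the enrich index for the merge policy, which the enrich processor needs before it can return any matches.

```
# Build (or rebuild) the enrich index for the policy
POST _enrich/policy/merge/_execute

# Run one sample document through the pipeline without indexing it
# ("12345" is a hypothetical contact_id)
POST _ingest/pipeline/enrich/_simulate
{
  "docs": [
    {
      "_source": {
        "contact_id": "12345"
      }
    }
  ]
}
```

If the script behaves as intended, the events field in the simulated document should list the matches with the highest event_id first, and tmpArray should no longer be present.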