
Missing doc in aggregated results and wrong count of items


I have such data in ES:

{"action":"B1", "status":"E", "name": "abc", "version":0, "colour": "red"}
{"action":"B1", "status":"E", "name": "def", "version":1, "colour": "red"}
{"action":"B1", "status":"E", "name": "was", "version":2, "colour": "red"}
{"action":"B2", "status":"V", "name": "acc", "version":0, "colour": "red"}
{"action":"B3", "status":"E", "name": "fff", "version":0, "colour": "red"}
{"action":"B3", "status":"V", "name": "ttt", "version":1, "colour": "red"}
{"action":"B4", "status":"V", "name": "ttt", "version":1, "colour": "blue"}

The requirements are:

  • Take only items with colour = red

Then I need to group all red items by action name and, for each group:

  • if there are multiple items in E state, take the one with the latest version and add it to the results
  • if there are items in V state, add all of them to the results

Correct results would be (sorted by name ASC):

 {"action":"B2", "status":"V", "name": "acc", "version": 0} //returned because all items with status V should be returned
 {"action":"B3", "status":"E", "name": "fff", "version": 0} //the B3 group has two items; this is the E item with the highest version
 {"action":"B3", "status":"V", "name": "ttt", "version": 1} //the B3 group has two items; this one has status V, so it is always added to the results
 {"action":"B1", "status":"E", "name": "was", "version": 2} //the B1 group has several E items, so the one with the highest version is returned
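
To make the selection rule above concrete, here is a minimal in-memory sketch in Python. It is not an Elasticsearch query; the function name `select_documents` and the `DOCS` list are illustrative assumptions that simply mirror the sample data and requirements from this question.

```python
# Sample documents copied from the question.
DOCS = [
    {"action": "B1", "status": "E", "name": "abc", "version": 0, "colour": "red"},
    {"action": "B1", "status": "E", "name": "def", "version": 1, "colour": "red"},
    {"action": "B1", "status": "E", "name": "was", "version": 2, "colour": "red"},
    {"action": "B2", "status": "V", "name": "acc", "version": 0, "colour": "red"},
    {"action": "B3", "status": "E", "name": "fff", "version": 0, "colour": "red"},
    {"action": "B3", "status": "V", "name": "ttt", "version": 1, "colour": "red"},
    {"action": "B4", "status": "V", "name": "ttt", "version": 1, "colour": "blue"},
]


def select_documents(docs, colour="red"):
    """Keep every V document and only the highest-version E document per action."""
    latest_e = {}   # action -> E document with the highest version seen so far
    selected = []   # all V documents, in input order
    for doc in docs:
        if doc["colour"] != colour:
            continue
        if doc["status"] == "V":
            selected.append(doc)
        elif doc["status"] == "E":
            best = latest_e.get(doc["action"])
            if best is None or doc["version"] > best["version"]:
                latest_e[doc["action"]] = doc
    selected.extend(latest_e.values())
    # The expected results are listed by name in ascending order.
    return sorted(selected, key=lambda d: d["name"])


result = select_documents(DOCS)
print([d["name"] for d in result])  # prints: ['acc', 'fff', 'ttt', 'was']
```

The length of `result` (4) is the total the pager should show for this data.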

My search query looks like this:

 GET  myIndex/_search
 {   
  "from": 0,
  "size": 10,
  "query": {
    "bool": {
      "filter": [
        {
          "query_string": {
            "query": "red",
            "fields": [
              "colour"
            ]
          }
        }
      ],
      "adjust_pure_negative": true,
      "boost": 1.0
    }
  } ,
  "version": true,
  "explain": false,
  
  "aggregations": {
    "all_items": {
      "terms": {
        "field": "action",
        "size": 10, //I AM NOT SURE WHAT THIS SIZE SPECIFIES ? I want first page of 10 elements
        "min_doc_count": 1,
        "shard_min_doc_count": 0,
        "show_term_doc_count_error": false,
        "order": [
          {
            "_count": "desc"
          },
          {
            "_key": "asc"
          }
        ]
      },
      "aggregations": {
        "bbb": {
          "top_hits": {
            "from": 0,
            "size": 1,
            "version": false,
            "seq_no_primary_term": false,
            "explain": false,
            "fields": [
              {
                "field": "version"
              }
            ]
          }
        }
      }
    }
  }
} 

My current results are wrong. Problems:

1)

"hits" : {
    "total" : {
      "value" : 6,
      "relation" : "eq"
    } 

hits.total.value is 6, which is greater than the number of documents returned in the buckets (3), so I cannot construct paging information (my pager should show the total number of elements, which should be 4). The matched documents are:

{"action":"B1", "status":"E", "name": "abc", "version": 0, "colour": "red"}
{"action":"B1", "status":"E", "name": "def", "version": 1, "colour": "red"}
{"action":"B1", "status":"E", "name": "was", "version": 2, "colour": "red"}
{"action":"B2", "status":"V", "name": "acc", "version": 0, "colour": "red"}
{"action":"B3", "status":"E", "name": "fff", "version": 0, "colour": "red"}
{"action":"B3", "status":"V", "name": "ttt", "version": 1, "colour": "red"}

2) In the resulting buckets the documents are grouped by action:

bucket with key B1 //OK

  • hits.total.value=3
  • hits: {"action":"B1", "status":"E", "name": "was", "version": 2, "colour": "red"}

bucket with key B2 //OK

  • hits.total.value=1
  • hits: {"action":"B2", "status":"V", "name": "acc", "version": 0, "colour": "red"}

bucket with key B3 //WRONG

  • hits.total.value=2
  • hits: {"action":"B3", "status":"V", "name": "ttt", "version": 1, "colour": "red"}

So here I am missing this document: {"action":"B3", "status":"E", "name": "fff", "version": 0, "colour": "red"}, because the top_hits aggregation returns only one document per bucket (size: 1). According to the requirements, I should get all elements with V status plus the latest item (by version) with E status.

Any help would be appreciated.

As for losing elements with status V, I have an idea: introduce a new field for grouping (so that I would group not by the action field but by newGroupingDiscriminatorField) and assign it the following value:

  • when status is E, the value is the action name, e.g. B1
  • when status is V, the value would be a random UUID, so each such item would land in its own bucket in the results and would not be lost. Is this a good idea, or does ES offer something that could be used instead?
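
The discriminator idea from the two bullets above could be computed at index time roughly as follows. This is only a sketch of the asker's proposal: the helper name `grouping_key` is hypothetical, and writing the result into `newGroupingDiscriminatorField` before indexing is assumed to happen in the application.

```python
import uuid


def grouping_key(doc):
    """Return the value proposed for newGroupingDiscriminatorField."""
    if doc["status"] == "E":
        # All E documents of one action share a key, so they fall into one group.
        return doc["action"]
    # Every V document gets a unique key, so it always forms its own group.
    return str(uuid.uuid4())


e_doc = {"action": "B3", "status": "E", "name": "fff", "version": 0}
v_doc = {"action": "B3", "status": "V", "name": "ttt", "version": 1}
print(grouping_key(e_doc))                         # prints: B3
print(grouping_key(v_doc) != grouping_key(v_doc))  # prints: True
```

With such a field, each group holds either a set of E documents (of which only the latest version is wanted) or exactly one V document.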

I have no idea how to get the correct total count (I should show the user that they are viewing elements 1-10 of 120, where 120 is the number of elements ready to display after grouping and elimination of unnecessary items, not the number before grouping).

I also don't know which part of the query would be responsible for returning only the first page of the grouped elements. A page has 10 elements. But to get only the first page, I need to group all documents first, right?
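
To illustrate the paging requirement described above, here is a small sketch that assumes the grouped and filtered list is already available in memory (for example, on the client). The function name `page_of` and the generated `grouped` list are hypothetical. In this sketch the total is the length of the list after grouping, and a page is just a slice of it.

```python
def page_of(selected, page, size=10):
    """Return the post-grouping total and the requested zero-based page."""
    start = page * size
    return {"total": len(selected), "items": selected[start:start + size]}


# 120 placeholder items standing in for already grouped documents.
grouped = [{"name": f"item-{i:03d}"} for i in range(120)]

first = page_of(grouped, page=0)
print(first["total"], len(first["items"]))  # prints: 120 10
```

Whether this is acceptable depends on how many grouped items there can be, since the whole grouped list has to be built before the first page is cut.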


Solution

  • Answer #1. Document Extraction

    You can solve this with a scripted_metric aggregation

    Your documents

    PUT /complex_selection/_bulk
    {"create":{"_id":1}}
    {"action":"B1", "status":"E", "name": "was", "version": 2, "colour": "red"}
    {"create":{"_id":2}}
    {"action":"B1", "status":"E", "name": "abc", "version": 0, "colour": "red"}
    {"create":{"_id":3}}
    {"action":"B1", "status":"E", "name": "def", "version": 1, "colour": "red"}
    {"create":{"_id":4}}
    {"action":"B2", "status":"V", "name": "acc", "version": 0, "colour": "red"}
    {"create":{"_id":5}}
    {"action":"B3", "status":"E", "name": "fff", "version": 0, "colour": "red"}
    {"create":{"_id":6}}
    {"action":"B3", "status":"V", "name": "ttt", "version": 1, "colour": "red"}
    {"create":{"_id":7}}
    {"action":"B4", "status":"V", "name": "ttt", "version": 1, "colour": "blue"}
    

    Scripted metric query

    POST /complex_selection/_search?filter_path=aggregations
    {
        "query": {
            "term": {
                "colour": {
                    "value": "red"
                }
            }
        },
        "aggs": {
            "selected_documents": {
                "scripted_metric": {
                    "init_script": "state.actions = [:];",
                    "map_script": """
                        def action = params['_source']['action'];
                        def status = params['_source']['status'];
                        def document = params['_source'];
                        
                        if (state.actions[action] == null) {
                            state.actions[action] = [:];
                        }
    
                        if (state.actions[action][status] == null) {
                                state.actions[action][status] = [];
                        }
    
                        if (status == 'E') {
                            if (state.actions[action][status].size() > 0) {
                                def mapDocumentVersion = state.actions[action][status][0]['version'];
                                def documentVersion = document['version'];
                                if (mapDocumentVersion >= documentVersion) {
                                    return;
                                }
                            }
                            state.actions[action][status] = [];
                        }
                        
                        state.actions[action][status].add(document);
                    """,
                    "combine_script": "return state.actions",
                    "reduce_script": """
                        Map selectedDocuments = new HashMap();
                        for (state in states) {
                            selectedDocuments.putAll(state);
                        }
                        
                        List documents = [];
                        Set mapActions = selectedDocuments.keySet();
                        for (mapAction in mapActions) {
                            Set mapStatuses = selectedDocuments[mapAction].keySet();
                            for (mapStatus in mapStatuses) {
                                List value = selectedDocuments[mapAction][mapStatus];
                                documents.addAll(value);
                            }
                        }
                        
                        return documents;
                    """
                }
            }
        }
    }
    

    Response

    {
        "aggregations" : {
            "selected_documents" : {
                "value" : [
                    {
                        "colour" : "red",
                        "name" : "acc",
                        "action" : "B2",
                        "version" : 0,
                        "status" : "V"
                    },
                    {
                        "colour" : "red",
                        "name" : "fff",
                        "action" : "B3",
                        "version" : 0,
                        "status" : "E"
                    },
                    {
                        "colour" : "red",
                        "name" : "ttt",
                        "action" : "B3",
                        "version" : 1,
                        "status" : "V"
                    },
                    {
                        "colour" : "red",
                        "name" : "was",
                        "action" : "B1",
                        "version" : 2,
                        "status" : "E"
                    }
                ]
            }
        }
    }
    

    NOTE! This query is for single-shard indices. The reduce_script code is simplified and is not correct for multi-shard indices.
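
To show why the simplified reduce step is not enough with several shards, here is a Python sketch of the merge that would be needed. It is not Painless and not part of the answer's query; `merge_shard_states` and the two example shard states are assumptions for illustration. `putAll` replaces the whole entry for an action with the one from a later shard, whereas a correct merge has to compare E versions across shards and concatenate the V lists.

```python
def merge_shard_states(states):
    """Merge per-shard {action: {status: [docs]}} maps into one document list."""
    merged = {}
    for state in states:
        if not state:  # skip shards that contributed nothing
            continue
        for action, by_status in state.items():
            target = merged.setdefault(action, {})
            for status, docs in by_status.items():
                if status == "E":
                    # Keep only the highest-version E document across all shards.
                    candidates = target.get("E", []) + docs
                    if candidates:
                        target["E"] = [max(candidates, key=lambda d: d["version"])]
                else:
                    # V documents are all kept, whichever shard they came from.
                    target.setdefault(status, []).extend(docs)
    return [doc
            for by_status in merged.values()
            for docs in by_status.values()
            for doc in docs]


# Hypothetical distribution of the red sample documents over two shards.
shard_1 = {"B1": {"E": [{"name": "abc", "version": 0}]},
           "B3": {"V": [{"name": "ttt", "version": 1}]}}
shard_2 = {"B1": {"E": [{"name": "was", "version": 2}]},
           "B3": {"E": [{"name": "fff", "version": 0}]}}

merged_docs = merge_shard_states([shard_1, shard_2])
print(sorted(d["name"] for d in merged_docs))  # prints: ['fff', 'ttt', 'was']
```

With a plain `putAll`, the B3 entry from the first shard would be overwritten by the second one and the V document `ttt` would be lost.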