Search code examples
Tags: elasticsearch, elasticsearch-aggregation, elasticsearch-dsl

Elasticsearch aggregation: group by a field, then get the average of a field for the max date


I'm trying to build a query in Elasticsearch that will:
a) group by a field (department_name);
b) get the documents with the max date (record_date);
c) calculate the average of a field (risk_index_value) over the remaining documents.

I've managed to build the query below in case my description isn't that helpful:

{
   "size":0,
       "query" : {
        "match": {
          "record_date": "2021-04-08"
        }
    },
   "aggs":{
      "assets":{
         "terms":{
            "field":"department_name",
            "size":10000
         },
         "aggs":{
            "risk_avg":{
               "avg":{
                  "field":"risk_index_value"
               }
            }
         }
      }
   }
}

This query does exactly what I want in terms of business logic, but I need it to always use the max date without my having to hard-code a value for it. Is there a way to do that? I need to do this with the Elasticsearch REST High Level Client, but even the raw query would be really helpful. Thanks in advance!

EDIT: Here are some example documents to make my request clearer.

So imagine we have 11 documents:

department_name: A
risk_index_value: 10
record_date: 2021-04-28

department_name: A
risk_index_value: 30
record_date: 2021-04-28

department_name: A
risk_index_value: 20
record_date: 2021-04-28

department_name: A
risk_index_value: 100
record_date: 2021-04-20

department_name: A
risk_index_value: 80
record_date: 2021-04-20

department_name: B
risk_index_value: 240
record_date: 2021-04-28

department_name: B
risk_index_value: 220
record_date: 2021-04-28

department_name: B
risk_index_value: 200
record_date: 2021-04-28

department_name: B
risk_index_value: 100
record_date: 2021-04-20

department_name: B
risk_index_value: 90
record_date: 2021-04-20

department_name: C
risk_index_value: 45
record_date: 2021-04-28

So, for the data above, the query I need would return something like:

department: A
risk_index_avg: 20
record_date: 2021-04-28

department: B
risk_index_avg: 220
record_date: 2021-04-28

department: C
risk_index_avg: 45
record_date: 2021-04-28

Hope this helps.


Solution

  • As I understand your question, you want the average risk index of the documents with the latest record date in each department.

    There is a way to find the max value using the terms aggregation, i.e.:
    1. Use a terms aggregation on the required field.
    2. Sort the term keys in descending order: "order": { "_key": "desc" }.
    3. Keep only the top bucket by setting "size": 1 (that bucket holds the max value).
    "aggs": {
       "maxKey": {
          "terms": {
             "field": "<field whose max is required>",
             "size": 1,
             "order": {
                "_key": "desc"
              }
           }
       }
    }
    
    

    I think the query below is what you are looking for.

    {
      "size": 0,
      "aggs": {
        "EachDepartment": {
          "terms": {
            "field": "department_name",
            "size": 1000
          },
          "aggs": {
            "MaxRecordDate": {
              "terms": {
                "field": "record_date",
                "size": 1,
                "order": {
                  "_key": "desc"
                }
              },
              "aggs": {
                "AvgOfRiskIndex": {
                  "avg": {
                    "field": "risk_index_value"
                  }
                }
              }
            }
          }
        }
      }
    }
    
    

    I ran it against the sample data you provided and got the response below.

    {
      "aggregations" : {
        "EachDepartment" : {
          "doc_count_error_upper_bound" : 0,
          "sum_other_doc_count" : 0,
          "buckets" : [
            {
              "key" : "A",
              "doc_count" : 5,
              "MaxRecordDate" : {
                "doc_count_error_upper_bound" : 0,
                "sum_other_doc_count" : 2,
                "buckets" : [
                  {
                    "key" : 1619568000000,
                    "key_as_string" : "2021-04-28 00:00:00",
                    "doc_count" : 3,
                    "AvgOfRiskIndex" : {
                      "value" : 20.0
                    }
                  }
                ]
              }
            },
            {
              "key" : "B",
              "doc_count" : 5,
              "MaxRecordDate" : {
                "doc_count_error_upper_bound" : 0,
                "sum_other_doc_count" : 2,
                "buckets" : [
                  {
                    "key" : 1619568000000,
                    "key_as_string" : "2021-04-28 00:00:00",
                    "doc_count" : 3,
                    "AvgOfRiskIndex" : {
                      "value" : 220.0
                    }
                  }
                ]
              }
            },
            {
              "key" : "C",
              "doc_count" : 1,
              "MaxRecordDate" : {
                "doc_count_error_upper_bound" : 0,
                "sum_other_doc_count" : 0,
                "buckets" : [
                  {
                    "key" : 1619568000000,
                    "key_as_string" : "2021-04-28 00:00:00",
                    "doc_count" : 1,
                    "AvgOfRiskIndex" : {
                      "value" : 45.0
                    }
                  }
                ]
              }
            }
          ]
        }
      }
    }
    
    

    I hope this answered your question.

    EDIT: Added RestHighLevelClient code to create the aggregation

    /**
     * Builds a three-level aggregation with the RestHighLevelClient builders.
     *
     * Levels:
     *   1. "EachDepartment": terms on department_name (up to 1000 buckets).
     *   2. "MaxRecordDate": terms on record_date, ordered by key descending
     *      and limited to one bucket, so only the latest date survives.
     *   3. "AvgOfRiskIndex": avg of risk_index_value over that bucket's documents.
     *
     * The aggregation names are the same as in the raw JSON query, so the
     * response is read with the same keys ("EachDepartment", "MaxRecordDate",
     * "AvgOfRiskIndex") whichever way the request is built.
     *
     * NOTE(review): assumes department_name is an aggregatable (e.g. keyword)
     * field; confirm against the index mapping.
     *
     * @return the top-level "EachDepartment" terms aggregation
     */
    AggregationBuilder getAggsBuilder() {
        // Innermost level: average risk index of the documents in the
        // enclosing (latest-date) bucket.
        AggregationBuilder avgRiskIndexAgg = AggregationBuilders.avg("AvgOfRiskIndex")
                .field("risk_index_value");

        // Middle level: sorting the date terms by key descending and keeping
        // a single bucket yields the max record_date and its documents.
        AggregationBuilder maxRecordDateAgg = AggregationBuilders.terms("MaxRecordDate")
                .field("record_date")
                .size(1)
                .order(BucketOrder.key(false))
                .subAggregation(avgRiskIndexAgg);

        // Top level: one bucket per department. At most 1000 departments
        // are returned; raise this if there can be more.
        return AggregationBuilders.terms("EachDepartment")
                .field("department_name")
                .size(1000)
                .subAggregation(maxRecordDateAgg);
    }