Search code examples
elasticsearch aggregate elasticsearch-plugin

How to compare two aggregations in Elasticsearch


I have a stream of transaction data, which I'm grouping by 10-minute intervals. One aggregation counts the number of transactions in each interval (the_count), and another computes a moving average of that count (the_movavg). I would like to return only the buckets where the_count is greater than the moving average.

This query returns just fine.

GET /_search

{ 
  "aggs": {
        "my_date_histo":{                
            "date_histogram":{
                "field":"created_at",
                "interval":"10m"
            },

            "aggs":{
                "the_count":{
                    "value_count" : {"field" : "user_id"}
                },

              "the_movavg":{
                  "moving_avg":{ 
                    "buckets_path": "the_count" ,
                    "window": 5,
                    "model": "simple"
                  }
              }
      }
    }
  }
}

But when I try the following, it throws an error:

GET /_search
{ 
  "aggs": {
        "my_date_histo":{                
            "date_histogram":{
                "field":"created_at",
                "interval":"10m"
            },

            "aggs":{
                "the_count":{
                    "value_count" : {"field" : "user_id"}
                },

              "the_movavg":{
                  "moving_avg":{ 
                    "buckets_path": "the_count" ,
                    "window": 5,
                    "model": "simple"
                  }
              },

                "final_filter": {
           "bucket_selector": {
          "buckets_path": {
            "TheCount": "the_count",
            "TheMovAvg": "the_movavg"

          },
          "script": "params.TheCount > params.TheMovAvg"
      }
  }

      }
    }
  }

}

EDIT :

Mapping

{
  "transaction-live": {
    "mappings": {
      "logs": {
        "properties": {
          "@timestamp": {
            "type": "date"
          },
          "@version": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "correspondent_id": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "created_at": {
            "type": "date"
          },
          "discount": {
            "type": "float"
          },
          "endpoint": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "event_type": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "fees": {
            "type": "float"
          },
          "from_country_code": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "from_currency_code": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "fx_sent_receive": {
            "type": "float"
          },
          "receive_amount": {
            "type": "float"
          },
          "response_code": {
            "type": "long"
          },
          "send_amount": {
            "type": "float"
          },
          "source": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "source_version": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "startedtransaction_id": {
            "type": "long"
          },
          "to_country_code": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "user_agent": {
            "type": "text",
            "fields": {
              "keyword": {
                "type": "keyword",
                "ignore_above": 256
              }
            }
          },
          "user_id": {
            "type": "long"
          }
        }
      }
    }
  }
}

ERROR:

{
  "error": {
    "root_cause": [],
    "type": "reduce_search_phase_exception",
    "reason": "[reduce] ",
    "phase": "fetch",
    "grouped": true,
    "failed_shards": [],
    "caused_by": {
      "type": "script_exception",
      "reason": "runtime error",
      "caused_by": {
        "type": "null_pointer_exception",
        "reason": null
      },
      "script_stack": [
        "params.TheCount > params.TheMovAvg",
        "                        ^---- HERE"
      ],
      "script": "params.TheCount > params.TheMovAvg",
      "lang": "painless"
    }
  },
  "status": 503
}

Solution

  • I played around with your query a bit and found the issue. The following is a working query you can use:

    {
        "size": 0,
        "aggs": {
            "my_date_histo": {
                "date_histogram": {
                    "field": "created_at",
                    "interval": "10m"
                },
                "aggs": {
                    "the_count": {
                        "value_count": {
                            "field": "user_id"
                        }
                    },
                    "the_movavg": {
                        "moving_avg": {
                            "buckets_path": "the_count",
                            "window": 5,
                            "model": "simple"
                        }
                    },
                    "final_filter": {
                        "bucket_selector": {
                            "buckets_path": {
                                "TheCount": "the_count",
                                "TheMovAvg": "the_movavg"
    
                            },
                            "script": "params.TheCount > (params.TheMovAvg == null ? 0 : params.TheMovAvg)"
                        }
                    }
                }
            }
        }
    }
    

    Now, to understand the issue, take a look at the following result of the aggregation without the bucket_selector aggregation.

    {
      "took": 10,
      "timed_out": false,
      "_shards": {
        "total": 5,
        "successful": 5,
        "failed": 0
      },
      "hits": {
        "total": 42,
        "max_score": 0,
        "hits": []
      },
      "aggregations": {
        "my_date_histo": {
          "buckets": [
            {
              "key_as_string": "2017-03-06T15:30:00.000Z",
              "key": 1488814200000,
              "doc_count": 14,
              "the_count": {
                "value": 14
              }
            },
            {
              "key_as_string": "2017-03-06T15:40:00.000Z",
              "key": 1488814800000,
              "doc_count": 0,
              "the_count": {
                "value": 0
              }
            },
            {
              "key_as_string": "2017-03-06T15:50:00.000Z",
              "key": 1488815400000,
              "doc_count": 14,
              "the_count": {
                "value": 14
              },
              "the_movavg": {
                "value": 7
              }
            },
            {
              "key_as_string": "2017-03-06T16:00:00.000Z",
              "key": 1488816000000,
              "doc_count": 3,
              "the_count": {
                "value": 3
              },
              "the_movavg": {
                "value": 14
              }
            },
            {
              "key_as_string": "2017-03-06T16:10:00.000Z",
              "key": 1488816600000,
              "doc_count": 8,
              "the_count": {
                "value": 7
              },
              "the_movavg": {
                "value": 8.5
              }
            },
            {
              "key_as_string": "2017-03-06T16:20:00.000Z",
              "key": 1488817200000,
              "doc_count": 3,
              "the_count": {
                "value": 3
              },
              "the_movavg": {
                "value": 6.375
              }
            }
          ]
        }
      }
    }
    

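    If you look at the result above, the first two buckets have no the_movavg value: the moving_avg aggregation has not produced a value for them with this window setting. So when your bucket_selector script compared the two values for those buckets, params.TheMovAvg was null, and the Java comparison operator threw a null pointer exception at runtime. The working query avoids this by substituting 0 whenever params.TheMovAvg is null.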

    Hope this helps you. Thanks