
Elasticsearch: how to query the longest-running tasks


I have data in the following format in Elasticsearch:

POST slots/slot/1
{
    "taskId": 1,
    "datetime": "2020-05-10T08:45:44",
    "status": "START"
}

POST slots/slot/2
{
    "taskId": 1,
    "datetime": "2020-05-10T08:49:54",
    "status": "STOP"
}
...

I want to find a way to retrieve the top 3 longest-running tasks, that is, tasks that have both a START and a STOP JSON object and the largest difference between their START and STOP times. For each of them I want to retrieve taskId and runningTime (how long the task was running).

Is it possible to achieve this in Elasticsearch? Is Elasticsearch appropriate for this kind of task?

Please be lenient, I am really new to Elasticsearch.


Solution

  • This one's tricky. Let's assume that you'll have precisely 2 docs for each unique taskId, one of which will be START and the other STOP. In that case we can do the following:

    GET slots/_search
    {
      "size": 0,
      "aggs": {
        "by_ids": {
          "terms": {
            "field": "taskId",
            "size": 10000,
            "min_doc_count": 2
          },
          "aggs": {
            "start_bucket": {
              "filter": {
                "term": {
                  "status.keyword": "START"
                }
              },
              "aggs": {
                "datetime_term": {
                  "max": {
                    "field": "datetime"
                  }
                }
              }
            },
            "stop_bucket": {
              "filter": {
                "term": {
                  "status.keyword": "STOP"
                }
              },
              "aggs": {
                "datetime_term": {
                  "max": {
                    "field": "datetime"
                  }
                }
              }
            },
            "diff_in_millis": {
              "bucket_script": {
                "buckets_path": {
                  "start": "start_bucket.datetime_term",
                  "stop": "stop_bucket.datetime_term"
                },
                "script": "return params.stop - params.start"
              }
            },
            "final_sort": {
              "bucket_sort": {
                "sort": [
                  {
                    "diff_in_millis": {
                      "order": "desc"
                    }
                  }
                ],
                "size": 3
              }
            }
          }
        }
      }
    }
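
    To make the intent of the aggregation easier to follow, here is a minimal plain-Python sketch of the same computation on in-memory documents. The function name longest_tasks and the unfinished task 3 are made up for illustration. Note one deliberate difference: the sketch checks that both a START and a STOP exist, whereas the query above relies on "min_doc_count": 2 and the two-docs-per-task assumption.

```python
from collections import defaultdict
from datetime import datetime


def longest_tasks(docs, top_n=3):
    """Return [(taskId, running_seconds)] for the top_n longest finished tasks."""
    times = defaultdict(dict)
    for doc in docs:
        ts = datetime.fromisoformat(doc["datetime"])
        per_task = times[doc["taskId"]]
        # Like the "max" sub-aggregations: keep the latest timestamp per status.
        if doc["status"] not in per_task or ts > per_task[doc["status"]]:
            per_task[doc["status"]] = ts
    durations = [
        (task_id, (t["STOP"] - t["START"]).total_seconds())
        for task_id, t in times.items()
        if "START" in t and "STOP" in t  # only tasks that actually finished
    ]
    # Like bucket_sort: order by duration, descending, then truncate.
    durations.sort(key=lambda pair: pair[1], reverse=True)
    return durations[:top_n]


docs = [
    {"taskId": 1, "datetime": "2020-05-10T08:45:44", "status": "START"},
    {"taskId": 1, "datetime": "2020-05-10T08:49:54", "status": "STOP"},
    # Hypothetical unfinished task, not part of the question's data.
    {"taskId": 3, "datetime": "2020-05-10T09:00:00", "status": "START"},
]
print(longest_tasks(docs))  # [(1, 250.0)]
```

    Task 1 from the question ran for 4 minutes 10 seconds, i.e. 250 seconds, which matches the 250000 ms that the query reports for it.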
    

    As per this discussion,

    the caveat is that this performs sorting on the final list of buckets. So if a term isn't in the list, it won't get sorted. That's in contrast to sorting on the terms agg itself, which changes the contents of the list.

    In other words, we need to set the size of the terms aggregation arbitrarily high so that all our taskIds get aggregated, and/or pre-filter the context with, say, a date filter (only the year 2020, or only the last month) so that there is less ground to cover and less CPU time spent.
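
    As a rough illustration of the pre-filtering idea, the sketch below adds a range query on datetime to a search body before it is sent. The helper name with_date_filter and the chosen year boundaries are assumptions for the example, and the sub-aggregations from the query above are left out for brevity.

```python
import json


def with_date_filter(search_body, gte, lt, field="datetime"):
    """Return a copy of search_body restricted to gte <= field < lt."""
    filtered = dict(search_body)  # shallow copy, the original stays untouched
    filtered["query"] = {"range": {field: {"gte": gte, "lt": lt}}}
    return filtered


# Only the outer terms aggregation is shown; the filter, bucket_script and
# bucket_sort sub-aggregations would go under it unchanged.
aggs_only = {
    "size": 0,
    "aggs": {
        "by_ids": {
            "terms": {"field": "taskId", "size": 10000, "min_doc_count": 2}
        }
    },
}

body = with_date_filter(aggs_only, "2020-01-01T00:00:00", "2021-01-01T00:00:00")
print(json.dumps(body, indent=2))
```

    Keep in mind that a task whose START falls before the window and whose STOP falls inside it would then contribute only one doc and be dropped by "min_doc_count": 2.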

    If everything goes right and your status has a .keyword field (more on this here) we can filter on, you'll end up with all the information you need:

    {
      ...
      "aggregations":{
        "by_ids":{
          "doc_count_error_upper_bound":0,
          "sum_other_doc_count":0,
          "buckets":[
            {
              "key":2,            <-- taskID (this one was added by myself)
              "doc_count":2,
              "start_bucket":{
                ...
              },
              "stop_bucket":{
                ...
              },
              "diff_in_millis":{
                "value":3850000.0        <-- duration in millis
              }
            },
            {
              "key":1,                  <-- task from the question
              "doc_count":2,
              "start_bucket":{
                ...
              },
              "stop_bucket":{
               ...
              },
              "diff_in_millis":{
                "value":250000.0        <-- duration in millis
              }
            }
          ]
        }
      }
    }
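
    Since the question asks for taskId and runningTime, here is a small sketch of how the response could be reduced to just those two values on the client side. The function name task_durations is hypothetical, and the sample dict only mirrors the parts of the response shown above (the elided start_bucket and stop_bucket contents are left out).

```python
def task_durations(response):
    """Extract taskId and runningTime (in seconds) from the aggregation response."""
    buckets = response["aggregations"]["by_ids"]["buckets"]
    return [
        {"taskId": b["key"], "runningTime": b["diff_in_millis"]["value"] / 1000.0}
        for b in buckets
    ]


# Trimmed-down version of the response shown above.
sample = {
    "aggregations": {
        "by_ids": {
            "buckets": [
                {"key": 2, "doc_count": 2, "diff_in_millis": {"value": 3850000.0}},
                {"key": 1, "doc_count": 2, "diff_in_millis": {"value": 250000.0}},
            ]
        }
    }
}

for row in task_durations(sample):
    print(row)
```

    The buckets already arrive sorted by duration because of the bucket_sort, so no extra sorting is done here.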
    

    EDIT/CORRECTION:

    "min_doc_count": 2 is needed because we're only interested in tasks that actually finished. If you want to include those that are still running and have not finished yet, create another bounty task ;)