
ElasticSearch: Avg aggregation for datetime format


I am stuck on an Elasticsearch query using Python.

I have data such as:

{
  "_index": "user_log",
  "_type": "logs",
  "_id": "gdUJpXIBAoADuwvHTK29",
  "_score": 1,
  "_source": {
    "user_name": "[email protected]",
    "working_hours": "2019-10-21 09:00:01"
  }
}

{
  "_index": "user_log",
  "_type": "logs",
  "_id": "gtUJpXIBAoADuwvHTK29",
  "_version": 1,
  "_score": 0,
  "_source": {
    "user_name": "[email protected]",
    "working_hours": "2019-10-21 09:15:01"
  }
}

{
  "_index": "user_log",
  "_type": "logs",
  "_id": "g9UJpXIBAoADuwvHTK29",
  "_version": 1,
  "_score": 0,
  "_source": {
    "user_name": "[email protected]",
    "working_hours": "2019-10-22 07:50:00"
  }
}

{
  "_index": "user_log",
  "_type": "logs",
  "_id": "g8UJpXIBAoADuwvHTK29",
  "_version": 1,
  "_score": 0,
  "_source": {
    "user_name": "[email protected]",
    "working_hours": "2019-10-22 04:15:01"
  }
}

Here, each user has working hours recorded on different dates (the 21st and 22nd). I want to take the average of each user's working hours.

{
    "size": 0,
    "query": {"match_all": {}},
    "aggs": {
        "users": {
            "terms": {
                "field": "user_name"
            },
            "aggs": {
                "avg_hours": {
                    "avg": {
                        "field": "working_hours"
                    }
                }
            }
        }
    }
}

This query is not working. How can I find the average working hours for each user across all dates? I also want to run this query using the Python Elasticsearch client.

Update: When I use the ingest pipeline as @Val mentioned, I get an error:

{
  "error" : {
    "root_cause" : [
      {
        "type" : "script_exception",
        "reason" : "compile error",
        "processor_type" : "script",
        "script_stack" : [
          "\n        def workDate = /\\s+/.split(ctx.working_h ...",
          "                        ^---- HERE"
        ],
        "script" : "\n        def workDate = /\\s+/.split(ctx.working_hours);\n        def workHours = /:/.split(workDate[1]);\n        ctx.working_minutes = (Integer.parseInt(workHours[0]) * 60) + Integer.parseInt(workHours[1]);\n        ",
        "lang" : "painless",
        "position" : {
          "offset" : 24,
          "start" : 0,
          "end" : 49
        }
      }
.....

How can I solve it?


Solution

  • The problem is that your working_hours field is a point in time and does not denote a duration.

    For this use case, it's best to store the working day and working hours in two separate fields and store the working hours in minutes.

    So instead of having documents like this:

    {
        "user_name": "[email protected]",
        "working_hours": "2019-10-21 09:00:01"
    }
    

    Create documents like this:

    {
        "user_name": "[email protected]",
        "working_day": "2019-10-21",
        "working_hours": "09:00:01",
        "working_minutes": 540
    }
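
    If you do the split in your client code, a minimal Python sketch could look like the following (the `split_document` helper is my own name for illustration, not part of any API):

```python
from datetime import datetime

def split_document(source):
    """Split a raw log document into day, time-of-day, and minutes fields.

    `source` is assumed to look like:
    {"user_name": "...", "working_hours": "2019-10-21 09:00:01"}
    """
    ts = datetime.strptime(source["working_hours"], "%Y-%m-%d %H:%M:%S")
    return {
        "user_name": source["user_name"],
        "working_day": ts.strftime("%Y-%m-%d"),
        "working_hours": ts.strftime("%H:%M:%S"),
        # minutes past midnight, e.g. 09:00 -> 540
        "working_minutes": ts.hour * 60 + ts.minute,
    }

doc = {"user_name": "[email protected]", "working_hours": "2019-10-21 09:00:01"}
print(split_document(doc)["working_minutes"])  # 540
```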
    

    Then you can use your query on the working_minutes field:

    {
        "size": 0,
        "query": {"match_all": {}},
        "aggs": {
            "users": {
                "terms": {
                    "field": "user_name.keyword",
                    "order": {
                        "avg_hours": "desc"
                    }
                },
                "aggs": {
                    "avg_hours": {
                        "avg": {
                            "field": "working_minutes"
                        }
                    }
                }
            }
        }
    }
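
    Run from Python, the same aggregation could look like the sketch below. The response parsing relies on the standard shape of a terms/avg aggregation response; the connection details in the comments are assumptions, and the `sample` response fragment is illustrative, not real data:

```python
# The aggregation body from above, as a Python dict.
query = {
    "size": 0,
    "aggs": {
        "users": {
            "terms": {
                "field": "user_name.keyword",
                "order": {"avg_hours": "desc"},
            },
            "aggs": {
                "avg_hours": {"avg": {"field": "working_minutes"}}
            },
        }
    },
}

def average_hours(response):
    """Map each user bucket to its average, converted from minutes to hours."""
    return {
        bucket["key"]: bucket["avg_hours"]["value"] / 60
        for bucket in response["aggregations"]["users"]["buckets"]
    }

# Against a running cluster this would be something like:
#   from elasticsearch import Elasticsearch
#   es = Elasticsearch("http://localhost:9200")  # assumed address
#   print(average_hours(es.search(index="user_log", body=query)))

# Illustrative response fragment in the shape Elasticsearch returns:
sample = {
    "aggregations": {
        "users": {
            "buckets": [
                {"key": "[email protected]", "doc_count": 2,
                 "avg_hours": {"value": 510.0}},
            ]
        }
    }
}
print(average_hours(sample))  # {'[email protected]': 8.5}
```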
    

    If it is not convenient to compute the working_minutes field in your client code, you can achieve the same thing using an ingest pipeline. Let's define the pipeline first:

    PUT _ingest/pipeline/working-hours
    {
      "processors": [
        {
          "dissect": {
            "field": "working_hours",
            "pattern": "%{?date} %{tmp_hours}:%{tmp_minutes}:%{?seconds}"
          }
        },
        {
          "convert": {
            "field": "tmp_hours",
            "type": "integer"
          }
        },
        {
          "convert": {
            "field": "tmp_minutes",
            "type": "integer"
          }
        },
        {
          "script": {
            "source": """
            ctx.working_minutes = (ctx.tmp_hours * 60) + ctx.tmp_minutes;
            """
          }
        },
        {
          "remove": {
            "field": [
              "tmp_hours",
              "tmp_minutes"
            ]
          }
        }
      ]
    }
    

    Then you need to update your Python client code to use the new pipeline, which will create the working_minutes field for you:

    helpers.bulk(es, reader, index='user_log', doc_type='logs', pipeline='working-hours')
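
    A fuller sketch of the bulk-indexing side is below. The action-building helper and the sample rows are my own illustration; `helpers.bulk` forwards the `pipeline` parameter to the bulk API, so each document goes through the working-hours pipeline on ingest:

```python
def make_actions(rows, index="user_log"):
    """Wrap raw rows as bulk actions; the pipeline runs server-side on ingest."""
    for row in rows:
        yield {"_index": index, "_source": row}

rows = [
    {"user_name": "[email protected]", "working_hours": "2019-10-21 09:00:01"},
    {"user_name": "[email protected]", "working_hours": "2019-10-22 07:50:00"},
]
actions = list(make_actions(rows))
print(len(actions))  # 2

# With a live cluster:
#   from elasticsearch import Elasticsearch, helpers
#   es = Elasticsearch("http://localhost:9200")  # assumed address
#   helpers.bulk(es, make_actions(reader), pipeline='working-hours')
```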