
Elasticsearch - Calculate Delay between Timestamps


How can I calculate the delay between timestamps without Logstash, but with script_fields?

e.g. for these documents:

{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:00"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:01"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:03"
 }
}

I want to have a new field called "time_taken", so the expected documents should look like this:

{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:00"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:01",
  "time_taken": "1"
 }
}
{
 "_source": {
  "name": "test1",
  "timestamp": "2021-12-30 12:30:03",
  "time_taken": "2"
 }
}

Solution

  • The provided answer was inspired by the Painless examples in Transforms.

    The solution uses the Transform API, which has some limitations; I recommend checking them to see whether they are acceptable for your use case: Transform limitations.

    First, I created a mapping for the provided example (note the name.keywords keyword sub-field; the transform's terms group_by will aggregate on it):

    PUT myindex
    {
      "mappings": {
        "properties": {
          "name": {
            "type": "text",
            "fields": {
              "keywords": {
                "type": "keyword"
              }
            }
          },
          "timestamp": {
            "type": "date"
          }
        }
      }
    }
    
    

    and inserted some documents:

    POST myindex/_doc
    {
      "name": "test1",
      "timestamp":"2022-01-27T19:48:11Z"
    }
    POST myindex/_doc
    {
      "name": "test1",
      "timestamp":"2022-01-27T19:50:11Z"
    }
    POST myindex/_doc
    {
      "name": "test1",
      "timestamp":"2022-01-27T19:53:11Z"
    }
    POST myindex/_doc
    {
      "name": "test2",
      "timestamp":"2022-01-27T19:35:11Z"
    }
    POST myindex/_doc
    {
      "name": "test2",
      "timestamp":"2022-01-27T19:36:11Z"
    }
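
    Optionally, a quick sanity check that the five documents were indexed (a sketch; match_all and the sort are only there for readability):

    GET myindex/_search
    {
      "size": 10,
      "query": { "match_all": {} },
      "sort": [ { "timestamp": "asc" } ]
    }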
    

    Using the Transform API, we can calculate the time length for each term:

    POST _transform/_preview
    {
      "source": {
        "index": "myindex"
      },
      "dest": {
        "index": "destindex"
      },
      "pivot": {
        "group_by": {
          "name": {
            "terms": {
              "field": "name.keywords"
            }
          }
        },
        "aggregations": {
          "latest_value": {
            "scripted_metric": {
              "init_script": "state.timestamp_latest = 0L;",
              "map_script": """
              def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
              if (current_date > state.timestamp_latest)
              {state.timestamp_latest = current_date;}
            """,
              "combine_script": "return state",
              "reduce_script": """
              def last_doc = '';
              def timestamp_latest = 0L;
              for (s in states) {if (s.timestamp_latest > (timestamp_latest))
              {timestamp_latest = s.timestamp_latest;}}
              return timestamp_latest
            """
            }
          },
          "first_value": {
             "scripted_metric": {
              "init_script": "state.timestamp_first = 999999999999999L;",
              "map_script": """
              def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
              if (current_date < state.timestamp_first)
              {state.timestamp_first = current_date;}
            """,
              "combine_script": "return state",
              "reduce_script": """
              def last_doc = '';
              def timestamp_first = 999999999999999L;
              for (s in states) {if (s.timestamp_first < (timestamp_first))
              {timestamp_first = s.timestamp_first;}}
              return timestamp_first
            """
            }
          },
          "time_length": {
            "bucket_script": {
              "buckets_path": {
                "min": "first_value.value",
                "max": "latest_value.value"
              },
              "script": "(params.max - params.min)/1000"
            }
          }
        }
      }
    }
    

    The output is as follows:

    {
      "preview" : [
        {
          "time_length" : 300.0,
          "name" : "test1",
          "first_value" : 1643312891000,
          "latest_value" : 1643313191000
        },
        {
          "time_length" : 60.0,
          "name" : "test2",
          "first_value" : 1643312111000,
          "latest_value" : 1643312171000
        }
      ],
      "generated_dest_index" : {
        "mappings" : {
          "_meta" : {
            "_transform" : {
              "transform" : "transform-preview",
              "version" : {
                "created" : "7.15.1"
              },
              "creation_date_in_millis" : 1643400080594
            },
            "created_by" : "transform"
          },
          "properties" : {
            "name" : {
              "type" : "keyword"
            }
          }
        },
        "settings" : {
          "index" : {
            "number_of_shards" : "1",
            "auto_expand_replicas" : "0-1"
          }
        },
        "aliases" : { }
      }
    }
    

    What's the script doing?

    As you can see, we create a terms aggregation on the field name.keywords. We use a scripted metric aggregation, which has four steps (a minimal standalone sketch follows this list):

    • init_script: initializes the state, the place where you declare your variables; each shard gets its own state object
    • map_script: this step executes for each document, meaning you can iterate or perform complex calculations on your documents as if you were coding in a high-level programming language like Python or Java (avoid heavy computation here, or it will slow down your aggregation)
    • combine_script: here we tell Elasticsearch what each shard should return (in our case, the state itself)
    • reduce_script: the final step, where we iterate over the results returned by each shard in the previous step (the combine script) to compute the first/latest timestamp for each bucket.
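
    To make these four steps concrete outside of a transform, here is a minimal sketch of the same pattern as a plain search aggregation against the myindex mapping above (it computes only the latest timestamp per name; by_name is a hypothetical aggregation name):

    POST myindex/_search
    {
      "size": 0,
      "aggs": {
        "by_name": {
          "terms": { "field": "name.keywords" },
          "aggs": {
            "latest_value": {
              "scripted_metric": {
                "init_script": "state.timestamp_latest = 0L;",
                "map_script": "def t = doc['timestamp'].getValue().toInstant().toEpochMilli(); if (t > state.timestamp_latest) { state.timestamp_latest = t; }",
                "combine_script": "return state.timestamp_latest;",
                "reduce_script": "def latest = 0L; for (m in states) { if (m > latest) { latest = m; } } return latest;"
              }
            }
          }
        }
      }
    }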

    Finally, in the bucket script, we calculate the difference between first_value and latest_value and divide by 1000, because the timestamp field is stored in epoch milliseconds; time_length is therefore expressed in seconds.
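
    For example, for test1 in the preview above: (1643313191000 - 1643312891000) / 1000 = 300 seconds, which matches its time_length.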

    More information about the scripted metric aggregation: Scripted metrics.
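
    If the preview looks right, here is a minimal sketch of creating and starting the transform as a one-off batch job (time_length_transform is an arbitrary id I chose; the pivot is the same as in the preview request above):

    PUT _transform/time_length_transform
    {
      "source": { "index": "myindex" },
      "dest": { "index": "destindex" },
      "pivot": {
        "group_by": {
          "name": { "terms": { "field": "name.keywords" } }
        },
        "aggregations": {
          "latest_value": {
            "scripted_metric": {
              "init_script": "state.timestamp_latest = 0L;",
              "map_script": "def t = doc['timestamp'].getValue().toInstant().toEpochMilli(); if (t > state.timestamp_latest) { state.timestamp_latest = t; }",
              "combine_script": "return state;",
              "reduce_script": "def latest = 0L; for (s in states) { if (s.timestamp_latest > latest) { latest = s.timestamp_latest; } } return latest;"
            }
          },
          "first_value": {
            "scripted_metric": {
              "init_script": "state.timestamp_first = 999999999999999L;",
              "map_script": "def t = doc['timestamp'].getValue().toInstant().toEpochMilli(); if (t < state.timestamp_first) { state.timestamp_first = t; }",
              "combine_script": "return state;",
              "reduce_script": "def first = 999999999999999L; for (s in states) { if (s.timestamp_first < first) { first = s.timestamp_first; } } return first;"
            }
          },
          "time_length": {
            "bucket_script": {
              "buckets_path": {
                "min": "first_value.value",
                "max": "latest_value.value"
              },
              "script": "(params.max - params.min) / 1000"
            }
          }
        }
      }
    }

    POST _transform/time_length_transform/_start

    Once the transform finishes, searching destindex should return one document per name with its time_length.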