How can I calculate the delay between timestamps without Logstash, but with script_fields?
For example, for these documents:
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:00"
}
}
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:01"
}
}
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:03"
}
}
I want to have a new field called "time_taken", so the expected documents should look like this:
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:00"
}
}
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:01",
"time_taken": "1"
}
}
{
"_source": {
"name": "test1",
"timestamp": "2021-12-30 12:30:03",
"time_taken": "2"
}
}
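In other words, time_taken is the number of seconds between each document's timestamp and the previous document's timestamp. A quick Python sketch of the intended calculation (not Elasticsearch code, just to illustrate the expected values):

```python
from datetime import datetime

# Timestamps from the sample documents above
timestamps = [
    "2021-12-30 12:30:00",
    "2021-12-30 12:30:01",
    "2021-12-30 12:30:03",
]

parsed = [datetime.strptime(t, "%Y-%m-%d %H:%M:%S") for t in timestamps]

# time_taken for each document = seconds since the previous document
time_taken = [
    int((curr - prev).total_seconds())
    for prev, curr in zip(parsed, parsed[1:])
]
print(time_taken)  # [1, 2]
```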
This answer was inspired by the Painless examples in Transforms.
The solution uses the Transform API, which has some limitations; I recommend checking them to see whether they are acceptable for your use case (see Transform limitations).
First, I created a mapping for the provided example:
PUT myindex
{
"mappings": {
"properties": {
"name": {
"type": "text",
"fields": {
"keywords": {
"type": "keyword"
}
}
},
"timestamp": {
"type": "date"
}
}
}
}
and inserted some documents:
POST myindex/_doc
{
"name": "test1",
"timestamp":"2022-01-27T19:48:11Z"
}
POST myindex/_doc
{
"name": "test1",
"timestamp":"2022-01-27T19:50:11Z"
}
POST myindex/_doc
{
"name": "test1",
"timestamp":"2022-01-27T19:53:11Z"
}
POST myindex/_doc
{
"name": "test2",
"timestamp":"2022-01-27T19:35:11Z"
}
POST myindex/_doc
{
"name": "test2",
"timestamp":"2022-01-27T19:36:11Z"
}
Using the Transform API, we can calculate the time length for each term bucket:
POST _transform/_preview
{
"source": {
"index": "myindex"
},
"dest": {
"index": "destindex"
},
"pivot": {
"group_by": {
"name": {
"terms": {
"field": "name.keywords"
}
}
},
"aggregations": {
"latest_value": {
"scripted_metric": {
"init_script": "state.timestamp_latest = 0L;",
"map_script": """
def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
if (current_date > state.timestamp_latest)
{state.timestamp_latest = current_date;}
""",
"combine_script": "return state",
"reduce_script": """
def timestamp_latest = 0L;
for (s in states) {if (s.timestamp_latest > (timestamp_latest))
{timestamp_latest = s.timestamp_latest;}}
return timestamp_latest
"""
}
},
"first_value": {
"scripted_metric": {
"init_script": "state.timestamp_first = 999999999999999L;",
"map_script": """
def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
if (current_date < state.timestamp_first)
{state.timestamp_first = current_date;}
""",
"combine_script": "return state",
"reduce_script": """
def timestamp_first = 999999999999999L;
for (s in states) {if (s.timestamp_first < (timestamp_first))
{timestamp_first = s.timestamp_first;}}
return timestamp_first
"""
}
},
"time_length": {
"bucket_script": {
"buckets_path": {
"min": "first_value.value",
"max": "latest_value.value"
},
"script": "(params.max - params.min)/1000"
}
}
}
}
}
The output is as follows:
{
"preview" : [
{
"time_length" : 300.0,
"name" : "test1",
"first_value" : 1643312891000,
"latest_value" : 1643313191000
},
{
"time_length" : 60.0,
"name" : "test2",
"first_value" : 1643312111000,
"latest_value" : 1643312171000
}
],
"generated_dest_index" : {
"mappings" : {
"_meta" : {
"_transform" : {
"transform" : "transform-preview",
"version" : {
"created" : "7.15.1"
},
"creation_date_in_millis" : 1643400080594
},
"created_by" : "transform"
},
"properties" : {
"name" : {
"type" : "keyword"
}
}
},
"settings" : {
"index" : {
"number_of_shards" : "1",
"auto_expand_replicas" : "0-1"
}
},
"aliases" : { }
}
}
As you can see, we create a terms aggregation on the field name.keywords. We then use a scripted metric aggregation, which runs in four steps (init_script, map_script, combine_script, and reduce_script), to calculate the first and latest timestamp for each bucket. Finally, in the bucket_script, we take the difference between first_value and latest_value and divide it by 1000, because the timestamp field is stored in epoch millis; the resulting time_length is therefore in seconds.
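Conceptually, the scripted-metric and bucket_script pipeline reduces to a min/max per bucket followed by a division. A minimal Python sketch (not Painless), using the epoch-millis values from the preview output above:

```python
# Per-"name" buckets of epoch-millis timestamps, matching the sample documents
buckets = {
    "test1": [1643312891000, 1643313011000, 1643313191000],
    "test2": [1643312111000, 1643312171000],
}

results = {}
for name, epochs in buckets.items():
    # map/combine/reduce: keep the earliest and latest timestamp per bucket
    first_value = min(epochs)
    latest_value = max(epochs)
    # bucket_script: difference in epoch millis, divided by 1000 -> seconds
    results[name] = (latest_value - first_value) / 1000

print(results)  # {'test1': 300.0, 'test2': 60.0}
```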
More information about the scripted metric aggregation: Scripted metrics.