What would be the best way to query Elasticsearch in order to build a date histogram showing the total number of unique visitors?
Considering the following data:
PUT /events
{
  "mappings": {
    "_doc": {
      "properties": {
        "userId": { "type": "keyword" },
        "eventDate": { "type": "date" }
      }
    }
  }
}
POST /events/_bulk
{ "index" : { "_index" : "events", "_type" : "_doc", "_id" : "1" } }
{"userId": "1","eventDate": "2019-03-04T13:40:18.514Z"}
{ "index" : { "_index" : "events", "_type" : "_doc", "_id" : "2" } }
{"userId": "2","eventDate": "2019-03-04T13:46:18.514Z"}
{ "index" : { "_index" : "events", "_type" : "_doc", "_id" : "3" } }
{"userId": "3","eventDate": "2019-03-04T13:50:18.514Z"}
{ "index" : { "_index" : "events", "_type" : "_doc", "_id" : "4" } }
{"userId": "1","eventDate": "2019-03-05T13:46:18.514Z"}
{ "index" : { "_index" : "events", "_type" : "_doc", "_id" : "5" } }
{"userId": "4","eventDate": "2019-03-05T13:46:18.514Z"}
Now, if I query the cardinality of the userId field I get 4 distinct visitors, as expected.
POST /events/_search
{
  "size": 0,
  "aggs": {
    "visitors": {
      "cardinality": {
        "field": "userId"
      }
    }
  }
}
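With the five sample documents above, the response should contain a single distinct count, roughly:
"aggregations": {
  "visitors": {
    "value": 4
  }
}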
However, when the documents are distributed over a date histogram, the per-bucket counts sum to 5 because the same userId falls into both buckets.
POST /events/_search
{
  "size": 0,
  "aggs": {
    "visits_over_time": {
      "date_histogram": {
        "field": "eventDate",
        "interval": "1d"
      },
      "aggs": {
        "visitors": {
          "cardinality": {
            "field": "userId"
          }
        }
      }
    }
  }
}
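With the sample data, the buckets come back roughly like this (3 + 2 = 5, because userId 1 is counted on both days):
"aggregations": {
  "visits_over_time": {
    "buckets": [
      {
        "key_as_string": "2019-03-04T00:00:00.000Z",
        "key": 1551657600000,
        "doc_count": 3,
        "visitors": { "value": 3 }
      },
      {
        "key_as_string": "2019-03-05T00:00:00.000Z",
        "key": 1551744000000,
        "doc_count": 2,
        "visitors": { "value": 2 }
      }
    ]
  }
}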
Is there a way to filter out those repeated values? What would be the best way to accomplish this?
Even though I would like to avoid scripts, Scripted Metric Aggregation seems to be the only way to accomplish what was requested:
{
  "size": 0,
  "aggs": {
    "visitors": {
      "scripted_metric": {
        "init_script": "params._agg.dateMap = new HashMap();",
        "map_script": "params._agg.dateMap.merge(doc.userId[0].toString(), doc.eventDate.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2);",
        "combine_script": "return params._agg.dateMap;",
        "reduce_script": "def dateMap = new HashMap(); for (map in params._aggs) { if (map == null) continue; for (entry in map.entrySet()) dateMap.merge(entry.key, entry.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2); } def hist = new TreeMap(); for (entry in dateMap.entrySet()) hist.merge(entry.value.toString(), 1, (a, b) -> a + 1); return hist;"
      }
    }
  }
}
Init just creates an empty HashMap, Map fills that map with userId as the key and the oldest eventDate as the value, and Combine simply returns the map so it can be passed to Reduce, shown here formatted for readability:
def dateMap = new HashMap();
for (map in params._aggs) {
  if (map == null) continue;
  for (entry in map.entrySet())
    dateMap.merge(entry.key, entry.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2);
}
def hist = new TreeMap();
for (entry in dateMap.entrySet())
  hist.merge(entry.value.toString(), 1, (a, b) -> a + 1);
return hist;
Up to Combine, the code is executed once per shard; Reduce then merges all the maps into a single one (dateMap), keeping the oldest eventDate per userId, and finally counts the occurrences of each eventDate.
The result is:
"aggregations": {
"visitors": {
"value": {
"2019-03-04T13:40:18.514Z": 1,
"2019-03-04T13:46:18.514Z": 1,
"2019-03-04T13:50:18.514Z": 1,
"2019-03-05T13:46:18.514Z": 1
}
}
}
The only missing part is that these values still have to be grouped into histogram buckets in application code.
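For example, collapsing the timestamps above to day granularity on the client side yields a histogram whose buckets add up to the true distinct count of 4:
{
  "2019-03-04": 3,
  "2019-03-05": 1
}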
Note¹: use at your own risk; I don't know whether memory consumption grows significantly because of those hash maps, or how well this performs on large datasets.
Note²: starting from Elasticsearch 6.4, state and states should be used instead of params._agg and params._aggs.
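As a rough sketch (untested), the same aggregation written for 6.4+ would therefore become:
{
  "size": 0,
  "aggs": {
    "visitors": {
      "scripted_metric": {
        "init_script": "state.dateMap = new HashMap();",
        "map_script": "state.dateMap.merge(doc.userId[0].toString(), doc.eventDate.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2);",
        "combine_script": "return state.dateMap;",
        "reduce_script": "def dateMap = new HashMap(); for (map in states) { if (map == null) continue; for (entry in map.entrySet()) dateMap.merge(entry.key, entry.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2); } def hist = new TreeMap(); for (entry in dateMap.entrySet()) hist.merge(entry.value.toString(), 1, (a, b) -> a + 1); return hist;"
      }
    }
  }
}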