elasticsearch kibana amazon-quicksight

QuickSight or Elasticsearch - Column-wise aggregation


Is this possible in QuickSight or Elasticsearch? I have tried calculated fields in QuickSight and runtime scripts in Elasticsearch, but I'm not sure how to do it. I'm also not sure whether what I'm expecting is even possible in these tools.

I'm trying out a simple date difference between columns based on their action, for example: "Time taken for 'creating a post' after a user registered".

Data Input:

[image: input data]

Data Output:

[image: expected output]


Solution

  • It is possible using a scripted metric aggregation.

    Data

    "hits" : [
          {
            "_index" : "index121",
            "_type" : "_doc",
            "_id" : "aqJ3HnoBF6_U07qsNY-s",
            "_score" : 1.0,
            "_source" : {
              "user" : "Jen",
              "activity" : "Logged In",
              "activity_Time" : "2020-01-08"
            }
          },
          {
            "_index" : "index121",
            "_type" : "_doc",
            "_id" : "a6J3HnoBF6_U07qsXY_8",
            "_score" : 1.0,
            "_source" : {
              "user" : "Jen",
              "activity" : "Created a post",
              "activity_Time" : "2020-05-08"
            }
          },
          {
            "_index" : "index121",
            "_type" : "_doc",
            "_id" : "bKJ3HnoBF6_U07qsk4-0",
            "_score" : 1.0,
            "_source" : {
              "user" : "Mark",
              "activity" : "Logged In",
              "activity_Time" : "2020-01-03"
            }
          },
          {
            "_index" : "index121",
            "_type" : "_doc",
            "_id" : "baJ3HnoBF6_U07qsu48g",
            "_score" : 1.0,
            "_source" : {
              "user" : "Mark",
              "activity" : "Created a post",
              "activity_Time" : "2020-01-08"
            }
          }
        ]
    

    Query

    {
      "size": 0,
      "aggs": {
        "user": {
          "terms": {
            "field": "user.keyword",
            "size": 10000
          },
          "aggs": {
            "distinct_sum_feedback": {
              "scripted_metric": {
                "init_script": "state.docs = []",
                "map_script": """
                  // Collect the timestamp and activity of every document.
                  Map span = [
                    'timestamp': doc['activity_Time'],
                    'activity': doc['activity.keyword'].value
                  ];
                  state.docs.add(span)
                """,
                "combine_script": "return state.docs;",
                "reduce_script": """
                  // Merge the per-shard lists into one list.
                  def all_docs = [];
                  for (s in states) {
                    for (span in s) {
                      all_docs.add(span);
                    }
                  }
                  // Sort all activities by timestamp, oldest first.
                  all_docs.sort((HashMap o1, HashMap o2)->o1['timestamp'].getValue().toInstant().toEpochMilli().compareTo(o2['timestamp'].getValue().toInstant().toEpochMilli()));

                  Hashtable result = new Hashtable();
                  boolean found = false;
                  JodaCompatibleZonedDateTime loggedIn;
                  for (s in all_docs) {
                    // Remember the most recent login.
                    if (s.activity == 'Logged In') {
                      loggedIn = s.timestamp.getValue();
                      found = true;
                    }
                    // Pair the first post after a login with that login.
                    if (s.activity == 'Created a post' && found == true) {
                      found = false;
                      // Key is the login date; getMonth() yields the month name, e.g. JANUARY.
                      def dt = loggedIn.getYear() + '-' + loggedIn.getMonth() + '-' + loggedIn.getDayOfMonth();
                      // Difference between post and login in milliseconds.
                      def diff = s.timestamp.getValue().toInstant().toEpochMilli() - loggedIn.toInstant().toEpochMilli();
                      if (result.get(dt) == null) {
                        // Convert milliseconds to whole days.
                        result.put(dt, diff / 1000 / 60 / 60 / 24);
                      }
                    }
                  }
                  return result;
                """
              }
            }
          }
        }
      }
    }
    

    Result

    "user" : {
          "doc_count_error_upper_bound" : 0,
          "sum_other_doc_count" : 0,
          "buckets" : [
            {
              "key" : "Jen",
              "doc_count" : 2,
              "distinct_sum_feedback" : {
                "value" : {
                  "2020-JANUARY-8" : 121
                }
              }
            },
            {
              "key" : "Mark",
              "doc_count" : 2,
              "distinct_sum_feedback" : {
                "value" : {
                  "2020-JANUARY-3" : 5
                }
              }
            }
          ]
        }
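
The nested buckets above may be easier to chart once flattened into one row per user. The following is a minimal Python sketch of that step, not part of the original answer: `USER_AGG` is the "user" aggregation copied by hand from the result above, `flatten_buckets` is a hypothetical helper name, and the `(user, login date, days)` row shape is an assumption, since the expected output in the question is only available as an image.

```python
# The "user" aggregation from the Result block, written as a Python dict.
USER_AGG = {
    "doc_count_error_upper_bound": 0,
    "sum_other_doc_count": 0,
    "buckets": [
        {"key": "Jen", "doc_count": 2,
         "distinct_sum_feedback": {"value": {"2020-JANUARY-8": 121}}},
        {"key": "Mark", "doc_count": 2,
         "distinct_sum_feedback": {"value": {"2020-JANUARY-3": 5}}},
    ],
}


def flatten_buckets(user_agg, metric="distinct_sum_feedback"):
    """Turn nested terms buckets into (user, login_date, days) rows.

    `metric` is the name given to the scripted_metric aggregation in the query.
    """
    rows = []
    for bucket in user_agg["buckets"]:
        # Each scripted_metric value is a map of login date -> days until the post.
        for login_date, days in bucket[metric]["value"].items():
            rows.append((bucket["key"], login_date, days))
    return rows


for row in flatten_buckets(USER_AGG):
    print(row)
```

Each tuple holds the user, the login date key produced by the script, and the number of days until that user's next post.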
    

    Explanation

    1. init_script

    Executed prior to any collection of documents. Allows the aggregation to set up any initial state.

    Here it initializes an empty list, state.docs.

    2. map_script

    Executed once per document collected.

    For each document, it adds a map holding the activity and timestamp to state.docs.

    3. combine_script

    Executed once on each shard after document collection is complete.

    Returns that shard's list of maps.

    4. reduce_script

    Executed once on the coordinating node after all shards have returned their results.

    Merges the lists from all shards into a single collection and sorts it by timestamp. It then walks the sorted list and, for each "Logged In" entry, records the difference in days between that login and the next "Created a post" entry.
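
To make the four phases easier to follow, here is a rough Python simulation of the same logic on the sample data. It is only a sketch under stated assumptions, not how Elasticsearch runs the scripts: the function names (`map_phase`, `reduce_phase`, `days_to_first_post`) and the `shards` parameter are hypothetical, the split of documents across shards is made up, and the result key uses ISO dates instead of the `2020-JANUARY-8` format produced by the Painless script.

```python
from datetime import date

# Sample documents copied from the "Data" block of the answer.
DOCS = [
    {"user": "Jen", "activity": "Logged In", "activity_Time": "2020-01-08"},
    {"user": "Jen", "activity": "Created a post", "activity_Time": "2020-05-08"},
    {"user": "Mark", "activity": "Logged In", "activity_Time": "2020-01-03"},
    {"user": "Mark", "activity": "Created a post", "activity_Time": "2020-01-08"},
]


def map_phase(shard_docs):
    """init_script, map_script and combine_script for one (simulated) shard."""
    state = []  # init_script: state.docs = []
    for doc in shard_docs:  # map_script: runs once per document
        state.append({
            "timestamp": date.fromisoformat(doc["activity_Time"]),
            "activity": doc["activity"],
        })
    return state  # combine_script: return state.docs


def reduce_phase(states):
    """reduce_script: merge shard states, sort, pair each login with the next post."""
    all_docs = [span for state in states for span in state]
    all_docs.sort(key=lambda span: span["timestamp"])

    result = {}
    logged_in = None  # plays the role of loggedIn plus the found flag
    for span in all_docs:
        if span["activity"] == "Logged In":
            logged_in = span["timestamp"]
        elif span["activity"] == "Created a post" and logged_in is not None:
            # Keep only the first value seen for a given login date.
            result.setdefault(logged_in.isoformat(), (span["timestamp"] - logged_in).days)
            logged_in = None
    return result


def days_to_first_post(docs, shards=2):
    """Stand-in for the outer terms aggregation: one bucket per user."""
    by_user = {}
    for doc in docs:
        by_user.setdefault(doc["user"], []).append(doc)
    out = {}
    for user, user_docs in by_user.items():
        # Spread each user's documents over the simulated shards.
        states = [map_phase(user_docs[i::shards]) for i in range(shards)]
        out[user] = reduce_phase(states)
    return out


print(days_to_first_post(DOCS))
```

For the sample data this gives 121 days for Jen and 5 days for Mark, matching the aggregation result, and the outcome does not depend on how the documents are split across shards because the reduce step re-sorts everything.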