
ElasticSearch: How to make an aggregation pipeline?


Imagine the following use case:

We work at Stark Airlines, and our marketing team wants to segment our passengers in order to give them discounts or gift cards. They want two sets of passengers:

  1. Passengers who fly at least 3 times per week
  2. Passengers who have flown at least once but have not flown in the last two weeks

With this they can make different marketing campaigns for our passengers!

In Elasticsearch we have a trip index in which each document represents a ticket bought by a passenger:

{
   "_index" : "trip",
   "_type" : "_doc",
   "_id" : "1",
   "_score" : 1.0,
   "_source" : {
       "total_amount" : 300,
       "trip_date" : "2020/03/24 13:30:00",
       "status" : "completed",
       "passenger" : {
          "id" : 11,
          "name" : "Thiago nunes"
       }
     }
}
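The sample trip_date value (2020/03/24 13:30:00) is not ISO 8601, and Elasticsearch's default date format is strict_date_optional_time||epoch_millis, so a date mapping for this field most likely needs an explicit format. A minimal sketch, assuming you create the index yourself: the mapping body is a plain Python dict (the name TRIP_DATE_MAPPING is hypothetical), and the helper shows the epoch-millisecond value such a date ends up as, assuming the time is meant as UTC (Elasticsearch treats dates without a time zone as UTC). Range filters, date_histogram keys and max aggregations all operate on that numeric value.

```python
from datetime import datetime, timezone

# Hypothetical index-creation body (e.g. for PUT /trip). Only trip_date is
# shown; it accepts the question's format plus ISO 8601 and epoch millis.
TRIP_DATE_MAPPING = {
    "mappings": {
        "properties": {
            "trip_date": {
                "type": "date",
                "format": "yyyy/MM/dd HH:mm:ss||strict_date_optional_time||epoch_millis",
            }
        }
    }
}


def trip_date_to_epoch_millis(value: str) -> int:
    """Parse 'yyyy/MM/dd HH:mm:ss' as UTC and return epoch milliseconds,
    the representation Elasticsearch uses for date fields internally."""
    parsed = datetime.strptime(value, "%Y/%m/%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


print(trip_date_to_epoch_millis("2020/03/24 13:30:00"))  # 1585056600000
```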

The status field can also take other values, such as pending, open or canceled.

This means that we can only take into account trips with the completed status (meaning the passenger actually traveled).
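Before writing the aggregations, it can help to pin the two segments down precisely. The sketch below is a plain-Python reference implementation over an in-memory list of trips (no Elasticsearch involved), useful for checking query results against a small sample. Assumptions not stated in the question: "3 times per week" is read as at least three completed trips within one calendar week starting on Monday (ISO weeks, which is how a weekly date_histogram buckets), and "not flown for two weeks" as the last completed trip being more than 14 days before a reference time now. The function and parameter names are illustrative.

```python
from collections import Counter
from datetime import datetime, timedelta


def parse_trip_date(value: str) -> datetime:
    # Dates in the question look like "2020/03/24 13:30:00".
    return datetime.strptime(value, "%Y/%m/%d %H:%M:%S")


def segment_passengers(trips, now, min_trips_per_week=3, inactive_for=timedelta(weeks=2)):
    """Return (frequent_ids, lapsed_ids) as sorted lists of passenger ids.

    frequent: at least `min_trips_per_week` completed trips in some ISO week.
    lapsed:   at least one completed trip, none since `now - inactive_for`.
    """
    weekly = Counter()  # (passenger_id, iso_year, iso_week) -> completed trips
    last_trip = {}      # passenger_id -> most recent completed trip
    for trip in trips:
        if trip["status"] != "completed":
            continue  # pending, open and canceled trips are ignored
        pid = trip["passenger"]["id"]
        when = parse_trip_date(trip["trip_date"])
        iso_year, iso_week, _ = when.isocalendar()
        weekly[(pid, iso_year, iso_week)] += 1
        if pid not in last_trip or when > last_trip[pid]:
            last_trip[pid] = when

    frequent = sorted({pid for (pid, _, _), n in weekly.items() if n >= min_trips_per_week})
    cutoff = now - inactive_for
    lapsed = sorted(pid for pid, when in last_trip.items() if when < cutoff)
    return frequent, lapsed


trips = [
    {"status": "completed", "trip_date": "2020/03/23 08:00:00", "passenger": {"id": 11}},
    {"status": "completed", "trip_date": "2020/03/24 13:30:00", "passenger": {"id": 11}},
    {"status": "completed", "trip_date": "2020/03/26 18:00:00", "passenger": {"id": 11}},
    {"status": "canceled", "trip_date": "2020/04/20 09:00:00", "passenger": {"id": 12}},
    {"status": "completed", "trip_date": "2020/04/20 09:00:00", "passenger": {"id": 13}},
]
print(segment_passengers(trips, now=datetime(2020, 4, 23)))  # ([11], [11])
```

Passenger 12 appears in neither list because their only trip was canceled, which matches the "flown at least once" condition applying to completed trips only.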

So, with all this in mind, how would I get those two sets of passengers with Elasticsearch?

I have been trying for a while but with no success.

What I have done so far:

  1. I have built a query that gets all valid trips (trips with status completed):
GET /trip/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "status": {
              "value": "completed"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "status_viagem": {
      "terms": {
        "field": "status.keyword"
      }
    }
  }
}
  2. This query returns the following:
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 200,
      "relation" : "eq"
    },
    "max_score" : 0.18232156,
    "hits" : [...]
  },
  "aggregations" : {
    "status_viagem" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "completed",
          "doc_count" : 200
        }
      ]
    }
  }
}

But I am stuck and can't figure out the next step. I know that the next step should be to create a bucket per passenger and then filter those buckets into the two desired data sets, but I don't know how.

Can someone help?

P.S.:

  1. I don't need this to be a single query; just a hint about how to build a query like this would be very helpful.

  2. The output should be an array of passenger IDs.

  3. Note: I have shortened the trip documents for the sake of simplicity.
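The next step the question describes (one bucket per passenger, then a filter on those buckets) maps onto a terms aggregation with a bucket_selector pipeline aggregation under it. A minimal sketch for the "not flown in two weeks" set, written as a Python dict to send as the body of GET /trip/_search. Assumptions: passenger.id is mapped as a number or keyword, status has the status.keyword sub-field that the question's own aggregation uses, and the helper name and the max_passengers default are hypothetical.

```python
import json


def passengers_last_trip_before(cutoff_epoch_ms: int, max_passengers: int = 1000) -> dict:
    """Hypothetical helper: search body that keeps passengers whose most
    recent completed trip is older than cutoff_epoch_ms."""
    return {
        "size": 0,  # only the aggregation is needed, not the hits
        "query": {
            # filter context: matching documents are not scored
            "bool": {"filter": [{"term": {"status.keyword": "completed"}}]}
        },
        "aggs": {
            "per_passenger": {
                # one bucket per passenger id, at most max_passengers buckets
                "terms": {"field": "passenger.id", "size": max_passengers},
                "aggs": {
                    # most recent completed trip, as epoch milliseconds
                    "last_trip": {"max": {"field": "trip_date"}},
                    # drop passengers whose last trip is on or after the cutoff
                    "lapsed_only": {
                        "bucket_selector": {
                            "buckets_path": {"last": "last_trip"},
                            "script": {
                                "source": "params.last < params.cutoff",
                                "params": {"cutoff": cutoff_epoch_ms},
                            },
                        }
                    },
                },
            }
        },
    }


print(json.dumps(passengers_last_trip_before(1586408336000), indent=2))
```

The keys of the per_passenger buckets that survive the bucket_selector are the passenger IDs. The "at least three trips in a week" set needs a date_histogram around a similar terms aggregation, as in the answer below.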


Solution

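  • Here is my understanding of your issue.

    I have used a date_histogram with a weekly interval to bucket the trips by week and, inside each week, a terms aggregation on the passenger with min_doc_count set to 3, so only passengers with at least three completed trips in that week are kept. A bucket_selector then drops the weeks that contain no such passenger. This gives you all passengers who have traveled at least three times in a single week. Note that the weeks are calendar weeks starting on Monday, not rolling seven-day windows.

    In a second aggregation I have used a terms aggregation to get each passenger and a max aggregation to get their most recent trip date. A bucket_selector then keeps only the passengers whose last trip is earlier than a cutoff date (two weeks ago), passed to the script as date_epoch in epoch milliseconds. Both terms aggregations return at most 10 buckets ("size": 10) and the bucket_selector runs after that cut, so for a real data set the size has to be raised to cover every passenger.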

    Mapping

    {
      "index87" : {
        "mappings" : {
          "properties" : {
            "passengerid" : {
              "type" : "long"
            },
            "passengername" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "status" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "total_amount" : {
              "type" : "long"
            },
            "trip_date" : {
              "type" : "date"
            }
          }
        }
      }
    }
    
    

    Query

    {
      "query": {
        "bool": {
          "must": [
            {
              "term": {
                "status": {
                  "value": "completed"
                }
              }
            }
          ]
        }
      },
      "aggs": {
        "travel_thrice_week": {
          "date_histogram": {
            "field": "trip_date",
            "calendar_interval": "week"
          },
          "aggs": {
            "passenger": {
              "terms": {
                "field": "passengername.keyword",
                "min_doc_count": 3,
                "size": 10
              }
            },
            "select_bucket_with_user": {
              "bucket_selector": {
                "buckets_path": {
                  "passenger": "passenger._bucket_count"
                },
                "script": "params.passenger >= 1"
              }
            }
          }
        },
        "not_flown_last_two_week": {
          "terms": {
            "field": "passengername.keyword",
            "size": 10
          },
          "aggs": {
            "last_travel": {
              "max": {
                "field": "trip_date"
              }
            },
            "last_travel_before_two_week": {
              "bucket_selector": {
                "buckets_path": {
                  "traveldate": "last_travel"
                },
                "script":{
                  "source": "params.traveldate < params.date_epoch",
                  "params": {
                    "date_epoch": 1586408336000
                  }
                }
    
              }
            }
          }
        }
      }
    }
    

    Result:

    "aggregations" : {
        "not_flown_last_two_week" : {
          "doc_count_error_upper_bound" : 0,
          "sum_other_doc_count" : 0,
          "buckets" : [
            {
              "key" : "Thiago nunes",
              "doc_count" : 3,
              "last_travel" : {
                "value" : 1.5851808E12,
                "value_as_string" : "2020-03-26T00:00:00.000Z"
              }
            },
            {
              "key" : "john doe",
              "doc_count" : 1,
              "last_travel" : {
                "value" : 1.5799968E12,
                "value_as_string" : "2020-01-26T00:00:00.000Z"
              }
            }
          ]
        },
        "travel_thrice_week" : {
          "buckets" : [
            {
              "key_as_string" : "2020-03-23T00:00:00.000Z",
              "key" : 1584921600000,
              "doc_count" : 3,
              "passenger" : {
                "doc_count_error_upper_bound" : 0,
                "sum_other_doc_count" : 0,
                "buckets" : [
                  {
                    "key" : "Thiago nunes",
                    "doc_count" : 3
                  }
                ]
              }
            }
          ]
        }
      }
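The cutoff in the query above is hard-coded (1586408336000 ms, which is 2020-04-09T04:58:56Z), so it has to be recomputed for every run, and the question asks for plain arrays rather than aggregation buckets. A minimal sketch of both steps in Python, assuming the search response has already been decoded into a dict; two_weeks_ago_epoch_ms and segment_keys are illustrative names, not part of any Elasticsearch client.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def two_weeks_ago_epoch_ms(now: Optional[datetime] = None) -> int:
    """Cutoff for params.date_epoch: `now` minus 14 days, in epoch milliseconds."""
    now = now or datetime.now(timezone.utc)
    return int((now - timedelta(weeks=2)).timestamp() * 1000)


def segment_keys(response: dict) -> dict:
    """Turn the two aggregations of the answer's query into plain, sorted lists."""
    aggs = response["aggregations"]
    frequent = set()
    for week in aggs["travel_thrice_week"]["buckets"]:
        # A passenger can qualify in several weeks; the set removes duplicates.
        frequent.update(p["key"] for p in week["passenger"]["buckets"])
    lapsed = {b["key"] for b in aggs["not_flown_last_two_week"]["buckets"]}
    return {
        "travel_thrice_week": sorted(frequent),
        "not_flown_last_two_week": sorted(lapsed),
    }


# Same instant as the hard-coded date_epoch param: 2020-04-09T04:58:56Z.
print(two_weeks_ago_epoch_ms(datetime(2020, 4, 23, 4, 58, 56, tzinfo=timezone.utc)))
# 1586408336000

# Trimmed copy of the result above (only the fields segment_keys reads).
response = {
    "aggregations": {
        "not_flown_last_two_week": {
            "buckets": [{"key": "Thiago nunes"}, {"key": "john doe"}]
        },
        "travel_thrice_week": {
            "buckets": [{"passenger": {"buckets": [{"key": "Thiago nunes"}]}}]
        },
    }
}
print(segment_keys(response))
# {'travel_thrice_week': ['Thiago nunes'], 'not_flown_last_two_week': ['Thiago nunes', 'john doe']}
```

Because the answer's aggregations bucket on passengername.keyword, the keys are names; pointing both terms aggregations at the passengerid field from the mapping should return the passenger IDs the question asks for. In this sample, Thiago nunes lands in both lists because his last trip (2020-03-26) is also before the cutoff.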