Tags: elasticsearch, kibana, amazon-opensearch

In Elasticsearch, how can I aggregate data from nested fields and their parent document?


Given a sales index with this mapping:

{
  "mappings": {
    "properties": {
      "amount": {
        "type": "float"
      },
      "created_at": {
        "type": "date",
        "format": "date_time||epoch_millis"
      },
      "events": {
        "type": "nested",
        "properties": {
          "created_at": {
            "type": "date",
            "format": "date_time||epoch_millis"
          },
          "fees": {
            "properties": {
              "amount": {
                "type": "float"
              },
              "credit_debit": {
                "type": "keyword"
              }
            }
          }
        }
      },
      "id": {
        "type": "keyword"
      },
      "status": {
        "type": "keyword"
      },
      "type": {
        "type": "keyword"
      }
    }
  }
}
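
Documents in this index look roughly like this (the field values below are made up, just for illustration):

{
  "id": "056c65ec-22f6-4da1-9bce-82c12ed845cd",
  "amount": "5.90",
  "created_at": 1681211194150,
  "status": "completed",
  "type": "sale",
  "events": [
    {
      "created_at": 1681289446844,
      "fees": {
        "amount": 0.35,
        "credit_debit": "debit"
      }
    }
  ]
}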

My question is, how can I query for the following?

  • for each sales.id
  • …whose created_at falls within a specific range
  • show:
    • the sales.id
    • the sales.amount
    • the max (i.e. latest) sales.events.created_at
    • the total sales.events.fees.amount

My end goal is to have a CSV file with the results. Any solution would work, including:

  • create a new index and reindex with some calculations
  • an advanced aggregation query
  • a Kibana SQL query
  • a Kibana visualisation
  • something else

Solution

  • You can use the following query to extract the information you need. The range filter restricts sales to the desired created_at interval, the composite aggregation buckets them by id, a top_hits sub-aggregation returns the parent document's amount and created_at, and a nested sub-aggregation computes the latest events.created_at and the total events.fees.amount:

    GET /sales/_search?filter_path=**.key,**.amount,**.created_at,**.total_fees.value,**.latest.value
    {
      "size": 0,
      "query": {
        "bool": {
          "filter": [
            {
              "range": {
                "created_at": {
                  "gte": "2023-04-01T00:00:00.000+02:00",
                  "lte": "2023-07-01T00:00:00.000+02:00"
                }
              }
            }
          ]
        }
      },
      "aggs": {
        "pages": {
          "composite": {
            "size": 1000,
            "sources": [
              {
                "id": {
                  "terms": {
                    "field": "id"
                  }
                }
              }
            ]
          },
          "aggs": {
            "fields": {
              "top_hits": {
                "size": 1,
                "_source": [
                  "amount",
                  "created_at"
                ]
              }
            },
            "events": {
              "nested": {
                "path": "events"
              },
              "aggs": {
                "latest": {
                  "max": {
                    "field": "events.created_at"
                  }
                },
                "total_fees": {
                  "sum": {
                    "field": "events.fees.amount"
                  }
                }
              }
            }
          }
        }
      }
    }
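
    With the filter_path parameter the response is trimmed down to just the fields listed there, so each bucket comes back looking roughly like this (values are illustrative):

    {
      "aggregations": {
        "pages": {
          "buckets": [
            {
              "key": {
                "id": "056c65ec-22f6-4da1-9bce-82c12ed845cd"
              },
              "fields": {
                "hits": {
                  "hits": [
                    {
                      "_source": {
                        "amount": "5.90",
                        "created_at": 1681211194150
                      }
                    }
                  ]
                }
              },
              "events": {
                "latest": {
                  "value": 1681289446844
                },
                "total_fees": {
                  "value": 0.3499999940395355
                }
              }
            }
          ]
        }
      }
    }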
    

    If there are more than 1000 buckets and you need to paginate to the next page, reuse the same query and add the after parameter, specifying the key (i.e. the id) of the very last bucket of the preceding page:

    GET /sales/_search?filter_path=**.key,**.amount,**.created_at,**.total_fees.value,**.latest.value
    {
      "size": 0,
      "query": {
        "bool": {
          "filter": [
            {
              "range": {
                "created_at": {
                  "gte": "2023-04-01T00:00:00.000+02:00",
                  "lte": "2023-07-01T00:00:00.000+02:00"
                }
              }
            }
          ]
        }
      },
      "aggs": {
        "pages": {
          "composite": {
            "size": 1000,
            "sources": [
              {
                "id": {
                  "terms": {
                    "field": "id"
                  }
                }
              }
            ],
            "after": {"id": "xyz"}
          },
          "aggs": {
            "fields": {
              "top_hits": {
                "size": 1,
                "_source": [
                  "amount",
                  "created_at"
                ]
              }
            },
            "events": {
              "nested": {
                "path": "events"
              },
              "aggs": {
                "latest": {
                  "max": {
                    "field": "events.created_at"
                  }
                },
                "total_fees": {
                  "sum": {
                    "field": "events.fees.amount"
                  }
                }
              }
            }
          }
        }
      }
    }
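
    If you are not running these queries from Kibana Dev Tools, you can save the raw response to a file with curl, for example (the URL is a placeholder and query.json is assumed to hold the request body, i.e. everything after the GET line):

    curl -s -H 'Content-Type: application/json' \
      'http://localhost:9200/sales/_search?filter_path=**.key,**.amount,**.created_at,**.total_fees.value,**.latest.value' \
      -d @query.json > input.json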
    

    Then you can export the results to CSV using the following jq command:

    jq -r '.aggregations.pages.buckets[] | [.key.id, .fields.hits.hits[]."_source".amount, .fields.hits.hits[]."_source".created_at, .events.total_fees.value, .events.latest.value] | @csv' input.json 
    

    You'll get something like this:

    "056c65ec-22f6-4da1-9bce-82c12ed845cd","5.90",1681211194150,0.3499999940395355,1681289446844