Search code examples
elasticsearchpyspark

Aggregations in PySpark / Elasticsearch


I can query Elasticsearch, and aggregate the results, using:

spark.read.format("org.elasticsearch.spark.sql").load('my_index').selectExpr('max(my_field)').show()

However, that's inefficient, because PySpark retrieves all the data, and then performs the aggregation. I prefer elastic to do the aggregation, and hand spark only the result. How can it be done?

I know I can get elastic to do filtering for me, and hand over only the filtered results, with:

myquery = '''{ 
   "range": {
      "my_field": {
        "gt": 1,
        "boost": 1
      }
    }
  }'''
spark.read.format("org.elasticsearch.spark.sql").option("es.query", myquery).load('my_index').show()

Interestingly, when I let spark filter, like:

spark.read.format("org.elasticsearch.spark.sql").load('my_index').filter('my_field>1').show()

It seems to let elastic do the filtering. Anyway, I couldn't figure out how to do the aggregations in elastic.


Solution

  • Instead of relying on the Spark SQL interface to Elasticsearch, make a direct query to the Elasticsearch API. This ensures the heavy lifting is done by Elasticsearch. Once you've got the aggregated result, you can consume it in Spark for further analysis or processing. Suppose you want to get the max value of my_field. Use Elasticsearch's max aggregation:

    {
      "size": 0, 
      "aggs": {
        "max_value": {
          "max": {
            "field": "my_field"
          }
        }
      }
    }
    

    To execute this query, use a tool or library in Python (like requests) to send a POST request to the Elasticsearch endpoint (http://YOUR_ELASTICSEARCH_ENDPOINT/my_index/_search)

    Once you have the aggregated result from Elasticsearch, you can parse the relevant fields and create a DataFrame in Spark.

    import requests
    from pyspark.sql import SparkSession
    
    # Create Spark session
    spark = SparkSession.builder.appName("ElasticAgg").getOrCreate()
    
    # Elasticsearch endpoint and query
    es_endpoint = "http://YOUR_ELASTICSEARCH_ENDPOINT/my_index/_search"
    query = {
        "size": 0, 
        "aggs": {
            "max_value": {
                "max": {
                    "field": "my_field"
                }
            }
        }
    }
    
    response = requests.post(es_endpoint, json=query).json()
    max_val = response['aggregations']['max_value']['value']
    
    df = spark.createDataFrame([(max_val,)], ["max_my_field"])
    df.show()