apache-spark, elasticsearch, pyspark, spark-structured-streaming

Fetch the most recent N records from Elasticsearch using Spark


I want to retrieve the last 50 records inserted into Elasticsearch and compute their average for an anomaly-detection project. This is how I am reading the data from ES; however, it fetches the entire index, not just the last 50 records. Is there any way to do that?

from pyspark.sql.functions import expr

edf = spark \
    .read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.read.metadata", "false") \
    .option("es.nodes.wan.only", "true") \
    .option("es.port", "9200") \
    .option("es.net.ssl", "false") \
    .option("es.nodes", "http://localhost") \
    .load("anomaly_detection/data")

# Group by the `sender` column and average the `amount` column
df3 = edf.groupBy("sender") \
    .agg(expr("avg(amount)").alias("avg_amount"))

As it stands, the read pulls every row in the index. How do I load only the 50 most recent rows into the DataFrame?

Input data schema:

|sender|receiver|amount|

Solution

  • You can push the sort and size down to Elasticsearch by supplying a query when reading the data:

    query='{"query": {"match_all": {}}, "size": 50, "sort": [{"_timestamp": {"order": "desc"}}]}'
    

    and pass it to the reader:

    edf = spark \
       .read \
       .format("org.elasticsearch.spark.sql") \
       .option("es.read.metadata", "false") \
       .option("es.nodes.wan.only", "true") \
       .option("es.port", "9200") \
       .option("es.net.ssl", "false") \
       .option("es.nodes", "http://localhost") \
       .option("query", query) \
       .load("anomaly_detection/data")