apache-flink, apache-storm, flink-streaming, trident

Reading data from Elasticsearch into Flink aggregation?


I'm trying to update documents in Elasticsearch using Kafka messages (as a StreamSource). Writing to Elasticsearch in bulk using windows and the Elasticsearch connector as a sink works fine. However, we also need to read existing data from the documents and update it in a bulk-performant manner: not one round trip per tuple, but e.g. a single bulk read for the whole window after a keyBy() split that we want to aggregate over.

We are currently using Storm Trident, which performs bulk reads before a persistentAggregate and writes the updated aggregations back afterwards, minimizing interaction with the backend. I just can't find anything similar in Flink - any hints?
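For context, the Trident pattern looks roughly like this; persistentAggregate lets the backing State batch its reads (multiGet) and writes (multiPut) around the aggregation. The kafkaSpout, esStateFactory, and field names below are illustrative placeholders:

    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.builtin.Sum;
    import org.apache.storm.tuple.Fields;

    // The State implementation behind `esStateFactory` performs batched
    // multiGet/multiPut around the aggregation - this is the bulk behavior
    // we want to reproduce in Flink. Names here are placeholders.
    TridentTopology topology = new TridentTopology();
    topology.newStream("events", kafkaSpout)
            .groupBy(new Fields("key"))
            .persistentAggregate(esStateFactory,
                                 new Fields("value"),
                                 new Sum(),
                                 new Fields("sum"));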


Solution

  • How about running two window calls on the stream:

    window1 - to bulk-read from Elasticsearch

    window2 - to bulk-write into Elasticsearch

    streamData
      .windowAll(...)                    // window1: bulk read + update/join
      .process(new BulkReadAndUpdate())
      .addSink(elasticsearchBulkSink)    // window2: bulk push
    
    • You can use any suitable method for the bulk read, as Storm Trident does.
    • Use Elasticsearch's BulkProcessor for the bulk write in window2; Flink's Elasticsearch sink connector batches requests through it internally. A minimal sketch of both steps follows below.
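Below is a minimal, untested sketch of this two-step pattern, assuming the flink-connector-elasticsearch7 sink and the Elasticsearch high-level REST client. The Event/Aggregate types, the "counters" index, the "count" field, the window size, and the host address are all placeholders. Note that windowAll gathers the whole window on a single operator instance, which is what enables the single bulk read but caps parallelism for that step:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
    import org.apache.flink.util.Collector;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.get.MultiGetItemResponse;
    import org.elasticsearch.action.get.MultiGetRequest;
    import org.elasticsearch.action.get.MultiGetResponse;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestHighLevelClient;

    public class BulkReadUpdateWrite {

        // Placeholder event and result types.
        public static class Event { public String key; public long value; }
        public static class Aggregate {
            public String key; public long count;
            public Aggregate(String key, long count) { this.key = key; this.count = count; }
        }

        // window1: one multi-get for every key seen in the window, merge the
        // window's events into the fetched documents, emit updated aggregates.
        public static class BulkReadAndUpdate
                extends ProcessAllWindowFunction<Event, Aggregate, TimeWindow> {

            private transient RestHighLevelClient client;

            @Override
            public void open(Configuration parameters) {
                client = new RestHighLevelClient(
                        RestClient.builder(new HttpHost("localhost", 9200, "http")));
            }

            @Override
            public void process(Context ctx, Iterable<Event> events,
                                Collector<Aggregate> out) throws Exception {
                // Collapse the window locally: one pending delta per key.
                Map<String, Long> deltas = new HashMap<>();
                for (Event e : events) {
                    deltas.merge(e.key, e.value, Long::sum);
                }
                // One bulk read for all keys in the window.
                MultiGetRequest mget = new MultiGetRequest();
                for (String key : deltas.keySet()) {
                    mget.add("counters", key); // placeholder index name
                }
                MultiGetResponse existing = client.mget(mget, RequestOptions.DEFAULT);
                for (MultiGetItemResponse item : existing.getResponses()) {
                    long current = 0;
                    if (!item.isFailed() && item.getResponse().isExists()) {
                        current = ((Number) item.getResponse().getSourceAsMap()
                                .getOrDefault("count", 0)).longValue();
                    }
                    out.collect(new Aggregate(item.getId(),
                            current + deltas.get(item.getId())));
                }
            }

            @Override
            public void close() throws Exception {
                client.close();
            }
        }

        public static void wire(DataStream<Event> streamData) {
            DataStream<Aggregate> updated = streamData
                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    .process(new BulkReadAndUpdate());

            // window2: the sink batches update requests through Elasticsearch's
            // BulkProcessor internally; tune the flush thresholds as needed.
            List<HttpHost> hosts = new ArrayList<>();
            hosts.add(new HttpHost("localhost", 9200, "http"));
            ElasticsearchSink.Builder<Aggregate> sink = new ElasticsearchSink.Builder<>(
                    hosts,
                    (Aggregate agg, RuntimeContext rc, RequestIndexer indexer) -> {
                        Map<String, Object> doc = new HashMap<>();
                        doc.put("count", agg.count);
                        indexer.add(new UpdateRequest("counters", agg.key)
                                .doc(doc).docAsUpsert(true));
                    });
            sink.setBulkFlushMaxActions(1000); // flush every 1000 requests
            sink.setBulkFlushInterval(2000);   // or every 2 seconds
            updated.addSink(sink.build());
        }
    }

If the per-window key set is small enough, this keeps the read side to one round trip per window, mirroring what Trident's persistentAggregate achieves with multiGet/multiPut.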