elasticsearch apache-flink flink-streaming

What is the priority of bulk flush actions in Flink's Elasticsearch sink?


I am following this to create an Elasticsearch sink in a Flink application. I want to understand which setting takes priority if I set all of the following values:

int FLUSH_MAX_ACTIONS = 10_000;
long FLUSH_INTERVAL_MILLIS = 1_000;
int FLUSH_MAX_SIZE_MB = 1;

esSinkBuilder.setBulkFlushMaxActions(FLUSH_MAX_ACTIONS);
esSinkBuilder.setBulkFlushInterval(FLUSH_INTERVAL_MILLIS);
esSinkBuilder.setBulkFlushMaxSizeMb(FLUSH_MAX_SIZE_MB);
esSinkBuilder.setBulkFlushBackoff(true);

In this case, I have asked the sink to flush when any of the following is true:

  1. The 1-second time interval has elapsed
  2. The buffer has reached 1 MB
  3. There are 10,000 records waiting in the buffer

Since I am specifying all three at once, what is the result? Which one takes precedence?


Solution

  • None of the three takes precedence over the others: whichever limit is reached first triggers the flush. The buffering is handled by org.elasticsearch.action.bulk.BulkProcessor, which the sink uses internally.
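
To make the "whichever comes first" rule concrete, here is a minimal sketch of a buffer with the same three triggers. It is a simplified model written for illustration, not BulkProcessor's actual code: the class FlushPolicySketch and its methods add, onTimer and flushReasons are hypothetical names, and the timer is simulated by calling onTimer() by hand rather than by a real scheduler.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of "flush on whichever limit is hit first". Not BulkProcessor code. */
final class FlushPolicySketch {
    private final int maxActions;     // plays the role of setBulkFlushMaxActions
    private final long maxSizeBytes;  // plays the role of setBulkFlushMaxSizeMb, in bytes
    private int bufferedActions = 0;
    private long bufferedBytes = 0L;
    private final List<String> flushReasons = new ArrayList<>();

    FlushPolicySketch(int maxActions, long maxSizeBytes) {
        this.maxActions = maxActions;
        this.maxSizeBytes = maxSizeBytes;
    }

    /** Buffers one record, then flushes if the count or the size limit has been reached. */
    void add(long recordBytes) {
        bufferedActions++;
        bufferedBytes += recordBytes;
        if (bufferedActions >= maxActions) {
            flush("actions");
        } else if (bufferedBytes >= maxSizeBytes) {
            flush("size");
        }
    }

    /** Stands in for the periodic flush-interval timer: flushes whatever is buffered. */
    void onTimer() {
        if (bufferedActions > 0) {
            flush("interval");
        }
    }

    /** Records why the flush happened and empties the buffer. */
    private void flush(String reason) {
        flushReasons.add(reason);
        bufferedActions = 0;
        bufferedBytes = 0L;
    }

    List<String> flushReasons() {
        return flushReasons;
    }

    public static void main(String[] args) {
        // Same limits as in the question: 10,000 actions, 1 MB.
        FlushPolicySketch buffer = new FlushPolicySketch(10_000, 1024L * 1024L);

        // 2,048 records of 512 bytes add up to exactly 1 MB, long before 10,000 actions.
        for (int i = 0; i < 2_048; i++) buffer.add(512);

        // A handful of records, then the timer fires before either limit is reached.
        for (int i = 0; i < 5; i++) buffer.add(512);
        buffer.onTimer();

        // 10,000 tiny records total only 100,000 bytes, so the action count is hit first.
        for (int i = 0; i < 10_000; i++) buffer.add(10);

        System.out.println(buffer.flushReasons());
    }
}
```

In this model the three runs flush for three different reasons (size, then interval, then actions), which is the point of the answer: which setting "wins" depends only on the record size and arrival rate of your data, not on any fixed priority between the settings.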