Search code examples
elasticsearchbulkinsertelasticsearch-2.0bulkupdate

Catch Elasticsearch bulk errors when using bulkProcessor


I use bulkProcessor to insert/update bulks in ElasticSearch. I would like to catch

  • EsRejectedExecutionException
  • VersionConflictEngineException
  • DocumentAlreadyExistsException

but the bulk processor doesn't throw anything; it only sets a failure message on the corresponding response item. How can I handle these failures properly, e.g. with an application-level retry when a request is rejected?

/**
 * Sends every request in {@code updateOperations} through a {@link BulkProcessor} and blocks
 * until all resulting bulks have completed or {@code bulkTimeout} seconds have elapsed.
 *
 * <p>Per-item problems (rejections, version conflicts, already-existing documents) are not
 * thrown by the processor; they are only reported on the individual items of the returned
 * response, so callers that need to react to them must inspect those items.
 *
 * <p>NOTE(review): the parameter list (and any {@code throws} clause) is elided in this
 * snippet; {@code updateOperations}, {@code client}, {@code maxBulkSize} and
 * {@code bulkTimeout} are assumed to be parameters or fields of the enclosing class.
 *
 * @return the bulk result captured by the listener; never {@code null}
 * @throws Exception if the listener captured no result, if the processor did not finish
 *         within the timeout, or if the waiting thread was interrupted
 */
public BulkResponse response bulkUpdate(.....) {
    ElasticBulkProcessorListener listener = new ElasticBulkProcessorListener(updateOperations);
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, listener)
        .setBulkActions(MAX_BULK_ACTIONS)
        .setBulkSize(new ByteSizeValue(maxBulkSize, ByteSizeUnit.MB))
        .setConcurrentRequests(5)
        .build();

    updateOperations.forEach(updateRequest -> bulkProcessor.add(updateRequest));

    boolean isFinished;
    try {
        // awaitClose returns false when the timeout elapses before all bulks completed.
        isFinished = bulkProcessor.awaitClose(bulkTimeout, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can observe it, and fail
        // loudly like the other error paths instead of silently returning null.
        Thread.currentThread().interrupt();
        throw new Exception("Bulk updating failed, interrupted while waiting for completion", e);
    }

    if (!isFinished) {
        throw new Exception("Bulk updating failed, received timeout");
    }

    BulkResponse bulkWriteResult = listener.getBulkWriteResult();
    if (bulkWriteResult == null) {
        throw new Exception("Bulk updating failed, results are empty");
    }
    return bulkWriteResult;
}


public class ElasticBulkProcessorListener implements BulkProcessor.Listener {
private long esTime = 0;
private List<Throwable> errors;
private BulkResponse response;

public long getEsTime() {
    return esTime;
}

@Override
public void beforeBulk(long executionId, BulkRequest request) {
    String description = "";
    if (!request.requests().isEmpty()) {
        ActionRequest request1 = request.requests().get(0);
        description = ((UpdateRequest) request1).type();
    }

    log.info("Bulk executionID: {}, estimated size is: {}MB, number of actions: {}, request type: {}",
            executionId, (request.estimatedSizeInBytes() / 1000000), request.numberOfActions(), description);
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
    log.info("Bulk executionID: {}, took : {} Millis, bulk size: {}", executionId, response.getTookInMillis(), response.getItems().length);
    esTime = response.getTookInMillis();
    response = createBulkUpdateResult(response);
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
    log.error("Bulk , failed! error: ", executionId, failure);
    throw new DataFWCoreException(String.format("Bulk executionID: %d, update operation failed", executionId), failure);
}

}


Solution

  • The failure handler is called only when a network failure occurs; in any other case the success handler is called.

    The only way to handle the exceptions mentioned above is to parse each response item and figure out what happened.