ElasticSearch - calling UpdateByQuery and Update in parallel causes 409 conflicts


Working with a large index (100,000 documents), I have a use case that spawns several threads that update documents in parallel. The code updates documents in two ways, Update and UpdateByQuery, so some threads call Update and others call UpdateByQuery. To keep the example simple, every thread updates the same property on all documents.

Here is a small proof of concept that demonstrates the use case:

The test indexes 100,000 documents of type Product, then loops 100 times, starting one UpdateByQuery task and one Update task per iteration so that they all run in parallel. Both target every document (selected via a MatchAll query).

public async Task ConflictsTestAsync()
{
    // index 100,000 documents.
    IEnumerable<Product> products = CreateProducts(100000);
    await _client.IndexManyAsync(products);

    await _client.Indices.UpdateSettingsAsync("MyIndex", s => s
        .IndexSettings(i => i.Setting(UpdatableIndexSettings.MaxResultWindow, 100000)));

    var searchResponse = _client.Search<Product>(s => s
        .From(0)
        .Size(100000)
        .Query(q => q.MatchAll())
    );
    IReadOnlyCollection<IHit<Product>> getProducts = searchResponse.Hits;

    List<Task> tasks = new List<Task>();
    for (int i = 0; i < 100; i++)
    {
        // UpdateByQuery takes no arguments; it selects documents with its own MatchAll query.
        tasks.Add(UpdateByQuery());
        tasks.Add(Update(getProducts));
    }

    await Task.WhenAll(tasks);
}

public class Product
{
    public string Price { get; set; }
    public string Id { get; set; }
}

UpdateByQuery (update the price to be 0):

private async Task UpdateByQuery()
{
    Func<UpdateByQueryDescriptor<Product>, IUpdateByQueryRequest> updateByQuerySelector = (UpdateByQueryDescriptor<Product> updateByQueryDescriptor) =>
    {
        updateByQueryDescriptor
            .Conflicts(Conflicts.Abort)
            .ErrorTrace()
            .Query(x => x.MatchAll())
            .Script(x => x.Source("ctx._source['price'] = '0'"));               

        IUpdateByQueryRequest result = updateByQueryDescriptor;
        return result;
    };

    await _client.UpdateByQueryAsync(updateByQuerySelector, CancellationToken.None);
}

Update (update the price to be 1):

private async Task Update(IReadOnlyCollection<IHit<Product>> keys)
{
    foreach (IHit<Product> product in keys)
    {
        DocumentPath<Product> id = new DocumentPath<Product>(product.Id);
        Func<UpdateDescriptor<Product, Product>, IUpdateRequest<Product, Product>> updateSelector = (UpdateDescriptor<Product, Product> updateDescriptor) =>
        {
            var page = product.Source;
            page.Price = "1";

            updateDescriptor.Doc(page);

            IUpdateRequest<Product, Product> result = updateDescriptor;

            return result;
        };

        await _client.UpdateAsync<Product>(id, updateSelector);
    }
}

The problem is that running Update and UpdateByQuery concurrently on multiple threads causes version conflict exceptions.

Some of the Update calls throw:

Invalid NEST response built from a unsuccessful (409) low level call on POST: /MyIndex/_update/d75a34ae-2533-4e15-a852-13e98c5b599c Audit trail of this API call:

  • [1] BadResponse: Node: http://localhost:9200/ Took: 00:00:00.2495465 OriginalException: Elasticsearch.Net.ElasticsearchClientException: Request failed to execute. Call: Status code 409 from: POST /MyIndex/_update/d75a34ae-2533-4e15-a852-13e98c5b599c. ServerError: Type: version_conflict_engine_exception Reason: "[d75a34ae-2533-4e15-a852-13e98c5b599c]: version conflict, required seqNo [701069], primary term [1]. current document has seqNo [702042] and primary term [1]" Request: {"doc":{"id":"d75a34ae-2533-4e15-a852-13e98c5b599c","manufacturer":"777e1602-8390-40c8-817e-fdef4e3fb9c0","price":"1","title":"31184e90-f1d1-45be-8746-496a50de2f97","description":"780cc1ab-0a8b-4114-a840-67a528de8e55"}} Response: {"error":{"root_cause":[{"type":"version_conflict_engine_exception","reason":"[d75a34ae-2533-4e15-a852-13e98c5b599c]: version conflict, required seqNo [701069], primary term [1]. current document has seqNo [702042] and primary term [1]","index_uuid":"gFAfYHfNQZG0tD0L2a9yag","shard":"0","index":"MyIndex"}],"type":"version_conflict_engine_exception","reason":"[d75a34ae-2533-4e15-a852-13e98c5b599c]: version conflict, required seqNo [701069], primary term [1]. current document has seqNo [702042] and primary term [1]","index_uuid":"gFAfYHfNQZG0tD0L2a9yag","shard":"0","index":"MyIndex"},"status":409}

And some of the UpdateByQuery calls throw (I've trimmed the failures array):

Invalid NEST response built from a unsuccessful (409) low level call on POST: /MyIndex/_update_by_query?conflicts=abort&error_trace=true Audit trail of this API call:

  • [1] BadResponse: Node: http://localhost:9200/ Took: 00:00:00.2636546 OriginalException: Elasticsearch.Net.ElasticsearchClientException: Request failed to execute. Call: Status code 409 from: POST /MyIndex/_update_by_query?conflicts=abort&error_trace=true Request: {"query":{"match_all":{}},"script":{"source":"ctx._source['price'] = '0'"}} Response: {"took":120,"timed_out":false,"total":100000,"updated":0,"deleted":0,"batches":1,"version_conflicts":1000,"noops":0,"retries":{"bulk":0,"search":0},"throttled_millis":0,"requests_per_second":-1.0,"throttled_until_millis":0,"failures":[{"index":"MyIndex","type":"_doc","id":"d5fb4183-4ff4-43c9-962c-ee9d0ee59a6b","cause":{"type":"version_conflict_engine_exception","reason":"[d5fb4183-4ff4-43c9-962c-ee9d0ee59a6b]: version conflict, required seqNo [0], primary term [1]. current document has seqNo [100000] and primary term [1]","index_uuid":"pPDFKhj6T4y-MpzYECKpxQ","shard":"0"]}

Because performance is important, I would prefer not to give up UpdateByQuery (and use only Update) or to serialize access to the two methods with a mutex. Any suggestions for dealing with this situation would be appreciated.

Update: data integrity is important, which is why I use .Conflicts(Conflicts.Abort).

Elasticsearch version: 7.10.0.


Solution

  • TL;DR: You can pass conflicts=proceed to the update_by_query API if you want it to continue working even when hitting conflicts.

    More details: The update_by_query page explains:

    When you submit an update by query request, Elasticsearch gets a snapshot of the data stream or index when it begins processing the request and updates matching documents using internal versioning. When the versions match, the document is updated and the version number is incremented. If a document changes between the time that the snapshot is taken and the update operation is processed, it results in a version conflict and the operation fails. You can opt to count version conflicts instead of halting and returning by setting conflicts to proceed.

    So basically, your update and update_by_query calls are trying to update the same documents at the same time, so they conflict with each other. Setting conflicts=proceed makes update_by_query say "oh well, I'll just continue to update the other docs" instead of aborting at the first conflict.
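
As a rough illustration of the suggestion above, here is a minimal sketch of the question's UpdateByQuery call with conflicts switched to proceed, assuming NEST 7.x. The PriceUpdater class and the ResetPricesAsync method are hypothetical names used only for this example; the Product type, query, and script are taken from the question.

```csharp
using System;
using System.Threading.Tasks;
using Nest;

public class Product
{
    public string Price { get; set; }
    public string Id { get; set; }
}

// Hypothetical wrapper class, introduced only for this sketch.
public class PriceUpdater
{
    private readonly IElasticClient _client;

    public PriceUpdater(IElasticClient client) => _client = client;

    // Sets the price to "0" on all documents and returns the number of
    // documents that were skipped because of a version conflict.
    public async Task<long> ResetPricesAsync()
    {
        UpdateByQueryResponse response = await _client.UpdateByQueryAsync<Product>(u => u
            .Conflicts(Conflicts.Proceed) // count conflicts instead of aborting
            .Query(q => q.MatchAll())
            .Script(s => s.Source("ctx._source['price'] = '0'")));

        if (!response.IsValid)
        {
            // Anything other than version conflicts still surfaces as an error.
            throw new InvalidOperationException(response.DebugInformation);
        }

        return response.VersionConflicts;
    }
}
```

Documents counted in VersionConflicts are not touched by this request, so they keep whatever value the concurrent Update wrote. Whether that is acceptable depends on the data integrity requirement stated in the question; one option is to rerun the query when the returned count is non-zero.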