Tags: c#, elasticsearch, nest, elastic-stack

Elasticsearch ingest attachment plugin blocks


I am using NEST (C#) and the ingest attachment plugin to ingest tens of thousands of documents into an Elasticsearch instance. Unfortunately, after a while everything just stands still, i.e. no more documents are ingested. The log shows:

[2019-02-20T17:35:07,528][INFO ][o.e.m.j.JvmGcMonitorService] [BwAAiDl] [gc][7412] overhead, spent [326ms] collecting in the last [1s]

Not sure if this tells anyone anything? Btw, are there more efficient ways to ingest many documents (rather than using thousands of REST requests)?

I am using this kind of code:

client.Index(new Document
{
    Id = Guid.NewGuid(),
    Path = somePath,
    Content = Convert.ToBase64String(File.ReadAllBytes(somePath))
}, i => i.Pipeline("attachments"));

The pipeline is defined as follows:

client.PutPipeline("attachments", p => p
    .Description("Document attachment pipeline")
    .Processors(pr => pr
        .Attachment<Document>(a => a
            .Field(f => f.Content)
            .TargetField(f => f.Attachment)
        )
        .Remove<Document>(r => r
            .Field(f => f.Content)
        )
    )
);

Solution

  • The log indicates that a considerable amount of time is being spent performing garbage collection on the Elasticsearch server side; this is very likely the cause of the long pauses you are seeing. If you have monitoring enabled on the cluster (ideally exporting the data to a separate cluster), I would analyse that data to see whether it sheds some light on why so much GC is happening.
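
    To put a number on "considerable", here is a minimal sketch that pulls the two durations out of a JvmGcMonitorService line like the one in the question and reports the fraction of the window spent collecting. The class and method names are made up for illustration, and the regex only assumes the "spent [...] collecting in the last [...]" wording shown in the log above, with ms, s or m units.

    ```csharp
    using System;
    using System.Globalization;
    using System.Text.RegularExpressions;

    public static class GcOverhead
    {
        // Matches the "spent [326ms] collecting in the last [1s]" part of the log line.
        private static readonly Regex Pattern = new Regex(
            @"spent \[(?<spent>\d+(?:\.\d+)?)(?<spentUnit>ms|s|m)\] " +
            @"collecting in the last \[(?<window>\d+(?:\.\d+)?)(?<windowUnit>ms|s|m)\]");

        // Returns the fraction of the window spent in GC, or null if the line does not match.
        public static double? Fraction(string logLine)
        {
            var m = Pattern.Match(logLine);
            if (!m.Success) return null;

            var spent = ToMillis(m.Groups["spent"].Value, m.Groups["spentUnit"].Value);
            var window = ToMillis(m.Groups["window"].Value, m.Groups["windowUnit"].Value);
            return window > 0 ? spent / window : (double?)null;
        }

        private static double ToMillis(string value, string unit)
        {
            var v = double.Parse(value, CultureInfo.InvariantCulture);
            switch (unit)
            {
                case "ms": return v;
                case "s": return v * 1000;
                default: return v * 60000; // "m" (minutes)
            }
        }
    }
    ```

    For the line in the question, GcOverhead.Fraction(line) gives 0.326, i.e. roughly a third of that second went to garbage collection.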

    are there more efficient ways to ingest many documents (rather than using thousands of REST requests)?

    Yes. You are indexing each attachment in a separate index request. Depending on the base64-encoded size of each attachment, you may want to send several in one bulk request:

    // Your collection of documents
    var documents = new[]
    {
        new Document
        {
            Id = Guid.NewGuid(),
            Path = "path",
            Content = "content"
        },
        new Document
        {
            Id = Guid.NewGuid(),
            Path = "path",
            Content = "content" // base64 encoded bytes
        }
    };
    
    var client = new ElasticClient();
    
    var bulkResponse = client.Bulk(b => b
        .Pipeline("attachments")
        .IndexMany(documents)
    );
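
    Because attachments can vary a lot in size, it may make more sense to cap each bulk request by payload size than by document count. The following is only a sketch of that idea: AttachmentBatcher, BatchBySize and maxBatchBytes are hypothetical names, not part of NEST, and the size estimate only accounts for the base64 content, not the rest of the JSON.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.IO;

    public static class AttachmentBatcher
    {
        // Base64 turns every 3 input bytes into 4 output characters (with padding).
        public static long Base64Length(long byteCount) => 4 * ((byteCount + 2) / 3);

        // Lazily groups file paths so that the estimated base64 payload of each
        // group stays at or under maxBatchBytes. A file larger than the limit
        // ends up in a batch of its own.
        public static IEnumerable<List<string>> BatchBySize(
            IEnumerable<string> paths, long maxBatchBytes)
        {
            var batch = new List<string>();
            long batchBytes = 0;

            foreach (var path in paths)
            {
                var size = Base64Length(new FileInfo(path).Length);

                // Close the current batch if adding this file would exceed the limit.
                if (batch.Count > 0 && batchBytes + size > maxBatchBytes)
                {
                    yield return batch;
                    batch = new List<string>();
                    batchBytes = 0;
                }

                batch.Add(path);
                batchBytes += size;
            }

            if (batch.Count > 0) yield return batch;
        }
    }
    ```

    Each batch of paths can then be turned into Document instances and sent with a single client.Bulk call as above. What a sensible maxBatchBytes is depends on your cluster, so treat any value as something to experiment with.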
    

    If you're reading documents from the filesystem, you probably want to lazily enumerate them and send bulk requests. Here, you can make use of the BulkAll helper method too.

    First, set up a lazily enumerated collection of documents:

    public static IEnumerable<Document> GetDocuments()
    {
        var count = 0;
        while (count++ < 20)
        {
            yield return new Document
            {
                Id = Guid.NewGuid(),
                Path = "path",
                Content = "content" // base64 encoded bytes
            };
        }
    }
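
    The method above just yields placeholder documents. When reading real files, a variant along these lines might work. It is a sketch under assumptions: the Document class shown here is my guess at the shape of the type in the question (minus the Attachment property, to avoid a NEST dependency in the example), and DocumentSource and rootDirectory are hypothetical names.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.IO;

    // Assumed shape of the Document type; the question does not show its definition.
    public class Document
    {
        public Guid Id { get; set; }
        public string Path { get; set; }
        public string Content { get; set; }
    }

    public static class DocumentSource
    {
        // Directory.EnumerateFiles is itself lazy, and each file is only read
        // and base64 encoded when the consumer asks for the next document.
        public static IEnumerable<Document> GetDocuments(string rootDirectory)
        {
            foreach (var path in Directory.EnumerateFiles(
                rootDirectory, "*", SearchOption.AllDirectories))
            {
                yield return new Document
                {
                    Id = Guid.NewGuid(),
                    Path = path,
                    Content = Convert.ToBase64String(File.ReadAllBytes(path))
                };
            }
        }
    }
    ```

    The point of yielding one document at a time is that only the documents in the batches currently being sent need to be held in memory, rather than the base64 content of every file at once.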
    

    Then configure the BulkAll call:

    var client = new ElasticClient();
    
    // set up the observable configuration
    var bulkAllObservable = client.BulkAll(GetDocuments(), ba => ba
        .Pipeline("attachments")
        .Size(10)
    );
    
    var waitHandle = new ManualResetEvent(false);
    
    Exception exception = null;
    
    // set up what to do in response to next bulk call, exception and completion
    var bulkAllObserver = new BulkAllObserver(
        onNext: response => 
        {
            // perform some action e.g. incrementing counter
            // to indicate how many have been indexed
        },
        onError: e =>
        {
            exception = e;
            waitHandle.Set();
        },
        onCompleted: () =>
        {
            waitHandle.Set();
        });
    
    // start the observable process
    bulkAllObservable.Subscribe(bulkAllObserver);
    
    // wait for indexing to finish, either forever,
    // or set a max timeout as here.
    waitHandle.WaitOne(TimeSpan.FromHours(1));
    
    if (exception != null)
        throw exception;
    

    Size dictates how many documents to send in each request. There are no hard and fast rules for how large this can be for your cluster, because it depends on a number of factors, including the ingest pipeline, the document mapping, the byte size of the documents, and the cluster hardware. You can configure the observable to retry documents that fail to be indexed; if you see es_rejected_execution_exception, you are at the limit of what your cluster can handle concurrently.
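
    As a rough illustration of the retry configuration, something like the following could be a starting point. It is a configuration sketch rather than a tested setup: the retry count, back-off time and parallelism are arbitrary values to tune, the Document class is my assumption about the type in the question, BulkAllWithRetries and IndexAll are hypothetical names, and the client is assumed to be configured with a default index. BackOffRetries, BackOffTime, MaxDegreeOfParallelism and Wait are, as far as I know, available on BulkAll in NEST 6.x and 7.x.

    ```csharp
    using System;
    using System.Collections.Generic;
    using Nest;

    // Assumed shape of the Document type; the question does not show its definition.
    public class Document
    {
        public Guid Id { get; set; }
        public string Path { get; set; }
        public string Content { get; set; }
        public Attachment Attachment { get; set; }
    }

    public static class BulkAllWithRetries
    {
        // client is assumed to have a default index configured.
        public static void IndexAll(IElasticClient client, IEnumerable<Document> documents)
        {
            var observable = client.BulkAll(documents, ba => ba
                .Pipeline("attachments")
                .Size(10)                   // documents per bulk request
                .BackOffRetries(3)          // retry failed documents up to 3 times
                .BackOffTime("30s")         // wait between retries
                .MaxDegreeOfParallelism(2)  // concurrent bulk requests
            );

            // Blocks until all documents are sent, an error occurs,
            // or the maximum run time elapses.
            observable.Wait(TimeSpan.FromHours(1), response =>
            {
                Console.WriteLine(
                    $"Indexed page {response.Page} after {response.Retries} retries");
            });
        }
    }
    ```

    Wait is a shorthand for the ManualResetEvent and BulkAllObserver wiring shown earlier. If rejections keep appearing even with retries, lowering Size or MaxDegreeOfParallelism is probably the first thing to try.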

    Another recommendation concerns document ids. I see you're using new Guids for the document ids, which implies to me that you don't care what the value is for each document. If that is the case, I would recommend not sending an Id value and instead letting Elasticsearch generate an id for each document. This is very likely to improve performance (I believe the implementation has changed slightly in Elasticsearch and Lucene since this post, but the point still stands).