c# multithreading elasticsearch .net-core nest

Multithreading within a thread lock


I am working on speeding up some processes that publish a bulk set of records (mostly in the millions) to Elasticsearch. In my C# code I have already implemented a multi-threaded solution using Dataflow, as scaffolded below:

var fetchRecords = new TransformBlock<?, ?>(input => { ... });
var sendRecordsToElastic = new ActionBlock<List<?>>(records => sendBulkRequest(records));

fetchRecords.LinkTo(sendRecordsToElastic, new DataflowLinkOptions { PropagateCompletion = true });

fetchRecords.Post("Start");

And then the send bulk request call I want to implement:

public IBulkResponse sendBulkRequest(List<?> records)
{
    lock(SomeStaticObject)
    {
       // Execute several new threads to send records in bulk
    }
}

My question is about the practicality of starting additional threads inside a lock that is part of a Dataflow pipeline.

Is this OK? Could I run into problems with performance, execution, cache/memory misses, etc.?

Any insight would be gladly accepted.


Solution

  • You may want to use BulkAll here, which implements the observable pattern to make concurrent bulk requests to Elasticsearch. Here's an example:

    void Main()
    {   
        var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
        var connectionSettings = new ConnectionSettings(pool);
    
        var client = new ElasticClient(connectionSettings);
        var indexName = "bulk-index";
    
        if (client.IndexExists(indexName).Exists)
            client.DeleteIndex(indexName);
    
        client.CreateIndex(indexName, c => c
            .Settings(s => s
                .NumberOfShards(3)
                .NumberOfReplicas(0)
            )
            .Mappings(m => m
                .Map<DeviceStatus>(p => p.AutoMap())
            )
        );
    
        var size = 500;
    
        // set up the observable
        var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
            .Index(indexName)
            .MaxDegreeOfParallelism(4)
            .RefreshOnCompleted()
            .Size(size)
        );
    
        var countdownEvent = new CountdownEvent(1);
    
        Exception exception = null;
    
        // set up an observer. Delegates passed are:
        // 1. onNext
        // 2. onError
        // 3. onCompleted
        var bulkAllObserver = new BulkAllObserver(
            response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),
            ex => 
            {
                // capture exception for throwing outside Observer.
                // You may decide to do something different here
                exception = ex;
                countdownEvent.Signal();
            },
            () => 
            {
                Console.WriteLine("Finished");
                countdownEvent.Signal();
            });
    
        // subscribe to the observable          
        bulkAllObservable.Subscribe(bulkAllObserver);
    
        // wait indefinitely for it to finish. May want to put a
        // max timeout on this  
        countdownEvent.Wait();
    
        if (exception != null) 
        {
            throw exception;
        }
    }
    
    // lazily enumerated collection
    private static IEnumerable<DeviceStatus> GetDeviceStatus()
    {
        for (var i = 0; i < DocumentCount; i++)
            yield return new DeviceStatus(i); 
    }
    
    private const int DocumentCount = 20000;
    
    public class DeviceStatus
    {
        public DeviceStatus(int id) => Id = id;
        public int Id {get;set;}
    }
    

    If you don't need to do anything special in the observer, you can use the .Wait() method on the observable:

    var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
        .Index(indexName)
        .MaxDegreeOfParallelism(4)
        .RefreshOnCompleted()
        .Size(size)
    )
    .Wait(
        TimeSpan.FromHours(1), 
        response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries")
    );
    

    There are observable methods for BulkAll, ScrollAll and Reindex. (There is also ReindexOnServer, which reindexes within Elasticsearch and maps to the Reindex API; the Reindex method predates it.)
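
If you would rather keep the Dataflow pipeline from the question, one option is to let the block itself bound the concurrency instead of taking a lock and starting threads inside it. The following is a minimal sketch under stated assumptions, not a definitive implementation: SendBulkAsync is a hypothetical stand-in for the real bulk request, and the batch size, degree of parallelism and capacities are illustrative values, not recommendations.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipelineSketch
{
    // Hypothetical stand-in for the real bulk call to Elasticsearch.
    private static async Task SendBulkAsync(IReadOnlyList<int> batch)
    {
        await Task.Delay(10); // simulates network latency only
    }

    public static async Task<int> RunAsync(int documentCount, int batchSize, int parallelism)
    {
        var sent = 0;

        // Groups single records into arrays of up to batchSize items.
        // BoundedCapacity must be >= batchSize; it makes SendAsync wait
        // when the downstream block cannot keep up.
        var batcher = new BatchBlock<int>(
            batchSize,
            new GroupingDataflowBlockOptions { BoundedCapacity = batchSize * 2 });

        // Processes up to `parallelism` batches concurrently. No lock is
        // needed: the block schedules the work and caps the concurrency.
        var sender = new ActionBlock<int[]>(
            async batch =>
            {
                await SendBulkAsync(batch);
                Interlocked.Add(ref sent, batch.Length);
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = parallelism,
                BoundedCapacity = parallelism * 2 // batches queued or in flight
            });

        batcher.LinkTo(sender, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < documentCount; i++)
            await batcher.SendAsync(i);

        // Flushes any partial final batch and propagates completion downstream.
        batcher.Complete();
        await sender.Completion;
        return sent;
    }

    public static async Task Main()
    {
        var total = await RunAsync(documentCount: 2000, batchSize: 500, parallelism: 4);
        Console.WriteLine($"Sent {total} documents");
    }
}
```

With this shape, a lock around the send step would serialize the batches and undo the parallelism that MaxDegreeOfParallelism provides, so any shared state is better protected narrowly (here with Interlocked) than with a lock around the whole request.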