I am working on speeding up the execution of some processes that publish a bulk set of records (Mostly in the millions) to Elasticsearch. In my C# code I have already implemented a multi-threaded solution using Dataflow as scaffolded below:
var fetchRecords = new TransformBlock<?, ?>(() => { ... });
var sendRecordsToElastic = new ActionBlock<List<?>>(records => sendBulkRequest(records));
fetchRecords.LinkTo(sendRecordsToElastic, { PropogateCompletion = true });
fetchRecords.Post("Start");
And then the send bulk request call I want to implement:
public IBulkResponse sendBulkRequest(List<?> records)
{
lock(SomeStaticObject)
{
// Execute several new threads to send records in bulk
}
}
My question for you is on the practicality for executing additional threads within a lock that exists as part of a Dataflow pipeline.
Is this ok? Could I see any potential hiccups in performance, execution, cache/memory misses, etc?
Any insight would be gladly accepted.
You may want to use BulkAll
here, which implements the observable pattern to make concurrent bulk requests to Elasticsearch. Here's an example
void Main()
{
var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
var connectionSettings = new ConnectionSettings(pool);
var client = new ElasticClient(connectionSettings);
var indexName = "bulk-index";
if (client.IndexExists(indexName).Exists)
client.DeleteIndex(indexName);
client.CreateIndex(indexName, c => c
.Settings(s => s
.NumberOfShards(3)
.NumberOfReplicas(0)
)
.Mappings(m => m
.Map<DeviceStatus>(p => p.AutoMap())
)
);
var size = 500;
// set up the observable
var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
.Index(indexName)
.MaxDegreeOfParallelism(4)
.RefreshOnCompleted()
.Size(size)
);
var countdownEvent = new CountdownEvent(1);
Exception exception = null;
// set up an observer. Delegates passed are:
// 1. onNext
// 2. onError
// 3. onCompleted
var bulkAllObserver = new BulkAllObserver(
response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),
ex =>
{
// capture exception for throwing outside Observer.
// You may decide to do something different here
exception = ex;
countdownEvent.Signal();
},
() =>
{
Console.WriteLine("Finished");
countdownEvent.Signal();
});
// subscribe to the observable
bulkAllObservable.Subscribe(bulkAllObserver);
// wait indefinitely for it to finish. May want to put a
// max timeout on this
countdownEvent.Wait();
if (exception != null)
{
throw exception;
}
}
// lazily enumerated collection
private static IEnumerable<DeviceStatus> GetDeviceStatus()
{
for (var i = 0; i < DocumentCount; i++)
yield return new DeviceStatus(i);
}
private const int DocumentCount = 20000;
public class DeviceStatus
{
public DeviceStatus(int id) => Id = id;
public int Id {get;set;}
}
If you don't need to do anything special in the observer, you can use the .Wait()
method on the observable
var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
.Index(indexName)
.MaxDegreeOfParallelism(4)
.RefreshOnCompleted()
.Size(size)
)
.Wait(
TimeSpan.FromHours(1),
response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries")
);
There are observable methods for BulkAll
, ScrollAll
and Reindex
(although there is ReindexOnServer
which reindexes within Elasticsearch and maps to the Reindex API - the Reindex
method predates this)