Background
I have simplified this scenario somewhat, but this is the general problem.
I am using an Azure Data Factory to ingest data from a custom API into a table in Azure Data Warehouse. I am using an IDotNetActivity to run the C# code that calls the API and loads the data into the data warehouse. The activity runs in Azure Batch.
Within the activity itself, before I call the custom API, I load a list of people from a file in Azure Blob storage. I then call the custom API once for each person in the file, sequentially, one after another. The problem is that this approach takes too long, and since the file is likely to grow, the time it takes will only get worse.
Things I've tried to improve performance
The Main Question
Does Azure Batch support async / await?
Further questions
Can anyone shed some light on why MoreLinq's Batch doesn't work in Azure Batch? Here is a snippet of the affected code:
List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword);
var customResults = new List<CustomApiResult>();
foreach (var personIdsBatch in personIds.Batch(100))
{
    customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch));
}
Per my understanding, personIds.Batch(100) just batches personIds into buckets of size 100.
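To check that understanding, here is a minimal, self-contained sketch of what Batch does, assuming the MoreLinq NuGet package is referenced (BatchDemo and ids are just illustrative names):

using System;
using System.Linq;
using MoreLinq;

class BatchDemo
{
    static void Main()
    {
        var ids = Enumerable.Range(1, 250).ToList();
        // Batch only groups the sequence into buckets; it adds no parallelism.
        foreach (var bucket in ids.Batch(100))
        {
            Console.WriteLine(bucket.Count()); // prints 100, then 100, then 50
        }
    }
}

With that in mind, compare the following two ways of calling the API: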
//method1
foreach (var personIdsBatch in personIds.Batch(100))
{
    customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch));
}
//method2
customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIds));
Both of the above methods make a call to your custom API for each person sequentially; method1 just adds the extra logic of splitting the work into batches before doing the same thing.
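That is, the batching only changes how the ids are grouped before the same sequential work happens. As a rough sketch of what I assume GetCustomResultsByBatch does internally (CallCustomApi is a hypothetical stand-in for your real HTTP call):

List<CustomApiResult> GetCustomResultsByBatch(string address, string username, string password, IEnumerable<int> personIdsBatch)
{
    var results = new List<CustomApiResult>();
    foreach (var personId in personIdsBatch)
    {
        // One blocking API call per person; batching the input does not change this.
        results.Add(CallCustomApi(address, username, password, personId));
    }
    return results;
}

Until something awaits or runs these calls in parallel, splitting the input into buckets cannot speed it up.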
Does Azure Batch support async / await?
Based on your code, I defined the IDotNetActivity implementation as follows; you could refer to it:
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Management.DataFactories.Models;
using Microsoft.Azure.Management.DataFactories.Runtime;
using MoreLinq;

public class MyDotNetActivity : IDotNetActivity
{
    public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
    {
        // Execute is synchronous, so block here on the async implementation.
        return ExecuteAsync(linkedServices, datasets, activity, logger).Result;
    }

    async Task<IDictionary<string, string>> ExecuteAsync(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
    {
        List<int> personIds = await GetPersonIds("{clientAddress}", "{clientUsername}", "{clientPassword}");
        var tasks = new List<Task<List<CustomApiResult>>>();
        foreach (var personIdsBatch in personIds.Batch(100))
        {
            // Start one task per batch without awaiting, so the batches run concurrently.
            tasks.Add(GetCustomResultsByBatch("{address}", "{username}", "{password}", personIdsBatch));
        }
        var taskResults = await Task.WhenAll(tasks);
        List<CustomApiResult> customResults = taskResults.SelectMany(r => r).ToList();
        //process the custom api results
        return new Dictionary<string, string>();
    }

    async Task<List<CustomApiResult>> GetCustomResultsByBatch(string address, string username, string password, IEnumerable<int> personIdsBatch)
    {
        //Get Custom Results By Batch (the real implementation would await the API calls)
        return new List<CustomApiResult>();
    }

    async Task<List<int>> GetPersonIds(string clientAddress, string clientUsername, string clientPassword)
    {
        //load a list of people from a file in Azure Blob storage
        return new List<int>();
    }
}
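To make the GetCustomResultsByBatch stub concrete, here is a hedged sketch of how it could issue its per-person calls concurrently with HttpClient. The endpoint path, basic-auth scheme, and JSON shape are assumptions about your custom API, not something I know from your code; it also needs System, System.Text, System.Net.Http, System.Net.Http.Headers, and Newtonsoft.Json:

static readonly HttpClient httpClient = new HttpClient();

async Task<List<CustomApiResult>> GetCustomResultsByBatch(string address, string username, string password, IEnumerable<int> personIdsBatch)
{
    var callTasks = personIdsBatch.Select(async personId =>
    {
        // Hypothetical endpoint: GET {address}/people/{personId}
        var request = new HttpRequestMessage(HttpMethod.Get, $"{address}/people/{personId}");
        request.Headers.Authorization = new AuthenticationHeaderValue("Basic",
            Convert.ToBase64String(Encoding.ASCII.GetBytes($"{username}:{password}")));
        var response = await httpClient.SendAsync(request);
        response.EnsureSuccessStatusCode();
        var json = await response.Content.ReadAsStringAsync();
        // Deserialization depends on your actual CustomApiResult definition.
        return JsonConvert.DeserializeObject<CustomApiResult>(json);
    });
    return (await Task.WhenAll(callTasks)).ToList();
}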
Also, I assume you could leverage Parallel.ForEach as follows to execute your synchronous jobs in parallel:
List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword);
var customResults = new List<CustomApiResult>();
Parallel.ForEach(personIds.Batch(100),
    new ParallelOptions()
    {
        MaxDegreeOfParallelism = 5
    },
    (personIdsBatch) =>
    {
        var results = GetCustomResultsByBatch(address, username, password, personIdsBatch);
        lock (customResults)
        {
            customResults.AddRange(results);
        }
    });
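Note that Parallel.ForEach is meant for synchronous delegates; it does not await async lambdas. If you stay with the async version above and still want to cap the concurrency (like MaxDegreeOfParallelism does here), one common pattern is a SemaphoreSlim throttle. A minimal sketch, assuming the async GetCustomResultsByBatch from the activity above and System.Threading:

var throttler = new SemaphoreSlim(5); // at most 5 batches in flight at once
var tasks = personIds.Batch(100).Select(async personIdsBatch =>
{
    await throttler.WaitAsync();
    try
    {
        return await GetCustomResultsByBatch(address, username, password, personIdsBatch);
    }
    finally
    {
        throttler.Release();
    }
});
var customResults = (await Task.WhenAll(tasks)).SelectMany(r => r).ToList();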