c# multithreading concurrency task task-parallel-library

Use multiple tasks to retrieve all records from a large collection


I am working on an application which calls an external service and has to add all entries of the external collection into a local collection. The problem currently is that the external collection can exceed 1000 records, but the returned search results can only include up to twenty items.

For the sake of speed I figured using a collection of Tasks would be the way forward, so I came up with the code below:

int totalCount = returnedCol.total_count;
while (totalCount > myDict.Count)
{
    int numberOfTasks = // logic to calculate how many tasks to run

    List<Task> taskList = new List<Task>();

    for (int i = 1; i <= numberOfTasks; i++)
    {
        Interlocked.Add(ref pageNumber, pageSize);

        Task<SearchResponse> testTask = Task.Run(() =>
        {
            return ExternalCall.GetData(pageNumber, pageSize);
        });

        Thread.Sleep(100);

        taskList.Add(testTask);
        testTask.ContinueWith(o =>
        {
            foreach (ExternalDataRecord dataiwant in testTask.Result.dataiwant)
            {
                if (!myDict.ContainsKey(dataiwant.id))
                    myDict.GetOrAdd(dataiwant.id, dataiwant);
            }
        });
    }
    Task.WaitAll(taskList.ToArray());
}

However, this does not yield all results. The pageNumber variable is incrementing correctly each time, but it seems that not all task results are being analysed (as the same logic on a single thread on a smaller data set returns all expected results). Also, I have tried declaring individual tasks in a chain (rather than a loop) and the test data is all returned. It seems that the higher the value I pass into Thread.Sleep() the more the results are added into the local collection (but this isn't ideal, as it means the process takes longer!)

Currently in a sample of 600 records I'm only getting about 150-200 added to the myDict collection. Am I missing something obvious?


Solution

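  • You're missing the fact that ContinueWith() returns another task, and you aren't adding that task to your taskList. Task.WaitAll() therefore waits only for the fetches, not for the continuations that fill myDict. In addition, the lambda passed to Task.Run() captures the pageNumber variable itself rather than its value at that moment, so a task that starts late can request a different page from the one intended. That would explain why a longer Thread.Sleep() returns more results.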
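
    As a minimal sketch of the smallest change to the original approach, the loop below keeps the task returned by ContinueWith() and waits on that, and it copies the page offset into a per-iteration local before the lambda captures it. GetData here is a hypothetical stand-in for ExternalCall.GetData that just returns a range of integer ids, and the offset/count paging scheme is an assumption, since the real service is not shown.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public static class ContinuationDemo
    {
        // Hypothetical stand-in for ExternalCall.GetData:
        // returns the ids in [offset, offset + count).
        static int[] GetData(int offset, int count)
        {
            Thread.Sleep(10); // simulate network latency
            return Enumerable.Range(offset, count).ToArray();
        }

        public static ConcurrentDictionary<int, int> FetchAll(int totalCount, int pageSize)
        {
            var myDict = new ConcurrentDictionary<int, int>();
            var taskList = new List<Task>();

            for (int offset = 0; offset < totalCount; offset += pageSize)
            {
                // Per-iteration copies: the lambda must not read the shared
                // loop variable after it has moved on to the next page.
                int pageOffset = offset;
                int count = Math.Min(pageSize, totalCount - offset);

                Task continuation = Task.Run(() => GetData(pageOffset, count))
                    .ContinueWith(t =>
                    {
                        foreach (int id in t.Result)
                            myDict.TryAdd(id, id);
                    });

                // Track the continuation, so WaitAll also covers the dictionary writes.
                taskList.Add(continuation);
            }

            Task.WaitAll(taskList.ToArray());
            return myDict;
        }
    }
    ```

    Calling ContinuationDemo.FetchAll(600, 20) should leave 600 entries in the dictionary without any Thread.Sleep() between task starts.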

    A better approach would be to use async/await, available since .NET 4.5. It is a lighter-weight way to express the same continuation logic.

    You would change the algorithm to be more like this:

    public async Task Process()
    {
        int totalCount = returnedCol.total_count;

        while (totalCount > myDict.Count)
        {
            int numberOfTasks = // logic to calculate how many tasks to run

            List<Task> taskList = new List<Task>();

            for (int i = 1; i <= numberOfTasks; i++)
            {
                Interlocked.Add(ref pageNumber, pageSize);

                // pageNumber is passed by value, so each call keeps its own page.
                taskList.Add(ProcessPage(pageNumber, pageSize));
            }

            // Completes only after every page has been fetched and added to myDict.
            await Task.WhenAll(taskList);
        }
    }

    private async Task ProcessPage(int pageNumber, int pageSize)
    {
        SearchResponse result = await Task.Run(() =>
            ExternalCall.GetData(pageNumber, pageSize)).ConfigureAwait(false);

        // GetOrAdd only inserts when the key is absent, so no ContainsKey check is needed.
        foreach (ExternalDataRecord dataiwant in result.dataiwant)
        {
            myDict.GetOrAdd(dataiwant.id, dataiwant);
        }
    }
    

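    The async keyword tells the compiler that the method may contain an await. await takes over the job of your ContinueWith() call: the code after it runs as a continuation once the awaited task finishes, and the Task returned by ProcessPage does not complete until that code has run, so Task.WhenAll() also waits for the dictionary updates. Task.Run() is only there to move the synchronous ExternalCall.GetData onto another task; ProcessPage then simply awaits the result of that call.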
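
    If the service client happens to offer an asynchronous method (an assumption; the question only shows a synchronous GetData), Task.Run() can be dropped and the call awaited directly. The sketch below illustrates this with a hypothetical GetDataAsync stub that simulates latency with Task.Delay() and returns integer ids; the names and the offset/count paging scheme are illustrative only.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    public static class AwaitDemo
    {
        // Hypothetical asynchronous stand-in for the external service:
        // returns the ids in [offset, offset + count).
        static async Task<int[]> GetDataAsync(int offset, int count)
        {
            await Task.Delay(10).ConfigureAwait(false); // simulated latency
            return Enumerable.Range(offset, count).ToArray();
        }

        // The returned Task completes only after the foreach has finished.
        static async Task ProcessPageAsync(
            ConcurrentDictionary<int, int> myDict, int offset, int count)
        {
            int[] ids = await GetDataAsync(offset, count).ConfigureAwait(false);

            foreach (int id in ids)
                myDict.TryAdd(id, id);
        }

        public static async Task<ConcurrentDictionary<int, int>> FetchAllAsync(
            int totalCount, int pageSize)
        {
            var myDict = new ConcurrentDictionary<int, int>();
            var taskList = new List<Task>();

            for (int offset = 0; offset < totalCount; offset += pageSize)
            {
                int count = Math.Min(pageSize, totalCount - offset);

                // Arguments are passed by value, so each page keeps its own offset.
                taskList.Add(ProcessPageAsync(myDict, offset, count));
            }

            await Task.WhenAll(taskList).ConfigureAwait(false);
            return myDict;
        }
    }
    ```

    Because every page is requested up front, this version starts all requests at once; a real service may need the batching from the loop above to limit how many calls are in flight.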