Search code examples
c# · async-await · entity-framework-core · parallel.foreach

Partition input and execute queries in parallel


I have the following code, where I want to retrieve employee information for a given list of employee ids. I have validation in place that throws an exception if the count is more than 1 million. In most cases the request will contain fewer than 200K ids, so I am splitting the request into 4 partitions, each containing an equal number of employee ids. All 4 partitions are executed in parallel and joined together after Task.WhenAll. Can someone please give me some hints on improving this further? I looked at ParallelForEachAsync and "Parallel foreach async in C#", but I was unable to make them work. The code below works, but it is hard-coded to split into 4 partitions. How do I make it more parallel, with dynamic partitions and the max degree of parallelism set to, say, 50? For example, if the input is 100K ids, I want to split it into 10 partitions and execute all 10 in parallel.

/// <summary>
/// Loads employee rows for a list of employee ids by splitting the ids into
/// partitions and querying all partitions concurrently.
/// </summary>
public class Service
{
    /// <summary>
    /// Fetches the employees whose ids appear in <paramref name="input"/>.
    /// The ids are divided into at most <paramref name="partitions"/> groups via
    /// <c>Split</c>; each group is queried by its own <see cref="GetResult"/> call
    /// and all calls run concurrently.
    /// </summary>
    /// <param name="input">Employee ids to look up. An empty list yields an empty result.</param>
    /// <param name="partitions">
    /// Maximum number of concurrent queries. Defaults to 4. Every partition opens its own
    /// context in <see cref="GetResult"/>, so large values open many contexts at once.
    /// </param>
    /// <returns>
    /// The rows of all partitions concatenated in partition order, with duplicates
    /// (per the default equality of <c>EmployeeEntity</c>) removed.
    /// </returns>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// <paramref name="partitions"/> is zero or negative.
    /// </exception>
    private async Task<List<EmployeeEntity>> GetInfo(List<long> input, int partitions = 4)
    {
        if (partitions <= 0)
        {
            throw new System.ArgumentOutOfRangeException(
                nameof(partitions), partitions, "The partition count must be positive.");
        }

        // Split returns fewer lists than requested when there are fewer ids than
        // partitions (and none for an empty input), so start one query per list that
        // actually came back instead of indexing a fixed number of slots.
        var pending = input.Split(partitions)
                           .Select(ids => GetResult(ids))
                           .ToList();

        var partitionResults = await Task.WhenAll(pending);

        // Concatenate in partition order; Distinct keeps the first occurrence of each row.
        return partitionResults.SelectMany(rows => rows)
                               .Distinct()
                               .ToList();
    }

    /// <summary>
    /// Queries the band, client and employee sets for the given ids through a single
    /// short-lived context and returns the employee rows.
    /// </summary>
    /// <param name="employees">Employee ids used as the membership filter of all three queries.</param>
    /// <returns>The <c>Employee</c> rows whose <c>EmployeeId</c> is contained in <paramref name="employees"/>.</returns>
    private async Task<List<EmployeeEntity>> GetResult(List<long> employees)
    {
        using var context = new MyAppDBContext();

        // The band and client rows are fetched first and their lists are not used directly.
        // NOTE(review): presumably these queries exist so that change tracking populates
        // EmployeeBands / EmployeeClients on the employees returned below — confirm before
        // removing them.
        _ = await context.EmployeeBand.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();
        _ = await context.EmployeeClient.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();

        return await context.Employee.Where(x => employees.Contains(x.EmployeeId)).ToListAsync();
    }
}

/// <summary>
/// List helpers used to partition work.
/// </summary>
public static class ExtensionMethods
{
    /// <summary>
    /// Distributes the items of <paramref name="myList"/> round-robin over up to
    /// <paramref name="parts"/> lists: the item at index <c>i</c> goes to list <c>i % parts</c>.
    /// </summary>
    /// <typeparam name="T">Element type.</typeparam>
    /// <param name="myList">Items to distribute.</param>
    /// <param name="parts">Maximum number of lists to produce; must be positive.</param>
    /// <returns>
    /// The non-empty lists in index order. Fewer than <paramref name="parts"/> lists are
    /// returned when the input has fewer items than parts, and none for an empty input.
    /// </returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="myList"/> is null.</exception>
    /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="parts"/> is zero or negative.</exception>
    public static List<List<T>> Split<T>(this List<T> myList, int parts)
    {
        if (myList is null)
        {
            throw new System.ArgumentNullException(nameof(myList));
        }

        if (parts <= 0)
        {
            throw new System.ArgumentOutOfRangeException(
                nameof(parts), parts, "The number of parts must be positive.");
        }

        // Only as many buckets as can receive at least one item, so no empty list is returned.
        int bucketCount = System.Math.Min(parts, myList.Count);
        int bucketCapacity = myList.Count / parts + 1;

        var buckets = new List<List<T>>(bucketCount);
        for (int b = 0; b < bucketCount; b++)
        {
            buckets.Add(new List<T>(bucketCapacity));
        }

        for (int index = 0; index < myList.Count; index++)
        {
            buckets[index % parts].Add(myList[index]);
        }

        return buckets;
    }
}

/// <summary>
/// An employee row together with its band and client child collections.
/// </summary>
public class EmployeeEntity
{
    /// <summary>Identifier of the employee; the value the queries in this file filter on.</summary>
    public long EmployeeId { get; set; }

    /// <summary>Band rows attached to this employee. Starts out as an empty set.</summary>
    public ICollection<EmployeeBandEntity> EmployeeBands { get; set; } = new HashSet<EmployeeBandEntity>();

    /// <summary>Client rows attached to this employee. Starts out as an empty set.</summary>
    public ICollection<EmployeeClientEntity> EmployeeClients { get; set; } = new HashSet<EmployeeClientEntity>();
}

/// <summary>
/// A band row that refers to an employee through <see cref="EmployeeId"/>.
/// </summary>
public class EmployeeBandEntity
{
    /// <summary>Identifier of this band row.</summary>
    public long EmployeeBandId { get; set; }
    /// <summary>Id of the employee this row belongs to; used as the filter column when loading bands.</summary>
    public long EmployeeId { get; set; }
    // NOTE(review): presumably the navigation property paired with EmployeeId — confirm against the model configuration.
    public EmployeeEntity EmployeeEntity { get; set; }
}

/// <summary>
/// A client row that refers to an employee through <see cref="EmployeeId"/>.
/// </summary>
public class EmployeeClientEntity
{
    /// <summary>Identifier of this client row.</summary>
    public long EmployeeClientId { get; set; }
    /// <summary>Id of the employee this row belongs to; used as the filter column when loading clients.</summary>
    public long EmployeeId { get; set; }
    // NOTE(review): presumably the navigation property paired with EmployeeId — confirm against the model configuration.
    public EmployeeEntity EmployeeEntity { get; set; }
}

/// <summary>
/// Database context exposing the employee, band and client sets.
/// Declared partial, so further members may be defined in another file.
/// </summary>
public partial class MyAppDBContext : DbContext
{
    /// <summary>Set of employee rows.</summary>
    public virtual DbSet<EmployeeEntity> Employee { get; set; }
    /// <summary>Set of employee band rows.</summary>
    public virtual DbSet<EmployeeBandEntity> EmployeeBand { get; set; }
    /// <summary>Set of employee client rows.</summary>
    public virtual DbSet<EmployeeClientEntity> EmployeeClient { get; set; }
}
     

Solution

  • I believe you could get creative with GetResult and rewrite it so that the queries filter on a range (where id is greater than and/or less than some bound) rather than on membership (where id in (...list)). Assuming GetResult is already implemented in the best possible way and you simply need a way to achieve maximum parallel execution, here is my solution.

    /// <summary>
    /// Fetches the employees for <paramref name="input"/> by cutting the ids into batches of
    /// <paramref name="batchSize"/> and running one <c>GetResult</c> call per batch, with at most
    /// <paramref name="maxDegreeOfParallelism"/> calls in flight at any time.
    /// </summary>
    /// <param name="input">Employee ids to look up.</param>
    /// <param name="batchSize">Number of ids per <c>GetResult</c> call; must be positive. Defaults to 100.</param>
    /// <param name="maxDegreeOfParallelism">
    /// Upper bound on concurrently running <c>GetResult</c> calls; must be positive. Defaults to 50.
    /// </param>
    /// <returns>
    /// <c>null</c> when <paramref name="input"/> is <c>null</c>, an empty list when it is empty,
    /// otherwise the rows of all batches concatenated in batch order.
    /// </returns>
    /// <exception cref="System.ArgumentOutOfRangeException">
    /// <paramref name="batchSize"/> or <paramref name="maxDegreeOfParallelism"/> is zero or negative.
    /// </exception>
    private async Task<List<EmployeeEntity>> GetInfo2(List<long> input, int batchSize = 100, int maxDegreeOfParallelism = 50)
    {
        if (input == null)
        {
            return null;
        }

        if (batchSize <= 0)
        {
            throw new System.ArgumentOutOfRangeException(
                nameof(batchSize), batchSize, "The batch size must be positive.");
        }

        if (maxDegreeOfParallelism <= 0)
        {
            throw new System.ArgumentOutOfRangeException(
                nameof(maxDegreeOfParallelism), maxDegreeOfParallelism, "The degree of parallelism must be positive.");
        }

        if (input.Count == 0)
        {
            return new List<EmployeeEntity>();
        }

        // Without a limit, a large input starts one query per batch all at once
        // (100K ids at 100 per batch is 1000 simultaneous GetResult calls).
        // The semaphore is disposed only after Task.WhenAll has observed every task.
        using var throttle = new System.Threading.SemaphoreSlim(maxDegreeOfParallelism);

        async Task<List<EmployeeEntity>> RunThrottledAsync(List<long> ids)
        {
            await throttle.WaitAsync();
            try
            {
                return await GetResult(ids);
            }
            finally
            {
                // Free the slot even when the query fails so the remaining batches can proceed.
                throttle.Release();
            }
        }

        var taskList = input.Batch(batchSize)
                            .Select(batch => RunThrottledAsync(batch.ToList()))
                            .ToList();

        var batchResults = await Task.WhenAll(taskList);

        return batchResults.SelectMany(rows => rows).ToList();
    }
    

    This requires the following Batch extension method.

    /// <summary>
    /// Sequence helpers for cutting an input into fixed-size batches.
    /// </summary>
    public static class Extensions
    {
        /// <summary>
        /// Cuts <paramref name="source"/> into consecutive batches of <paramref name="size"/> items;
        /// the last batch holds the remainder and may be shorter.
        /// </summary>
        /// <typeparam name="T">Element type.</typeparam>
        /// <param name="source">Sequence to cut up. It is enumerated lazily, once.</param>
        /// <param name="size">Number of items per batch; must be positive.</param>
        /// <returns>The batches in source order. An empty source yields no batches.</returns>
        /// <exception cref="System.ArgumentNullException"><paramref name="source"/> is null.</exception>
        /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="size"/> is zero or negative.</exception>
        public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
        {
            // Validate here rather than in the iterator so a bad argument fails at the call
            // site instead of on first enumeration.
            if (source == null)
            {
                throw new System.ArgumentNullException(nameof(source));
            }

            if (size <= 0)
            {
                throw new System.ArgumentOutOfRangeException(
                    nameof(size), size, "The batch size must be positive.");
            }

            return BatchIterator(source, size);
        }

        /// <summary>Lazily produces the batches for already-validated arguments.</summary>
        private static IEnumerable<IEnumerable<T>> BatchIterator<T>(IEnumerable<T> source, int size)
        {
            T[] bucket = null;
            var count = 0;

            foreach (var item in source)
            {
                if (bucket == null)
                {
                    bucket = new T[size];
                }

                bucket[count++] = item;

                if (count != size)
                {
                    continue;
                }

                // A full bucket is handed out as-is; a fresh array is allocated for the
                // next batch, so batches never share storage.
                yield return bucket;

                bucket = null;
                count = 0;
            }

            // Trailing partial batch: expose only the filled prefix of the array.
            if (bucket != null && count > 0)
            {
                yield return bucket.Take(count);
            }
        }
    }
    

    You can experiment to find a sweet spot for the batch size. I've hard-coded it to 100, but you could derive it from the size of the input list or from whatever other logic you may have.