Search code examples
.netmultithreadingasynchronoustask-parallel-librarysqlcommand

Create a Task that groups several I/O Tasks


First of all, I am not sure if I am asking something stupid, so apologies before start.

I have a method that saves a collection of entities in the DB using a SqlCommand asynchronously, and it returns a Task that represents the async operation, and so far it works good. Basically it generates a SQL command and a bunch of parameters per entity, and add all of them to a SqlCommand instance (parameters are numbered)

But, if I try to insert a lot of entities, when the parameter count reaches 2100, the SQL operation fails, because that is the limit. Then I would like to split the entity collection in batches, and execute them sequentially, and then return a Task that won't be done till all the child Tasks are done. (probably the last one)

Each child task, will get a Int32 saying how many rows have changed, and the final Task has to return the sum of all of them. So all the task are Task. If one fails, all of them must be "rollbacked", so they must share connection and transaction objects.

Also, I would like to ensure that I am using correctly the I/O completion ports with the Task and SqlCommand, and that there are no threads waiting/spining while the operations on SQL Server are being done, since this "Save" operation is invoked from an asynchronous controller in ASP.NET MVC.

What would be the correct approach here?

Regards.


Solution

  • With C# 5 (and assuming Task-based asynchrony), this would be quite simple:

    async Task<IReadOnlyList<SomeType>> PerformUpdatesAsync(
        IEnumerable<AnotherType> data)
    {
        var result = new List<SomeType>();
    
        foreach (var batch in data.SplitIntoBatches(BatchSize))
            result.Add(await PerformUpdateAsync(batch));
    
        return result;
    }
    

    Simulating asynchronous foreach in pre-C# 5 is harder, but you can do it for example using a queue of batches to process and a “recursive” lambda:

    Task<SomeType[]> PerformUpdatesAsync(IEnumerable<AnotherType> data)
    {
        var batches = new Queue<IEnumerable<AnotherType>>(
            data.SplitIntoBatches(BatchSize));
    
        var result = new List<SomeType>();
    
        var tcs = new TaskCompletionSource<SomeType[]>();
    
        AsyncCallback onEndUpdate = null;
        onEndUpdate =
            ar =>
            {
                result.Add(EndUpdate(ar));
    
                if (batches.Count == 0)
                    tcs.SetResult(result.ToArray());
                else
                    BeginUpdate(batches.Dequeue(), onEndUpdate, null);
            };
    
        BeginUpdate(batches.Dequeue(), onEndUpdate, null);
    
        return tcs.Task;
    }
    

    If you don't like using closures this way, you could do the same thing by creating a separate class that will hold all the local variables in its fields.

    There is no SplitIntoBatches() in the framework, and so you will have to write it yourself (unless you already did).