Tags: c#, sql-server, entity-framework-6, dapper

Dapper with transactions throws "open DataReader associated with this command which must be closed first"


Very intermittently, I get an InvalidOperationException: "open DataReader associated with this command which must be closed first".

This is an overly long, mission-critical piece of code that used to be a stored procedure, and probably still should be. A number of years ago I converted it to Dapper calls wrapped in a transaction, so that everything is abandoned if one part fails.

I have seen several SO questions about this error that were solved with a ToList(). However, the only place I use AsList() is on a query result whose rows are just passed to a logger call (to Azure Table storage), and the for loop runs over a List parameter, so I didn't think those answers applied here.

I'm also using the EF6 DbContext connection rather than creating a standard SqlConnection. Switching may just fix this, but I don't wish to change it without understanding the issue first.

This method runs hundreds of times a day, every day without fail but when it throws this exception, a customer is locked out from re-running this for at least 15 minutes.

public async Task<int> LinkJob(int customerId, int jobId, int employeeId, List<TaskDto> tasks)
{
    var conn = _context.Database.Connection;
    conn.Open();

    using (var transaction = _context.Database.BeginTransaction())
    {
        try
        {
            var checks = await conn.QueryMultipleAsync(@" 
    SELECT Id FROM Customer 
    WHERE Id = @customerId
    SELECT Id FROM Job WHERE Id = @jobId
            ", new { customerId, jobId }, transaction.UnderlyingTransaction);

            var customer = (await checks.ReadAsync<int?>()).SingleOrDefault();
            if (customer == null)
            {
                throw new ServiceNotFoundException($"Customer {customerId} not found");
            }

            var job = (await checks.ReadAsync<JobDto>()).SingleOrDefault();
            if (job == null)
            {
                throw new ServiceNotFoundException($"Job {jobId} not found");
            }

            var workItemId = (await conn.QueryAsync<int>(@"
    DECLARE @WorkItemId int
    INSERT WorkItem (CustomerId, JobId, JobDate)
    VALUES (@customerId, @jobId, GETDATE())
    SELECT @WorkItemId = SCOPE_IDENTITY()
    IF @CustomerId > 0
        UPDATE Customer SET LatestActionDate = GETDATE() WHERE Id = @CustomerId
    SELECT @WorkItemId
            ", new { customerId, jobId }, transaction.UnderlyingTransaction)).SingleOrDefault();

            await _auditLogger.LogJob(jobId, "Job created", user);

            for (int i = 0; i < tasks.Sum(o => o.ItemCount * o.Required); i++)
            {
                var taskId = (await conn.QueryAsync<int>(@"
    INSERT Task (JobId, Cost) VALUES (@jobId, @Cost)
    SELECT Id FROM Task WHERE Id = CAST(SCOPE_IDENTITY() as int)
                ", new { jobId, job.Cost }, transaction.UnderlyingTransaction));
            }

            await conn.ExecuteAsync(@"
    //do another insert here
                        ", new { jobId }, transaction.UnderlyingTransaction);

            var tasksDT = tasks.Select(o => new { o.TaskTypeId, o.Required 
    }).ToDataTable();

            var allocatedTasks = (await conn.QueryAsync<TaskDetailsDto>(@"
    --Complex cte insert that returns some details to log

            ", new { taskAllocationckId, jobId, Tasks = tasksDT.AsTableValuedParameter("TasksUDT") 
    }, transaction.UnderlyingTransaction)).AsList();

            foreach(var task in allocatedTasks)
            {
                await _auditLogger.LogJob(jobId, $"{task.TaskTypeName} task added with {task.ItemCount} items");
            }

            transaction.Commit();

            return workItemId;
        }
        catch
        {
            transaction.Rollback();
            throw;
        }
    }
}

Solution

  • You are missing a using on checks, which holds a reference to the open reader. This using must have braces so that the reader is disposed at the correct point, before the next command runs on the connection.

    Further issues:

    • You are using QueryAsync instead of QueryFirstAsync in the taskId bit.
    • You can use OUTPUT instead of a separate SELECT in the same section.
    • You don't need to catch and Rollback; disposing the transaction via using will do that.
    • The connection also needs a using, and you should create a new one rather than using the one from the EF6 DbContext.
    • Open the connection and create the transaction asynchronously also.
    • Use shorter syntax such as using statements without braces where possible, and ?? throw for the null checks.
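    • AsList is not what fixes this error: it returns the result unchanged when it is already a List<T> (the default for buffered Dapper queries) and otherwise calls ToList(). It does not dispose anything.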
    public async Task<int> LinkJob(int customerId, int jobId, int employeeId, List<TaskDto> tasks)
    {
        await using var conn = CreateNewConnection();
        await conn.OpenAsync();
    
        await using var transaction = await conn.BeginTransactionAsync();
    
        int customer;
        JobDto job;
        // Braces are required here so the reader is closed before the next command.
        using (var checks = await conn.QueryMultipleAsync(@" 
        SELECT Id FROM Customer 
        WHERE Id = @customerId;
        SELECT Id FROM Job WHERE Id = @jobId;
                ", new { customerId, jobId }, transaction))
        {
            customer = (await checks.ReadAsync<int?>()).SingleOrDefault()
                ?? throw new ServiceNotFoundException($"Customer {customerId} not found");
    
            job = (await checks.ReadAsync<JobDto>()).SingleOrDefault()
                ?? throw new ServiceNotFoundException($"Job {jobId} not found");
        }
    
        var workItemId = (await conn.QueryAsync<int>(@"
        INSERT WorkItem (CustomerId, JobId, JobDate)
        OUTPUT inserted.Id
        VALUES (@customerId, @jobId, GETDATE());
    
        IF @CustomerId > 0
            UPDATE Customer
            SET LatestActionDate = GETDATE()
            WHERE Id = @CustomerId;
                ", new { customerId, jobId }, transaction)).SingleOrDefault();
    
        await _auditLogger.LogJob(jobId, "Job created", user);
    
        for (int i = 0; i < tasks.Sum(o => o.ItemCount * o.Required); i++)
        {
            var taskId = await conn.QueryFirstAsync<int>(@"
        INSERT Task (JobId, Cost)
        OUTPUT inserted.Id
        VALUES (@jobId, @Cost);
            ", new { jobId, job.Cost }, transaction);
    
            // do something with taskId
        }
    
        await conn.ExecuteAsync(@"
        -- do another insert here
                            ", new { jobId }, transaction);
    
        var tasksDT = tasks.Select(o => new { o.TaskTypeId, o.Required 
        }).ToDataTable();
    
        var allocatedTasks = (await conn.QueryAsync<TaskDetailsDto>(@"
        --Complex cte insert that returns some details to log
    
                ", new { taskAllocationckId, jobId, Tasks = tasksDT.AsTableValuedParameter("TasksUDT") 
        }, transaction)).ToList();
    
        foreach(var task in allocatedTasks)
        {
            await _auditLogger.LogJob(jobId, $"{task.TaskTypeName} task added with {task.ItemCount} items");
        }
    
        await transaction.CommitAsync();
    
        return workItemId;
    }
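
    On the AsList point in the list above, a small sketch of what the method does, as far as I know from current Dapper versions: it hands back the same instance when the sequence is already a List<T>, and otherwise falls back to ToList(). The class name AsListDemo is made up for this illustration, and the snippet assumes the Dapper package is referenced.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Dapper; // AsList is an extension method defined on SqlMapper

    public static class AsListDemo // hypothetical name, for illustration only
    {
        public static void Main()
        {
            var buffered = new List<int> { 1, 2, 3 };

            // Already a List<T>: AsList returns the same instance, no copy is made.
            Console.WriteLine(ReferenceEquals(buffered, buffered.AsList())); // True

            // Not a List<T>: AsList materializes the sequence via ToList().
            IEnumerable<int> lazy = buffered.Where(x => x > 1);
            List<int> materialized = lazy.AsList();
            Console.WriteLine(materialized.Count); // 2
        }
    }
    ```

    Either way nothing is disposed, so AsList neither causes nor cures an open reader; the using on checks is what matters.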
    

    You can also replace the entire taskId loop with a single insert, using GENERATE_SERIES (SQL Server 2022 and later) or, on older versions, a numbers function or table.

    var taskIdList = (await conn.QueryAsync<int>(@"
        INSERT Task (JobId, Cost)
        OUTPUT inserted.Id
        SELECT @jobId, @Cost
        FROM GENERATE_SERIES(1, @Count);
            ", new { jobId, job.Cost, Count = tasks.Sum(o => o.ItemCount * o.Required) }, transaction)).ToList();
    
    // do something with taskIdList
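
    For SQL Server versions without GENERATE_SERIES, one possible numbers source is a ROW_NUMBER over a cross join of a system view. This is only a sketch: the class and method names (TaskInserts, InsertTasksAsync) are hypothetical, the Task table and its columns follow the question, and the decimal type for cost is an assumption since the question does not show JobDto.

    ```csharp
    using System.Collections.Generic;
    using System.Data;
    using System.Linq;
    using System.Threading.Tasks;
    using Dapper;

    public static class TaskInserts // hypothetical helper class
    {
        // Inserts `count` identical Task rows in one statement and returns their new Ids.
        public static async Task<List<int>> InsertTasksAsync(
            IDbConnection conn, IDbTransaction transaction,
            int jobId, decimal cost, int count) // decimal is an assumed type for Cost
        {
            const string sql = @"
    WITH Numbers AS (
        -- Generates @Count rows numbered 1..@Count without GENERATE_SERIES.
        SELECT TOP (@Count) ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS n
        FROM sys.all_objects a
        CROSS JOIN sys.all_objects b
    )
    INSERT Task (JobId, Cost)
    OUTPUT inserted.Id
    SELECT @jobId, @Cost
    FROM Numbers;";

            var ids = await conn.QueryAsync<int>(
                sql, new { jobId, Cost = cost, Count = count }, transaction);
            return ids.ToList();
        }
    }
    ```

    Inside the method above it could be called as await TaskInserts.InsertTasksAsync(conn, transaction, jobId, job.Cost, tasks.Sum(o => o.ItemCount * o.Required)). A permanent numbers table would serve the same purpose as the cross join.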
    

    It's hard to tell exactly given your stripped-down code, but you could probably do this entire set of commands in a single SQL batch.