Very intermittently I am getting an InvalidOperationException:
"There is already an open DataReader associated with this Command which must be closed first."
This is an overly long, mission-critical piece of code that used to be a stored procedure (and probably still should be), but a number of years ago I converted it to Dapper calls wrapped in a transaction, so that everything is abandoned if one part fails.
I have seen several SO questions, like here and here, which are solved with a `ToList()`;
however, I'm using `AsList()`
on a query result, and the code that follows only makes a logger call to Azure Table storage (plus there is a for loop over a parameter List), so I didn't think those applied here.
I'm also using the EF6 DbContext connection rather than creating a standard SqlConnection. Switching may just fix this, but I don't wish to change it without understanding the issue first.
This method runs hundreds of times a day, every day without fail but when it throws this exception, a customer is locked out from re-running this for at least 15 minutes.
/// <summary>
/// Checks that the customer and job exist, inserts a WorkItem row, inserts Task rows,
/// runs a table-valued-parameter allocation query and audit-logs the results, all inside
/// one transaction. Returns the new WorkItem id.
/// </summary>
/// <exception cref="ServiceNotFoundException">The customer or the job was not found.</exception>
public async Task<int> LinkJob(int customerId, int jobId, int employeeId, List<TaskDto> tasks)
{
// Borrows the connection that belongs to the EF context instead of creating a dedicated one.
var conn = _context.Database.Connection;
// Synchronous open; nothing in this method closes the connection again.
conn.Open();
using (var transaction = _context.Database.BeginTransaction())
{
try
{
// NOTE(review): the multi-result reader returned here is never disposed in this method.
// If it keeps the underlying data reader open (e.g. when one of the throws below fires
// before every result set has been read), later commands on this shared connection,
// including the Rollback in the catch, could fail with "open DataReader" - confirm.
var checks = await conn.QueryMultipleAsync(@"
SELECT Id FROM Customer
WHERE Id = @customerId
SELECT Id FROM Job WHERE Id = @jobId
", new { customerId, jobId }, transaction.UnderlyingTransaction);
// First result set: the customer id, or null when no row matched.
var customer = (await checks.ReadAsync<int?>()).SingleOrDefault();
if (customer == null)
{
throw new ServiceNotFoundException($"Customer {customerId} not found");
}
// Second result set: mapped to JobDto although the query selects only Id.
// NOTE(review): job.Cost is used further down - verify it is actually populated.
var job = (await checks.ReadAsync<JobDto>()).SingleOrDefault();
if (job == null)
{
throw new ServiceNotFoundException($"Job {jobId} not found");
}
// Inserts the WorkItem, bumps Customer.LatestActionDate and returns the new identity.
// NOTE(review): the SQL references @job, but the parameter object only supplies
// customerId and jobId - presumably this should be @jobId.
var workItemId = (await conn.QueryAsync<int>(@"
DECLARE @WorkItemId int
INSERT WorkItem (CustomerId, JobId, JobDate)
VALUES (@customerId, @job, GETDATE())
SELECT @WorkItemId = SCOPE_IDENTITY()
IF @CustomerId > 0
UPDATE Customer SET LatestActionDate = GETDATE() WHERE Id = @CustomerId
SELECT @WorkItemId
", new { customerId, jobId }, transaction.UnderlyingTransaction)).SingleOrDefault();
// NOTE(review): `user` is not declared anywhere in this method - presumably a member
// defined elsewhere; confirm.
await _auditLogger.LogJob(jobId, "Job created", user);
// One Task row per required item; the Sum(...) bound is re-evaluated on every iteration.
for (int i = 0; i < tasks.Sum(o => o.ItemCount * o.Required); i++)
{
// NOTE(review): the column list "(JobId, Cost,)" has a trailing comma.
// taskId receives the whole query result (a sequence), not a single int.
var taskId = (await conn.QueryAsync<int>(@"
INSERT Task (JobId, Cost,) VALUES (@jobId, @Cost)
SELECT Id FROM Task WHERE Id = CAST(SCOPE_IDENTITY() as int)
", new { jobId, job.Cost }, transaction.UnderlyingTransaction));
}
// Placeholder for a further insert that was elided from the snippet.
await conn.ExecuteAsync(@"
//do another insert here
", new { jobId }, transaction.UnderlyingTransaction);
// Project the requested tasks into a DataTable so they can be passed as a TVP.
var tasksDT = tasks.Select(o => new { o.TaskTypeId, o.Required
}).ToDataTable();
// NOTE(review): taskAllocationckId is not declared in this method - confirm its origin.
var allocatedTasks = (await conn.QueryAsync<TaskDetailsDto>(@"
--Complex cte insert that returns some details to log
", new { taskAllocationckId, jobId, Tasks = tasksDT.AsTableValuedParameter("TasksUDT")
}, transaction.UnderlyingTransaction)).AsList();
// Audit-log each allocated task; this happens before the transaction is committed.
foreach(var task in allocatedTasks)
{
await _auditLogger.LogJob(jobId, $"{task.TaskTypeName} task added with `enter code here`{task.ItemCount} items");
}
transaction.Commit();
return workItemId;
}
catch
{
// Any failure: roll back explicitly, then rethrow preserving the stack trace.
transaction.Rollback();
throw;
}
}
}
You are missing a `using` on `checks`, which is holding a reference to the reader. This must use braces in order to dispose at the correct point.
Further issues:
- You are using `QueryAsync` instead of `QueryFirstAsync` in the `taskId` bit.
- Use `OUTPUT` instead of a separate `SELECT` in the same section.
- You don't need the `catch` and `Rollback`, the `using` will do that.
- The connection needs a `using`, and you should create a new one rather than using the one from EF Core.
- Use `using` statements without braces where possible, and `?? throw` for the null checks.
- Don't rely on `AsList`, it only checks for `List<T>`,
it does not convert and does not dispose.

/// <summary>
/// Verifies that the customer and job exist, inserts a WorkItem row, inserts one Task row
/// per required item, runs the table-valued-parameter allocation query and audit-logs the
/// allocated tasks, all inside a single transaction on a dedicated connection.
/// </summary>
/// <param name="customerId">Id looked up in the Customer table.</param>
/// <param name="jobId">Id looked up in the Job table.</param>
/// <param name="employeeId">Not used by the statements shown here.</param>
/// <param name="tasks">Requested tasks; ItemCount * Required per entry gives the number of Task rows.</param>
/// <returns>The Id of the inserted WorkItem row.</returns>
/// <exception cref="ServiceNotFoundException">The customer or the job was not found.</exception>
public async Task<int> LinkJob(int customerId, int jobId, int employeeId, List<TaskDto> tasks)
{
    // A dedicated connection and transaction. There is no explicit catch/Rollback:
    // disposing a transaction that was never committed rolls it back.
    await using var conn = CreateNewConnection();
    await conn.OpenAsync();
    await using var transaction = await conn.BeginTransactionAsync();

    JobDto job;
    // The multi-result reader must be disposed before the next command runs on this
    // connection, hence the braced using rather than a using declaration.
    using (var checks = await conn.QueryMultipleAsync(@"
SELECT Id FROM Customer
WHERE Id = @customerId;
SELECT Id FROM Job WHERE Id = @jobId;
", new { customerId, jobId }, transaction))
    {
        // Only the customer's existence matters; the id itself is not needed afterwards.
        _ = (await checks.ReadAsync<int?>()).SingleOrDefault()
            ?? throw new ServiceNotFoundException($"Customer {customerId} not found");
        // NOTE(review): the query selects only Id, yet job.Cost is used below - verify it is populated.
        job = (await checks.ReadAsync<JobDto>()).SingleOrDefault()
            ?? throw new ServiceNotFoundException($"Job {jobId} not found");
    }

    // OUTPUT returns the generated WorkItem id directly from the INSERT.
    var workItemId = (await conn.QueryAsync<int>(@"
INSERT WorkItem (CustomerId, JobId, JobDate)
OUTPUT inserted.Id
VALUES (@customerId, @jobId, GETDATE());
IF @CustomerId > 0
UPDATE Customer
SET LatestActionDate = GETDATE()
WHERE Id = @CustomerId;
", new { customerId, jobId }, transaction)).SingleOrDefault();

    // NOTE(review): `user` is not declared in this method - presumably a member defined elsewhere; confirm.
    await _auditLogger.LogJob(jobId, "Job created", user);

    // The number of Task rows is loop-invariant, so compute it once.
    var taskCount = tasks.Sum(o => o.ItemCount * o.Required);
    for (int i = 0; i < taskCount; i++)
    {
        // The INSERT ... OUTPUT yields exactly one row, so read it as a single value.
        var taskId = await conn.QueryFirstAsync<int>(@"
INSERT Task (JobId, Cost)
OUTPUT inserted.Id
VALUES (@jobId, @Cost);
", new { jobId, job.Cost }, transaction);
        // do something with taskId
    }

    // Placeholder for a further insert that was elided from the snippet.
    await conn.ExecuteAsync(@"
--do another insert here
", new { jobId }, transaction);

    // Project the requested tasks into a DataTable so they can be passed as a TVP.
    var tasksDT = tasks.Select(o => new { o.TaskTypeId, o.Required
    }).ToDataTable();

    // NOTE(review): taskAllocationckId is not declared in this method - confirm its origin.
    var allocatedTasks = (await conn.QueryAsync<TaskDetailsDto>(@"
--Complex cte insert that returns some details to log
", new { taskAllocationckId, jobId, Tasks = tasksDT.AsTableValuedParameter("TasksUDT")
    }, transaction)).ToList();

    foreach (var task in allocatedTasks)
    {
        await _auditLogger.LogJob(jobId, $"{task.TaskTypeName} task added with {task.ItemCount} items");
    }

    await transaction.CommitAsync();
    return workItemId;
}
You can also replace the entire `taskId` loop with a single insert, using `GENERATE_SERIES`, or on older versions you can use a numbers function.
// Insert every Task row in one statement: GENERATE_SERIES supplies the rows to insert,
// and OUTPUT returns the generated Id of each inserted row.
var taskIdList = (await conn.QueryAsync<int>(@"
INSERT Task (JobId, Cost)
OUTPUT inserted.Id
SELECT @jobId, @Cost
FROM GENERATE_SERIES(1, @Count);
", new { jobId, job.Cost, Count = tasks.Sum(o => o.ItemCount * o.Required) }, transaction)).ToList();
// do something with taskIdList
It's hard to tell exactly given your stripped-down code, but you could probably do this entire set of commands in a single SQL batch.