Tags: c#, .net, parallel-processing, .net-8.0, parallel.foreach

Cannot parallelize my code using parallel library


I'm having a lot of trouble parallelizing some code. Basically, this code inserts two new records into the database.

If the records already exist and are in an error state, the old ones are moved to a historical table and both new records are inserted. If the existing records aren't in an error state, the new ones are discarded. If the records don't exist yet, they are inserted directly.

To parallelize this code, I tried creating a DbContext and a UnitOfWork per thread, but I keep getting the same error:

A second operation was started on this context instance before a previous operation completed. This is usually caused by different threads concurrently using the same instance of DbContext. 

I can't see the problem, let alone the solution. Can you help me, please?

    ParallelOptions options = new ParallelOptions();
    options.MaxDegreeOfParallelism = 2;

    Parallel.ForEach(excelRawList.SuccessRows, options, (excelRow) =>
    {
        using (var dbContext = new MyContext(
            new DbContextOptionsBuilder<MyContext>()
                .UseSqlServer(_configuration.GetConnectionString("MyDB"))
                .Options))
        {
            using (var unitOfWork = new UnitOfWork(dbContext))
            {
                try
                {
                    int? canBeInserted = await service3.CanBeInserted(excelRow.Id);

                    // canBeInserted can take three values: null (there is no previous record),
                    // 0 (there is a previous record and it is not an error),
                    // Id_value (there is a previous record but it was an error)

                    if (canBeInserted != 0)
                    {
                        if (!excelRow.Prorroga)
                        {
                            try
                            {
                                using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(_timeoutTransaccion)))
                                {
                                    unitOfWork.BeginTransaction(cts);

                                    if (canBeInserted != null)
                                    {
                                        service4.MoveToHistoric(unitOfWork, canBeInserted.Value, cts);
                                    }

                                    Entity2 entity2 = service3.Entity1ToEntity2(excelRow);
                                    excelRow.entity2 = entity2;

                                    unitOfWork.Context.Set<Entity2>().Add(entity2);
                                    unitOfWork.Context.Set<Entity1>().Add(excelRow);

                                    unitOfWork.SaveChanges(cts);
                                    unitOfWork.CommitTransaction(cts);

                                    if (entity2.Age == null)
                                    {
                                        _logger.LogWarning("Age didnt find it");
                                    }
                                }
                            }
                            catch (OperationCanceledException ex)
                            {
                                unitOfWork.Rollback();
                            }
                            catch (Exception ex)
                            {
                                unitOfWork.Rollback();
                            }
                        }
                        else
                        {
                            excelRow.FK2 = null;
                            _service1.Add(excelRow);
                        }
                    }
                    else
                    {
                        excelRawList.ErrorRows.Add(new ExcelError() { /* ... */ });
                    }
                }
                catch (Exception ex)
                {
                    excelRawList.ErrorRows.Add(new ExcelError()
                    {
                        Message = ex.InnerException.Message
                    });
                }
            }
        }
    });

    public void MoveToHistoric(UnitOfWork unitOfWork, int id, CancellationTokenSource cts)
    {
        try
        {
            Entity1 entity1 = service1.Get(id);
            Entity2 entity2 = service2.Get(entity1.FK);
            Entity3 entity3 = _mapper.Map<Entity3>(entity1);
            Entity4 entity4 = _mapper.Map<Entity4>(entity2);
    
            unitOfWork.Context.Set<Entity1>().Remove(entity1);
            unitOfWork.Context.Set<Entity2>().Remove(entity2);
    
            await unitOfWork.SaveChangesAsyncLimite(cts);
    
            entity3.Entity4 = entity4;
            unitOfWork.Context.Set<Entity4>().Add(entity4);
            unitOfWork.Context.Set<Entity3>().Add(entity3);
    
            unitOfWork.SaveChanges(cts);
    
            return unitOfWork;
        }
        catch (Exception ex)
        {
            throw;
        }
    }

Many Thanks


Solution

  • My guess is that Service3 (the class behind await service3.CanBeInserted(excelRow.Id)), or one of the other service classes, looks something like this:

    public class Service3
    {
        // One context, shared by every call made on this service instance
        MyContext dbContext;
        IUnitOfWork unitOfWork;

        public Service3(string connectionString)
        {
            dbContext = new MyContext(new DbContextOptionsBuilder<MyContext>()
                .UseSqlServer(connectionString)
                .Options);
            unitOfWork = new UnitOfWork(dbContext);
        }

        public int? CanBeInserted(int id)
        {
            ...
        }
    }
    

    This causes problems in a parallel loop: every iteration calls into the same service instance, so several calls can run at the same time on the same DbContext, exactly as your error message says. Creating a new DbContext inside the loop does not help if the services keep using their own.

    The approach I'm most used to is to create a unit of work object per call in each service class, i.e.

    public class Service3
    {
        IUnitOfWorkFactory unitFactory;

        public Service3(IUnitOfWorkFactory unitFactory) => this.unitFactory = unitFactory;

        public int? CanBeInserted(int id)
        {
            using (var unitOfWork = unitFactory.Create())
            {
                ...
            }
        }
    }
    

    Here IUnitOfWorkFactory.Create is responsible for creating a new DbContext and UnitOfWork object.
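
    As a rough illustration of the factory approach, here is a minimal sketch that runs without a database. FakeContext, its Query method, UnitOfWorkFactory and Demo are hypothetical names invented for this example; FakeContext only imitates the way a DbContext rejects overlapping operations. With EF Core, Create would presumably build a new MyContext from the connection string instead.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    // Hypothetical stand-in for MyContext: it throws if two calls overlap.
    public sealed class FakeContext : IDisposable
    {
        private int _busy;

        public int? Query(int id)
        {
            if (Interlocked.Exchange(ref _busy, 1) == 1)
                throw new InvalidOperationException("A second operation was started on this context instance.");
            try
            {
                Thread.Sleep(5);                         // simulate a database round trip
                return id % 2 == 0 ? (int?)null : id;    // arbitrary rule, just for the demo
            }
            finally
            {
                Volatile.Write(ref _busy, 0);
            }
        }

        public void Dispose() { }
    }

    public interface IUnitOfWork : IDisposable
    {
        FakeContext Context { get; }
    }

    public sealed class UnitOfWork : IUnitOfWork
    {
        public UnitOfWork(FakeContext context) => Context = context;
        public FakeContext Context { get; }
        public void Dispose() => Context.Dispose();
    }

    public interface IUnitOfWorkFactory
    {
        IUnitOfWork Create();
    }

    public sealed class UnitOfWorkFactory : IUnitOfWorkFactory
    {
        // Every call gets its own context, so nothing is shared between threads.
        public IUnitOfWork Create() => new UnitOfWork(new FakeContext());
    }

    public sealed class Service3
    {
        private readonly IUnitOfWorkFactory _unitFactory;
        public Service3(IUnitOfWorkFactory unitFactory) => _unitFactory = unitFactory;

        public int? CanBeInserted(int id)
        {
            using (var unitOfWork = _unitFactory.Create())
            {
                return unitOfWork.Context.Query(id);
            }
        }
    }

    public static class Demo
    {
        public static int Run()
        {
            // A single shared service is fine, because it holds no context of its own.
            var service3 = new Service3(new UnitOfWorkFactory());
            var results = new ConcurrentBag<int?>();
            var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

            Parallel.ForEach(Enumerable.Range(1, 20), options,
                id => results.Add(service3.CanBeInserted(id)));

            return results.Count;
        }

        public static void Main() => Console.WriteLine(Run());
    }
    ```

    The cost of this design is one context per call, which is usually cheap, but it also means that work done inside CanBeInserted does not take part in a transaction opened by the caller.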

    Another approach that might be more suitable for your specific use case would be to take the unit of work as a parameter:

    public class Service3
    {
        public int? CanBeInserted(UnitOfWork unitOfWork, int id)
        {
            ...
        }
    }
    

    Both options should remove the possibility of a db context being used concurrently from multiple threads.
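
    For the parameter-passing variant, the loop from the question might end up looking roughly like the sketch below. StubContext, its Execute method and LoopDemo are hypothetical stand-ins so that the example runs without a database, and the rule inside CanBeInserted is arbitrary. The same reasoning would apply to service1.Get and service2.Get inside MoveToHistoric, assuming those services also hold their own context.

    ```csharp
    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    // Hypothetical stand-in for MyContext: it throws if two calls overlap.
    public sealed class StubContext : IDisposable
    {
        private int _busy;

        public void Execute(string operation)
        {
            if (Interlocked.Exchange(ref _busy, 1) == 1)
                throw new InvalidOperationException("Concurrent use during: " + operation);
            try { Thread.Sleep(2); }                 // simulate a database round trip
            finally { Volatile.Write(ref _busy, 0); }
        }

        public void Dispose() { }
    }

    public sealed class UnitOfWork : IDisposable
    {
        public UnitOfWork(StubContext context) => Context = context;
        public StubContext Context { get; }
        public void Dispose() => Context.Dispose();
    }

    // The services keep no context of their own; they use what the caller hands in.
    public sealed class Service3
    {
        public int? CanBeInserted(UnitOfWork unitOfWork, int id)
        {
            unitOfWork.Context.Execute("CanBeInserted");
            return id % 3 == 0 ? id : (int?)null;    // arbitrary rule, just for the demo
        }
    }

    public sealed class Service4
    {
        public void MoveToHistoric(UnitOfWork unitOfWork, int id)
        {
            unitOfWork.Context.Execute("MoveToHistoric");
        }
    }

    public static class LoopDemo
    {
        public static int Run()
        {
            var service3 = new Service3();
            var service4 = new Service4();
            int processed = 0;
            var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

            Parallel.ForEach(Enumerable.Range(1, 30), options, id =>
            {
                // One unit of work per iteration, passed to every service call.
                using (var unitOfWork = new UnitOfWork(new StubContext()))
                {
                    int? previousId = service3.CanBeInserted(unitOfWork, id);
                    if (previousId != null)
                    {
                        service4.MoveToHistoric(unitOfWork, previousId.Value);
                    }
                    Interlocked.Increment(ref processed);
                }
            });

            return processed;
        }

        public static void Main() => Console.WriteLine(Run());
    }
    ```

    Because every call in an iteration goes through the same unit of work, the reads and writes can share the transaction that the loop body opens, which is probably what the original code intended.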