c# | multithreading | parallel-processing | task-parallel-library | concurrent-queue

How to ensure parallel tasks dequeue unique entries from ConcurrentQueue<T>?


Hi, I have a ConcurrentQueue that is loaded with files from a database. These files are to be processed by parallel tasks that dequeue them. However, after some time I run into issues where tasks dequeue the same file at the same time (which leads to "file is being used by another process" errors on the file). I also get more tasks than are supposed to be allocated: I have seen 8 tasks running at once, which should not be happening, since the active task limit is 5.

Rough code:

private void ParseQueuedTDXFiles()
{
    while (_signalParseQueuedFilesEvent.WaitOne())
    {
        Task.Run(() => SetParsersTask());
    }
}
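
The _signalParseQueuedFilesEvent is set on a timer in the Windows Service, roughly like this (a simplified sketch; the AutoResetEvent type, the 30-second interval, and the method name are illustrative, not the exact service code):

private readonly AutoResetEvent _signalParseQueuedFilesEvent = new AutoResetEvent(false);
private System.Threading.Timer _parseTimer;

private void StartParseTimer()
{
    // Wake the ParseQueuedTDXFiles loop periodically.
    _parseTimer = new System.Threading.Timer(
        _ => _signalParseQueuedFilesEvent.Set(),
        null,
        TimeSpan.Zero,
        TimeSpan.FromSeconds(30));
}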

ParseQueuedTDXFiles then calls SetParsersTask. This is why I use a ConcurrentDictionary to track how many tasks are active, and to make sure the count stays below _ActiveTasksLimit:

private void SetParsersTask()
{
    
    if (_ConcurrentqueuedTdxFilesToParse.Count > 0)
    {
        if (_activeParserTasksDict.Count < _ActiveTasksLimit) //ConcurrentTask Dictionary Used to control how many Tasks should run
        {
            int parserCountToStart = _ActiveTasksLimit - _activeParserTasksDict.Count;
            Parallel.For(0, parserCountToStart, parserToStart =>
            {
                lock(_concurrentQueueLock)
                    Task.Run(() => PrepTdxParser());
            });
        }
    }

}

SetParsersTask in turn calls this function, which dequeues from the ConcurrentQueue:

private void PrepTdxParser()
{
    TdxFileToProcessData fileToProcess;
    lock (_concurrentQueueLock)
        _ConcurrentqueuedTdxFilesToParse.TryDequeue(out  fileToProcess);
    if (!string.IsNullOrEmpty(fileToProcess.TdxFileName))
    {
        LaunchTDXParser(fileToProcess);
    }
}

I even put a lock on _ConcurrentqueuedTdxFilesToParse, even though I know a ConcurrentQueue doesn't need one, all to make sure I never run into a situation where two tasks dequeue the same file.
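
As far as I know, TryDequeue on its own is atomic: each successfully dequeued item is handed to exactly one caller. A minimal standalone sketch of lock-free draining that illustrates the guarantee (the names and counts here are made up for the demo):

// using System; using System.Linq;
// using System.Collections.Concurrent; using System.Threading.Tasks;
var demoQueue = new ConcurrentQueue<int>(Enumerable.Range(0, 1000));
var seen = new ConcurrentDictionary<int, bool>();

var workers = Enumerable.Range(0, 8)
    .Select(_ => Task.Run(() =>
    {
        // No lock needed: each item comes out exactly once.
        while (demoQueue.TryDequeue(out int item))
        {
            if (!seen.TryAdd(item, true))
                Console.WriteLine($"Duplicate: {item}"); // should never print
        }
    }))
    .ToArray();

Task.WaitAll(workers);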

This function is where I add and remove tasks, and where I launch the file parser for the dequeued file:

private void LaunchTDXParser(TdxFileToProcessData fileToProcess)
{
    string fileName = fileToProcess.TdxFileName;
    Task startParserTask = new Task(() => ConfigureAndStartProcess(fileName));
    _activeParserTasksDict.TryAdd(fileName, startParserTask);
    startParserTask.Start();
    Task.WaitAll(startParserTask);
    _activeParserTasksDict.TryRemove(fileName, out Task taskToBeRemoved);
}

Can you guys help me understand why I am getting the same file dequeued in two different tasks, and why I am getting more tasks than _ActiveTasksLimit should allow?


Solution

So I fixed my problem. The first fix was to not add more parallelism than need be. I was trying to create a situation where SetParsersTask() would not be held up by tasks that still needed to finish processing a file, so I foolishly threw Parallel.For on top of Task.Run, which already runs concurrently. I fixed this by generating fire-and-forget tasks in a normal for loop instead of Parallel.For:

    private void SetParsersTask()
    {
        if (_queuedTdxFilesToParse.Count > 0)
        {
            if (_activeParserTasksDict.Count < _tdxParsersInstanceCount)
            {
                int parserCountToStart = _tdxParsersInstanceCount - _activeParserTasksDict.Count;
                _queuedTdxFilesToParse = new ConcurrentQueue<TdxFileToProcessData>(_queuedTdxFilesToParse.Distinct());
                for (int i = 0; i < parserCountToStart; i++)
                {
                    Task.Run(() => PrepTdxParser());
                }
                
            }
        }
    
    }
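
As an aside, checking _activeParserTasksDict.Count and then starting tasks is still a check-then-act race if SetParsersTask can run twice at the same time. A SemaphoreSlim is the more conventional tool for a hard cap on concurrency; a minimal sketch of that approach (not the code I ended up with, and the names are illustrative):

    // Sketch only: a SemaphoreSlim caps concurrent parsers at a hard limit,
    // no matter how many times the timer fires.
    private readonly SemaphoreSlim _parserSlots = new SemaphoreSlim(5, 5);

    private async Task PrepTdxParserAsync()
    {
        await _parserSlots.WaitAsync();   // blocks while 5 parsers are active
        try
        {
            if (_queuedTdxFilesToParse.TryDequeue(out var fileToProcess))
                LaunchTDXParser(fileToProcess);
        }
        finally
        {
            _parserSlots.Release();       // free the slot for the next file
        }
    }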
    

After that I was still getting the occasional duplicate file, so I moved the queue loading to another long-running thread. That thread waits on an AutoResetEvent, so the queue is only populated once at any instant, as opposed to another task potentially loading it with duplicate files. It could be that both my enqueue and my dequeue were responsible, and now both sides are addressed:

    var _loadQueueTask = Task.Factory.StartNew(() => LoadQueue(), TaskCreationOptions.LongRunning);
    
    private void LoadQueue()
    {
        while (_loadConcurrentQueueEvent.WaitOne())
        {
            if (_queuedTdxFilesToParse.Count < _tdxParsersInstanceCount)
            {
                int numFilesToGet = _tdxParsersInstanceCount - _activeParserTasksDict.Count;
                var filesToAdd = ServiceDBHelper.GetTdxFilesToEnqueueForProcessingFromDB(numFilesToGet);
                foreach (var fileToProc in filesToAdd)
                {
                    ServiceDBHelper.UpdateTdxFileToProcessStatusAndUpdateDateTime(fileToProc.TdxFileName, 1, DateTime.Now);
                    _queuedTdxFilesToParse.Enqueue(fileToProc);
                }
    
            }
        }
    }
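
One extra guard worth mentioning (a sketch only, not in the service code above): tracking in-flight file names would make duplicate enqueues impossible even if the DB query handed back a file that is already queued or being parsed:

    // Sketch: remember in-flight file names so a re-read from the DB can
    // never enqueue a file that is already queued or currently parsing.
    private readonly ConcurrentDictionary<string, byte> _inFlightFiles =
        new ConcurrentDictionary<string, byte>();

    private void EnqueueIfNew(TdxFileToProcessData fileToProc)
    {
        if (_inFlightFiles.TryAdd(fileToProc.TdxFileName, 0))
            _queuedTdxFilesToParse.Enqueue(fileToProc);
        // After a parse finishes: _inFlightFiles.TryRemove(fileName, out _);
    }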
    

Thanks to Theo for pointing me to additional tools and making me look more closely at my parallel loops.