C# - Worker thread with slots, items being dynamically added


I have a Windows service that polls a web service for new items every 30 seconds. If it finds any new items, it checks whether they need to be "processed" and, if so, puts them in a list to process. I spawn threads to process 5 at a time, and when one finishes, another item fills the empty slot. Once everything has finished, the program sleeps for 30 seconds and then polls again.

My issue is that while the items are being processed (which can take up to 15 minutes), new items are being created that may also need processing. The main thread gets held up waiting for every last thread to finish before it sleeps and starts the cycle over.

What I'd like is for the main thread to keep polling the web service every 30 seconds without getting held up: it would add any new items it finds to a list, which a separate worker thread would process. That worker would still have, say, only 5 slots available, but they would essentially always be filled, as long as the main thread keeps finding new items to process.

I hope that makes sense. Thanks!

EDIT: updated code sample

I put together this as a worker thread that operates on a ConcurrentQueue. Any way to improve this?

    private void ThreadWorker() {
        int iNumOfConcurrentSlots = 6;
        Thread[] threads = new Thread[iNumOfConcurrentSlots];

        while (true) {
            for (int i = 0; i < iNumOfConcurrentSlots; i++) {
                // Note: nothing checks whether threads[i] has finished
                // before the slot is reused for a new thread.
                if (m_tAssetQueue.TryDequeue(out Asset aa)) {
                    threads[i] = new Thread(() => ProcessAsset(aa));
                    threads[i].Start();
                    Thread.Sleep(500);
                }
            }
        }
    }

EDIT: Ahh yeah, the code above won't work. I need a way to avoid hard-coding the number of concurrent slots, and instead have each thread basically waiting and looking for something in the queue and, if it finds something, processing it. But then I also need a way of signalling that the ProcessAsset() function has completed, to release the thread and allow another thread to be created.


Solution

  • One simple way to do it is to have 5 threads reading from a concurrent queue. The main thread queues items and the worker threads pull items from the queue as they become free.

    Note: The workers are in an infinite loop. They call TryDequeue, process the item if they got one or sleep one second if they fail to get something. They can also check for an exit flag.

    To keep your service well behaved, you might have an independent polling thread that queues the items. The main thread is then free to respond to start, stop, and pause requests.

    Pseudo code for worker thread:

    While true
        If TryDequeue then
            process data
        Else
            Sleep one second

        If exit flag is true, break
        While pause flag, sleep
    

    Pseudo code for polling thread:

    While true
        Poll web service
        Queue items in concurrent queue
        If exit flag true, break
        While pause flag, sleep
        Sleep
    

    Pseudo code for main thread:

    Start polling thread
    Start n worker threads with above code
    Handle stop:
        set exit flag to true
    Handle pause:
        set pause flag to true
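
The three loops above could be sketched in C# roughly as follows. This is only a minimal sketch under stated assumptions, not a definitive implementation: `AssetProcessor`, its constructor parameters, and the `poll` and `process` delegates are hypothetical names standing in for the web service call and for `ProcessAsset()`, and the `Asset` class is a placeholder for the real type. It also swaps the plain exit flag for a `ManualResetEventSlim`, so that sleeping threads wake up as soon as a stop is requested.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the Asset type used in the question.
public sealed class Asset
{
    public int Id { get; }
    public Asset(int id) { Id = id; }
}

public sealed class AssetProcessor
{
    // How long an idle worker waits before checking the queue again.
    private static readonly TimeSpan IdleWait = TimeSpan.FromSeconds(1);

    private readonly ConcurrentQueue<Asset> _queue = new ConcurrentQueue<Asset>();
    private readonly List<Thread> _threads = new List<Thread>();
    private readonly ManualResetEventSlim _stop = new ManualResetEventSlim(false);
    private readonly Func<IEnumerable<Asset>> _poll;   // assumed: wraps the web service call
    private readonly Action<Asset> _process;           // assumed: wraps ProcessAsset()
    private readonly int _workerCount;
    private readonly TimeSpan _pollInterval;
    private volatile bool _paused;

    public AssetProcessor(Func<IEnumerable<Asset>> poll, Action<Asset> process,
                          int workerCount, TimeSpan pollInterval)
    {
        _poll = poll;
        _process = process;
        _workerCount = workerCount;
        _pollInterval = pollInterval;
    }

    // Main thread: start one polling thread and n worker threads.
    public void Start()
    {
        _threads.Add(new Thread(PollLoop) { IsBackground = true });
        for (int i = 0; i < _workerCount; i++)
            _threads.Add(new Thread(WorkLoop) { IsBackground = true });
        foreach (Thread t in _threads)
            t.Start();
    }

    public void Pause() { _paused = true; }
    public void Resume() { _paused = false; }

    // Signals every loop to exit, then waits for in-flight items to finish.
    public void Stop()
    {
        _stop.Set();
        foreach (Thread t in _threads)
            t.Join();
    }

    // Worker thread: process an item if one is available, otherwise wait.
    private void WorkLoop()
    {
        while (!_stop.IsSet)
        {
            if (!_paused && _queue.TryDequeue(out Asset asset))
                _process(asset);
            else
                _stop.Wait(IdleWait); // returns early once Stop() is called
        }
    }

    // Polling thread: queue whatever the poll returns, then wait for the interval.
    private void PollLoop()
    {
        while (!_stop.IsSet)
        {
            if (!_paused)
                foreach (Asset asset in _poll())
                    _queue.Enqueue(asset);
            _stop.Wait(_pollInterval);
        }
    }
}
```

In a service, `OnStart` would construct this with something like `workerCount: 5` and `pollInterval: TimeSpan.FromSeconds(30)` and call `Start()`, while `OnStop` would call `Stop()`. Note that `Stop()` waits for any item that is mid-processing, so with 15-minute items a real service may need a join timeout. If the polling wait in the workers is undesirable, `BlockingCollection<T>` provides a genuinely blocking `Take()` as an alternative.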