multithreading delphi recursion threadpool omnithreadlibrary

OmniThreadLibrary: How to detect when all recursively scheduled (=pooled) threads have completed?


Let's say I have to recursively iterate over items stored in a tree structure in the background, and I want to walk this tree using multiple threads from a thread pool (one task per "folder" node). I have already managed to implement this using several different low- and high-level approaches provided by the OmniThreadLibrary.

However, what I haven't figured out yet is how to properly detect that the scan has actually completed, i.e. that every last leaf node has been processed.

I found various examples on the net that either checked whether GlobalOmniThreadPool.CountExecuting + GlobalOmniThreadPool.CountQueued <= 0 or used an IOmniTaskGroup.WaitForAll(). Unfortunately, neither approach appears to work for me. The check always returns True too early, i.e. while some tasks are still running. None of the examples I looked at used recursion, though, and those that did did not use a thread pool. Is this maybe just not a good combination in the first place?

Here's a (very) simplified example code snippet of how I'm trying to do this at the moment:

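// ScanFolder is declared further down, so it needs a forward declaration here.
procedure ScanFolder(const ATask: IOmniTask); forward;

procedure CreateScanFolderTask(const AFolder: IFolder);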
begin
  CreateTask(ScanFolder)
    .SetParameter('Folder', AFolder)
    .Schedule();
end;

procedure ScanFolder(const ATask: IOmniTask);
var
  lFolder,
  lCurrentFolder: IFolder;
begin
  if ATask.CancellationToken.IsSignalled then Exit;

  lCurrentFolder := ATask.Param['Folder'].AsInterface as IFolder;

  DoSomethingWithItemsInFolder(lCurrentFolder.Items);

  for lFolder in lCurrentFolder.Folders do
    begin
      if ATask.CancellationToken.IsSignalled then Exit;
      CreateScanFolderTask(lFolder);
    end;
end;

begin
  GlobalOmniThreadPool.MaxExecuting := 8;
  CreateScanFolderTask(FRootFolder);

  // ??? wait for recursive scan to finish

  OutputResult();
end.

One example implementation for the wait that I have tried was this (based on an example found on About.com):

  while GlobalOmniThreadPool.CountExecuting + GlobalOmniThreadPool.CountQueued > 0 do
    Application.ProcessMessages;

But this appears to always exit immediately after the "root thread" has finished. Even when I add an artificial delay using Sleep() calls, it still exits too early. There seems to be a "gap" between a task being struck off the list of executing tasks and the tasks it scheduled being added to the list of queued tasks...

Actually, instead of waiting for the scan to finish, I would very much prefer to use an event handler. (I'd also rather not use Application.ProcessMessages, as I will need this in form-less applications, too.) I have already tried both IOmniTaskControl.OnTerminated and a TOmniEventMonitor, but these fire for every finished task, so I would still need to check whether the current task was the last one, which again boils down to the same problem as above.

Or is there maybe a better way to create the tasks that would avoid this problem?


Solution

  • A simple way is to count the 'folders to be processed' yourself. Increment a value every time you create a folder task and decrement it every time a folder has been processed.

    var
      counter: TOmniCounter;
    
    counter.Value := 0;
    
    procedure ScanFolder(const ATask: IOmniTask);
    var
      lFolder,
      lCurrentFolder: IFolder;
    begin
      if ATask.CancellationToken.IsSignalled then Exit;
    
      lCurrentFolder := ATask.Param['Folder'].AsInterface as IFolder;
    
      DoSomethingWithItemsInFolder(lCurrentFolder.Items);
    
      for lFolder in lCurrentFolder.Folders do
        begin
          if ATask.CancellationToken.IsSignalled then Exit;
          counter.Increment;
          CreateScanFolderTask(lFolder);
        end;
      counter.Decrement;
    end;
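
The counting idea can be tried in isolation with a small console program. The sketch below is an assumption-laden illustration, not the answer's own code: to stay self-contained it uses the Delphi RTL (TTask.Run from System.Threading, TInterlocked and TEvent from System.SyncObjs) in place of OmniThreadLibrary's CreateTask and TOmniCounter, and it replaces the IFolder tree with a synthetic tree described by the hypothetical constants cFanOut and cDepth. Two details go beyond the snippet above and are my additions: the root folder is counted as well, and the decrement sits in a try..finally so that an early exit (for example on cancellation) cannot leave the counter stuck above zero.

```pascal
program PendingCounterDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs, System.Threading;

const
  cFanOut = 3;  // hypothetical: number of subfolders in every folder
  cDepth  = 4;  // hypothetical: number of levels in the tree

var
  Pending: Integer;  // folders scheduled but not yet completely processed
  Visited: Integer;  // folders processed so far
  Done: TEvent;      // signalled by the task that brings Pending down to zero

procedure ScheduleScan(ALevel: Integer); forward;

procedure ScanFolder(ALevel: Integer);
var
  i: Integer;
begin
  try
    TInterlocked.Increment(Visited);
    // Schedule one task per subfolder; each call increments Pending first.
    if ALevel < cDepth then
      for i := 1 to cFanOut do
        ScheduleScan(ALevel + 1);
  finally
    // This folder is finished. Its children were already counted above, so
    // Pending can only reach zero once the whole tree has been processed.
    if TInterlocked.Decrement(Pending) = 0 then
      Done.SetEvent;
  end;
end;

procedure ScheduleScan(ALevel: Integer);
begin
  // Count the folder before handing it to the pool, never inside the task.
  TInterlocked.Increment(Pending);
  TTask.Run(
    procedure
    begin
      ScanFolder(ALevel);
    end);
end;

begin
  Done := TEvent.Create(nil, True, False, '');
  try
    ScheduleScan(1);   // the root folder is counted like any other folder
    Done.WaitFor;      // blocks without Application.ProcessMessages
    Writeln('Folders visited: ', Visited);
  finally
    Done.Free;
  end;
end.
```

Because a parent increments the counter for each child before it decrements for itself, there is no window in which the counter reads zero while work is still queued, which is the gap that polling CountExecuting + CountQueued runs into. The task whose decrement returns zero is by construction the last one, so that is also the natural place to raise a "scan finished" notification instead of blocking on an event.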