Search code examples
multithreadingdelphithreadpool

Implement multithreading with a queue in Delphi to limit to n threads running simultaneously?


I have been working with Delphi for some time, and am now trying to implement multi-threading in my code with the following requirements:

  1. Only 2 threads should run at a time.

  2. Other tasks should wait in a queue and execute once a thread becomes available.

I tried using TParallel.For to achieve this, but I couldn't implement the desired behavior. Here's my sample code:

    type
       TMyRecord = record
         msg: String;
         sleep: Integer;
          end;

    var
      Form2: TForm2;

    implementation

    {$R *.dfm}

    procedure TForm2.Button1Click(Sender: TObject);
    var
      record1, record2, record3, record4, record5: TMyRecord;
      MyList: TList<TMyRecord>;
    begin
      // Initialize records
      record1.msg := 'Item1';
      record1.sleep := 10;

      record2.msg := 'Item2';
      record2.sleep := 10;

      record3.msg := 'Item3';
      record3.sleep := 3000;

      record4.msg := 'Item4';
      record4.sleep := 3000;

      record5.msg := 'Item5';
      record5.sleep := 100;

     MyList := TList<TMyRecord>.Create;
      try
        // Add records to the list
        MyList.Add(record1);
        MyList.Add(record2);
        MyList.Add(record3);
        MyList.Add(record4);
        MyList.Add(record5);

    // Use TParallel.For
    TParallel.For(0, MyList.Count - 1,
      procedure (i: Integer)
      var
        ListItem: TMyRecord;
      begin
        ListItem := MyList[i];

        TThread.Synchronize(nil,
          procedure
          begin
            Memo1.Lines.Add(ListItem.msg);
          end);

        Sleep(ListItem.sleep);
      end);

      finally
    MyList.Free;
  end;
end;

I also tried using TTask.Run and ThreadPool, but was unable to achieve the desired behavior. Updated - here is what i tried to do using Threadpool

procedure TForm2.Button1Click(Sender: TObject);
var
  record1, record2, record3, record4, record5: TMyRecord;
  MyList: TList<TMyRecord>;
  ThreadPool: TThreadPool;
begin
  // Initialize records
  record1.msg := 'Item1';
  record1.sleep := 10;

  record2.msg := 'Item2';
  record2.sleep := 10;

  record3.msg := 'Item3';
  record3.sleep := 3000;

  record4.msg := 'Item4';
  record4.sleep := 3000;

  record5.msg := 'Item5';
  record5.sleep := 100;

  MyList := TList<TMyRecord>.Create;
  try
    // Add records to the list
    MyList.Add(record1);
    MyList.Add(record2);
    MyList.Add(record3);
    MyList.Add(record4);
    MyList.Add(record5);

    ThreadPool := TThreadPool.Create;
    ThreadPool.SetMaxWorkerThreads(2);

    // Iterate through the list and queue each work item with a copy of the record
    for var i := 0 to MyList.Count - 1 do
    begin
      var ListItem := MyList[i];

      ThreadPool.QueueWorkItem(
        procedure
        var
          LocalItem: TMyRecord;
        begin
          LocalItem := ListItem;

          TThread.Queue(nil,
            procedure
            begin
              Memo1.Lines.Add(LocalItem.msg);
              Sleep(LocalItem.sleep);  // Simulate the delay
            end);
        end);
    end;
  finally
    MyList.Free;
  end;
end;

there is problem with this code -

  1. it prints only 'item5' 5 times.
  2. waiting time(sleep) is not working.

How can I modify this code to limit the number of threads running simultaneously to 2? And queue other tasks to execute as threads become available?


Solution

  • it prints only 'item5' 5 times.

    That implies that your queued work items are capturing the same TMyRecord, and all of your prints are happening after that variable has been assigned the last item in the list.

    Even though your loop is using a local inline variable, given the way that variable capture in anonymous procedures works, it seems to be ignoring the inlining.

    The common solution to having an anonymous procedure capture a variable inside a loop is to move the anonymous procedure to a separate function with an input parameter that it can capture.

    waiting time(sleep) is not working.

    That is likely because you are doing the sleeps in the context of the main UI thread, not in the worker threads, so they are running at full speed.

    Try something more like this:

    function MakeWorkItem(Item: TMyRecord): TProc;
    begin
      Result := procedure
        begin
          TThread.Queue(nil,
            procedure
            begin
              Form2.Memo1.Lines.Add(Item.msg);
            end
          );
          Sleep(Item.sleep);  // Simulate the delay 
        end;
    end; 
    
    procedure TForm2.Button1Click(Sender: TObject);
    var
      record1, record2, record3, record4, record5: TMyRecord;
      MyList: TList<TMyRecord>;
      ThreadPool: TThreadPool;
    begin
      // Initialize records
      record1.msg := 'Item1';
      record1.sleep := 10;
    
      record2.msg := 'Item2';
      record2.sleep := 10;
    
      record3.msg := 'Item3';
      record3.sleep := 3000;
    
      record4.msg := 'Item4';
      record4.sleep := 3000;
    
      record5.msg := 'Item5';
      record5.sleep := 100;
    
      MyList := TList<TMyRecord>.Create;
      try
        // Add records to the list
        MyList.Add(record1);
        MyList.Add(record2);
        MyList.Add(record3);
        MyList.Add(record4);
        MyList.Add(record5);
    
        ThreadPool := TThreadPool.Create;
        ThreadPool.SetMaxWorkerThreads(2);
    
        // Iterate through the list and queue each work item with a copy of the record
        for var i := 0 to MyList.Count - 1 do
        begin
          ThreadPool.QueueWorkItem(MakeWorkItem(MyList[i]));
        end;
      finally
        MyList.Free;
      end;
    end;
    

    Alternatively, using TParallel.For, simply pass your desired TThreadPool to it as an extra parameter, eg:

    procedure TForm2.Button1Click(Sender: TObject);
    var
      record1, record2, record3, record4, record5: TMyRecord;
      MyList: TList<TMyRecord>;
      ThreadPool: TThreadPool;
    begin
      // Initialize records
      record1.msg := 'Item1';
      record1.sleep := 10;
    
      record2.msg := 'Item2';
      record2.sleep := 10;
    
      record3.msg := 'Item3';
      record3.sleep := 3000;
    
      record4.msg := 'Item4';
      record4.sleep := 3000;
    
      record5.msg := 'Item5';
      record5.sleep := 100;
    
      MyList := TList<TMyRecord>.Create;
      try
        // Add records to the list
        MyList.Add(record1);
        MyList.Add(record2);
        MyList.Add(record3);
        MyList.Add(record4);
        MyList.Add(record5);
    
        ThreadPool := TThreadPool.Create;
        ThreadPool.SetMaxWorkerThreads(2);
    
        // Use TParallel.For
        TParallel.For(0, MyList.Count - 1,
          procedure (i: Integer)
          var
            ListItem: TMyRecord;
          begin
            ListItem := MyList[i];
    
            TThread.Queue(nil,
              procedure
              begin
                Memo1.Lines.Add(ListItem.msg);
              end
            );
    
            Sleep(ListItem.sleep);
          end,
          TThreadPool // <-- HERE
        );
      finally
        MyList.Free;
      end;
    end;