Delphi - launching multiple threads, and combining their output


I am not new to Delphi, but I am new to using threads. I am working on a project that needs to query several servers for data, and I thought I would try implementing it with threads. The idea is to create a dedicated thread for each remote server that sends the query and processes the response, and then have the main program combine and process the data returned by the threads.

I found several examples that show how to launch multiple threads, and they all seem to do it in a "for" loop. I implemented the same method in my project, but was disappointed to see that it still waits for each thread to finish before launching the next one. I would like to launch all the threads at the same time, wait for them to collect and return the requested data, and then process it. What would be the recommended way to do this?

Here is the simplified code, illustrating the "not running simultaneously" problem:

unit threads2;

interface

uses
  Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
  Vcl.Controls, Vcl.Forms, Vcl.Dialogs, support, Vcl.StdCtrls, Vcl.ExtCtrls;

type
  TForm1 = class(TForm)
    Panel1: TPanel;
    Panel2: TPanel;
    Memo1: TMemo;
    Button1: TButton;
    procedure Button1Click(Sender: TObject);
    procedure FormCreate(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
  end;


  TMyThread = Class(TThread)
  private
     fnumber : longint;
     fLog:tstrings;
     fExecutionTime:TDateTime;
     fiLimit: longint;
  public
     constructor Create(createsuspended: boolean; iLimit:integer);
     procedure Execute(); override;
  end;


var
  Form1: TForm1;

implementation

{$R *.dfm}

constructor TMyThread.Create(createsuspended: boolean; ilimit:longint);
begin
  inherited Create(createsuspended);
  self.fLog := tstringlist.Create;
  self.fiLimit := ilimit;
  self.fExecutionTime := 0;
  self.execute;
end;



procedure Tmythread.Execute;
var i,rcode:longint;
begin
  self.fExecutionTime := now;
  self.flog.Add(AddDotsAndLeftJ(35,'.','Started thread '+inttostr(self.threadID)+' at ')+': ' + formatdatetime('h:nn:ss:zzz', now()));
  self.flog.Add(AddDotsAndLeftJ(35,'.','Limit: ')+': ' + inttostr(self.filimit));

  self.fnumber := 0;
  for I := 0 to self.fiLimit do
  begin
    self.fnumber:=self.fnumber + random(5);
  end;

  self.fExecutionTime := now()-self.fExecutionTime;
  self.flog.Add(AddDotsAndLeftJ(35,'.','Sum: ')+': ' + inttostr(self.fnumber));
  self.flog.Add(AddDotsAndLeftJ(35,'.','Finished thread '+inttostr(self.threadID)+' at ')+': ' + formatdatetime('h:nn:ss:zzz', now()));
  self.flog.Add(AddDotsAndLeftJ(35,'.','Thread execution time')+': ' + formatdatetime('h:nn:ss:zzz', self.fExecutionTime));
end;

procedure TForm1.Button1Click(Sender: TObject);
var n,i:longint;
    mt:tmythread;
    s:string;
begin
  for i := 0 to 5 do
  begin

    case i of
    0: n:=100000;
    1: n:=1000000;
    2: n:=10000000;
    3: n:=100000000;
    4: n:=10000000;
    5: n:=1000000000;
    end;


    mt:=TMyThread.Create(true,n);

    memo1.Lines.AddStrings(mt.fLog);
    memo1.Lines.Add('------------------------------------');

  end;
end;


end.


The output looks like this: [image]

The start and end times of each thread suggest that the threads are running strictly sequentially; the next thread always starts after the previous one has finished.



The second part of my question has to do with how the threads are managed, and how to collect the output of the returning threads. I created a dynamic array and used SetLength to size it to the number of threads I am about to launch. Each element of the array is a record (threadDataRecord) that will be used to store the output of the associated thread. Each thread has a local variable that matches the structure of that data record, and it fills that variable while it is executing. Once the thread is finished, it copies the contents of the local record into the array element associated with the thread (i.e. Thread #4 will copy its data into the record at array[4]). It does seem to work as intended, but is it safe to do so? The array is a shared variable, but no two threads ever write to the same array element; if each thread writes to a separate array element, is this a thread-safe way of accessing/modifying the array?

Any pointers are greatly appreciated! Thank you


Solution

  • You should never call your thread's Execute method manually!

    And that is exactly your problem. Because you call the thread's Execute method from within the constructor, it is executed by your application's main thread, the same thread that runs the constructor.


    I see that you create your threads in a suspended state. If you do so, you also need to make sure that the thread gets started. You do this by calling TThread.Start after creating the thread.

    mt:=TMyThread.Create(true,n);
    mt.Start;  
    

    This then automatically executes the code within the thread's Execute method, and that code is executed by the thread itself. The only exception is code that you wrap in a Synchronize call somewhere within Execute: Synchronize forces that code to be executed on the main thread.
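
    To see the difference for yourself, here is a minimal console sketch. TProbeThread and RanOnMainThread are names made up for this illustration; they are not part of your code or of the RTL. The thread records the ID of whichever thread actually runs Execute, so the result can be compared with MainThreadID.

    ```delphi
    program ExecuteDemo;

    {$APPTYPE CONSOLE}

    uses
      System.SysUtils, System.Classes;

    type
      //Hypothetical probe thread: remembers which thread ran Execute
      TProbeThread = class(TThread)
      public
        RanOnThreadID: TThreadID;
      protected
        procedure Execute; override;
      end;

    procedure TProbeThread.Execute;
    begin
      RanOnThreadID := TThread.CurrentThread.ThreadID;
    end;

    //Returns True when Execute ran on the main thread
    function RanOnMainThread(UseStart: Boolean): Boolean;
    var
      T: TProbeThread;
    begin
      T := TProbeThread.Create(True);
      try
        if UseStart then
        begin
          //Correct: the worker thread runs Execute
          T.Start;
          T.WaitFor;
        end
        else
          //Wrong: a direct call is an ordinary method call on the calling thread
          T.Execute;
        Result := T.RanOnThreadID = MainThreadID;
      finally
        T.Free;
      end;
    end;

    begin
      WriteLn('Direct call ran on main thread: ', RanOnMainThread(False));
      WriteLn('Start ran on main thread: ', RanOnMainThread(True));
    end.
    ```

    When run from the main thread, the direct call should report that Execute ran on the main thread, while the version using Start should report that it did not.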


    EDIT: Here is an example of using multithreading to process some data. It uses the OnTerminate event to notify the main thread when a specific worker thread has finished executing. It also implements an optional custom notification event that each thread can use to report its current progress.

    I have added detailed in-code comments to make it easier to understand how the code works.

    I'm using a different data structure, but I'm sure you will be able to adapt this to your needs.

    unit Unit1;
    
    interface
    
    uses
      Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
      Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls, System.Math, System.DateUtils, System.TimeSpan;
    
    type
      //Enumeration type for representing internal thread state
      //This can be useful to mark different stages of thread processing
      TInternalThreadState = (Start, RandomSumation, RandomSubstraction, Finish);
    
      //Class for storing work data
      TWorkData = class(TObject)
        //Initial data prepared before work is assigned to any thread
        NumberOfCalculations: Integer;
        WorkNo: Integer;
        //Return data filled in by the thread the work was assigned to
        startTime: TDateTime;
        endTime: TDateTime;
        ThreadID: Integer;
        Log: TStringList;
      public
        //We use constructor for creating additional internal objects like Log at creation of work class
        constructor Create;
        //Using destructor we free up all internal created objects on destruction of the work class itself
        destructor Destroy; override;
      end;
    
      //Definition for special procedure type that we will use as progress notification event method
      TThreadProgressNotification = procedure(Sender: TObject; PercentageDone: Integer; WorkState: TInternalThreadState) of object;
    
      //Type definition of custom thread class
      TMyWorkerThread = class(TThread)
      private
        //Field storing reference to progress event notification method
        fOnProgress: TThreadProgressNotification;
        //Reference to WorkData
        fWorkData: TWorkData;
        //Index at which thread is stored in thread list
        fThreadIndex: Integer;
        //Internal string list that we use for storing thread log
        fThreadLog: TStringList;
        //Custom internal thread state that we can use to mark specific stages of thread processing
        fThreadInternalState: TInternalThreadState;
        //Number of calculations that this thread was assigned to perform
        fNumberOfCalculations: Integer;
      protected
        procedure Execute; override;
        //Method that will be fired when the thread finishes its execution
        //We use this method to notify main thread that worker thread has finished its work
        //and also return any processed data
        procedure OnTerminateEvent(Sender: TObject);
      public
        //Custom constructor for our worker thread that allows passing some additional parameters
        constructor Create(CreateSuspended: Boolean; ThreadIndex: Integer; ThreadData: TWorkData);
        //Destructor that we will use for cleaning up our worker thread data
        destructor Destroy; override;
        //Property with which we can optionally assign event method for reporting current thread progress
        property OnProgress: TThreadProgressNotification read fOnProgress write fOnProgress;
      end;
    
      TForm1 = class(TForm)
        Memo1: TMemo;
        Button1: TButton;
        Memo2: TMemo;
        procedure FormCreate(Sender: TObject);
        procedure Button1Click(Sender: TObject);
        procedure FormDestroy(Sender: TObject);
      private
        { Private declarations }
        //Actual event method for reporting worker thread progress
        procedure OnThreadProgress(Sender: TObject; PercentageDone: Integer; WorkState: TInternalThreadState);
      public
        { Public declarations }
      end;
    
    var
      Form1: TForm1;
      //Array holding work data
      ArrWorkdata: Array of TWorkData;
      //Array of worker threads
      ArrThreads: Array of TMyWorkerThread;
    
    implementation
    
    {$R *.dfm}
    
    { TMyWorkerThread }
    
    constructor TMyWorkerThread.Create(CreateSuspended: Boolean; ThreadIndex: Integer; ThreadData: TWorkData);
    begin
      //Call inherited constructor of the base TThread class passing value of CreateSuspended argument
      //Since we are using custom constructor format we need to manually specify the format of base class constructor
      inherited Create(CreateSuspended);
      //We set FreeOnTerminate to False in order to prevent the thread from automatically freeing itself after finishing work
      FreeOnTerminate := False;
      //Assigning work data reference to our worker thread.
      fWorkData := ThreadData;
      //Assigning index at which thread is stored in our thread list
      fThreadIndex := ThreadIndex;
      //Setting initial internal thread state
      fThreadInternalState := TInternalThreadState.Start;
      //Setting the number of calculations that this thread will work on
      fNumberOfcalculations := fWorkData.NumberOfcalculations;
      //Creating the internal thread log
      fThreadLog := TStringList.Create;
      //Writing first entry into thread log marking its start
      fThreadLog.Add('ThreadNo: '+IntToStr(fThreadIndex)+' ThreadID: '+IntToStr(self.ThreadID)+' Thread started at: '+DateTimeToStr(now));
      //Log the start of work in linked WorkData
      self.fWorkData.startTime := Now;
    end;
    
    destructor TMyWorkerThread.Destroy;
    begin
      //Free the internal thread log upon thread destruction
      fThreadLog.Free;
      inherited;
    end;
    
    procedure TMyWorkerThread.Execute;
    var I: Integer;
        N: Int64;
        PercentageDone: Integer;
    begin
      //N accumulates the result; local variables are not initialized automatically
      N := 0;
      //Random summation - internal stage of worker thread
      fThreadInternalState := TInternalThreadState.RandomSumation;
      //Log start of work stage
      fThreadLog.Add('ThreadNo: '+IntToStr(fThreadIndex)+' ThreadID: '+IntToStr(self.ThreadID)+' Random sumation started at: '+DateTimeToStr(now));
      //Do some work
      for I := 0 to fNumberOfCalculations do
      begin
        //After every 100000 processed items we report progress to the main thread if
        //an OnProgress event method has been assigned
        //We do this using Synchronize, which forces the code to be executed on the main thread
        //This allows us to safely access any UI elements
        //We should not fire this too often or we will overload the main thread
        if (I>0) and  ((I mod 100000) = 0) then
        begin
          Synchronize(procedure
          begin
            //Check if OnProgress event method has been assigned
            if Assigned(OnProgress) then
            begin
              //Calculate percentage of the work done on current work stage
              PercentageDone := Round(I / fNumberOfCalculations *100);
              //Call OnProgress event method passing relevant parameters
              OnProgress(Self,PercentageDone,Self.fThreadInternalState);
            end;
          end)
        end;
        N := N+Random(1000);
      end;
      //Log end of work stage
      fThreadLog.Add('ThreadNo: '+IntToStr(fThreadIndex)+' ThreadID: '+IntToStr(self.ThreadID)+' Random sumation finished at: '+DateTimeToStr(now));
    
      //Random subtraction - second internal stage of worker thread
      fThreadInternalState := TInternalThreadState.RandomSubstraction;
      fThreadLog.Add('ThreadNo: '+IntToStr(fThreadIndex)+' ThreadID: '+IntToStr(self.ThreadID)+' Random substraction started at: '+DateTimeToStr(now));
      for I := 0 to fNumberOfCalculations do
      begin
        if (I>0) and  ((I mod 100000) = 0) then
        begin
          Synchronize(procedure
          begin
            if Assigned(OnProgress) then
            begin
              PercentageDone := Round(I / fNumberOfCalculations *100);
              OnProgress(Self,PercentageDone,Self.fThreadInternalState);
            end;
          end)
        end;
        N := N-Random(1000);
      end;
      fThreadLog.Add('ThreadNo: '+IntToStr(fThreadIndex)+' ThreadID: '+IntToStr(self.ThreadID)+' Random substraction finished at: '+DateTimeToStr(now));
      fThreadInternalState := TInternalThreadState.Finish;
      //Log entire work done
      fThreadLog.Add('ThreadNo: '+IntToStr(fThreadIndex)+' ThreadID: '+IntToStr(self.ThreadID)+' Thread finished at: '+DateTimeToStr(now));
    end;
    
    procedure TMyWorkerThread.OnTerminateEvent(Sender: TObject);
    var ElapsedTime: Int64;
    begin
      //OnTerminate handlers are called in the context of the main thread, so accessing the UI here is safe
      //Log the end of work in linked WorkData
      self.fWorkData.endTime := Now;
      //Notify main thread that worker thread has finished
      Form1.Memo1.Lines.Add('ThreadNo: '+IntToStr(fThreadIndex)+' ThreadID: '+IntToStr(self.ThreadID)+' Thread finished at: '+DateTimeToStr(now));
      ElapsedTime := System.DateUtils.MilliSecondsBetween(self.fWorkData.startTime,self.fWorkData.endTime);
      //Notify of how long thread execution was
      Form1.Memo1.Lines.Add('ThreadNo: '+IntToStr(fThreadIndex)+' ThreadID: '+IntToStr(self.ThreadID)+' Thread finished in: '+IntToStr(ElapsedTime)+' ms');
      //Copy the thread log into linked WorkData
      self.fWorkData.Log.AddStrings(self.fThreadLog);
    end;
    
    procedure TForm1.Button1Click(Sender: TObject);
    var I: Integer;
    begin
      Memo1.Lines.Clear;
      Memo2.Lines.Clear;
      //Create the Workdata classes
      for I := 0 to Length(ArrWorkdata)-1 do
      begin
        ArrWorkdata[I] := TWorkData.Create;
        ArrWorkdata[I].WorkNo := I;
        //Assign random number of calculations from within specific range that represents this work
        //This is done to represent different workloads
        ArrWorkdata[I].NumberOfCalculations := RandomRange(1000000,10000000);
      end;
      //Create our worker threads
      for I := 0 to Length(arrThreads)-1 do
      begin
        arrThreads[I] := TMyWorkerThread.Create(True,I,ArrWorkdata[I]);
        //Assign reference to event method that will be used to notify of thread termination and
        //can also perform some data transfer from thread to WorkData
        arrThreads[I].OnTerminate := arrThreads[I].OnTerminateEvent;
        //Optionally assign OnProgress event method that will be used to notify the main thread about
        //current progress of specific thread
        arrThreads[I].OnProgress := OnThreadProgress;
        Memo2.Lines.Add('ThreadNo: '+IntToStr(I)+' ThreadID: '+IntToStr(arrThreads[I].ThreadID)+' Thread started at: '+DateTimeToStr(now));
        //Start the worker thread
        arrThreads[I].Start;
      end;
    end;
    
    procedure TForm1.FormCreate(Sender: TObject);
    begin
      //We need to call Randomize at least once before using any random function
      Randomize;
      SetLength(ArrWorkdata,8);
      SetLength(ArrThreads,8);
    end;
    
    procedure TForm1.FormDestroy(Sender: TObject);
    var I: Integer;
    begin
      for I := 0 to Length(ArrWorkdata)-1 do
      begin
        ArrWorkdata[I].Free;
      end;
      for I := 0 to Length(ArrThreads)-1 do
      begin
        ArrThreads[I].Free;
      end;
    end;
    
    procedure TForm1.OnThreadProgress(Sender: TObject; PercentageDone: Integer; WorkState: TInternalThreadState);
    begin
      //Visually represent work progress as log
      Memo1.Lines.Add('ThreadNo: '+IntToStr(TMyWorkerThread(Sender).fThreadIndex)+' ThreadID: '+IntToStr(TMyWorkerThread(Sender).ThreadID)+' Work state'+IntToStr(Ord(WorkState))+' Work done: '+IntToStr(PercentageDone));
      //Visually represent progress of specific thread
      Memo2.Lines[TMyWorkerThread(Sender).fThreadIndex] := 'ThreadNo: '+IntToStr(TMyWorkerThread(Sender).fThreadIndex)+' ThreadID: '+IntToStr(TMyWorkerThread(Sender).ThreadID)+' Work state'+IntToStr(Ord(WorkState))+' Work done: '+IntToStr(PercentageDone);
    end;
    
    { TWorkData }
    
    constructor TWorkData.Create;
    begin
      //Create a string list that will store log for this WorkData
      self.Log := TStringList.Create;
    end;
    
    destructor TWorkData.Destroy;
    begin
      //Free the WorkData log
      self.Log.Free;
      inherited;
    end;
    
    end.
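
    For the second part of the question, the following is a minimal console sketch of the "one array slot per thread" idea, not a definitive implementation. The names TSumThread, TResultRec, Results and CollectSums are invented for the illustration. It assumes that the array is sized before any thread starts and is not resized while threads are running, that each thread writes only to its own element, and that the main thread reads an element only after WaitFor has returned for that thread. Under those assumptions no two threads touch the same element at the same time.

    ```delphi
    program CollectDemo;

    {$APPTYPE CONSOLE}

    uses
      System.SysUtils, System.Classes;

    type
      //Hypothetical result record, one per thread
      TResultRec = record
        Sum: Int64;
      end;

      TSumThread = class(TThread)
      private
        fIndex: Integer;
        fLimit: Integer;
      protected
        procedure Execute; override;
      public
        constructor Create(AIndex, ALimit: Integer);
      end;

    var
      //Shared array; element I belongs exclusively to thread I
      Results: array of TResultRec;

    constructor TSumThread.Create(AIndex, ALimit: Integer);
    begin
      inherited Create(True);
      fIndex := AIndex;
      fLimit := ALimit;
    end;

    procedure TSumThread.Execute;
    var
      I: Integer;
      Local: TResultRec;
    begin
      //Work on a local record, then copy it into this thread's own slot
      Local.Sum := 0;
      for I := 1 to fLimit do
        Inc(Local.Sum, I);
      Results[fIndex] := Local;
    end;

    //Starts one thread per limit, waits for all of them, returns the sums
    function CollectSums(const Limits: array of Integer): TArray<Int64>;
    var
      Threads: array of TSumThread;
      I: Integer;
    begin
      SetLength(Results, Length(Limits));
      SetLength(Threads, Length(Limits));
      SetLength(Result, Length(Limits));
      //Launch everything first so the threads run concurrently
      for I := 0 to High(Threads) do
      begin
        Threads[I] := TSumThread.Create(I, Limits[I]);
        Threads[I].Start;
      end;
      //Only then wait for each thread and read its slot
      for I := 0 to High(Threads) do
      begin
        Threads[I].WaitFor;
        Result[I] := Results[I].Sum;
        Threads[I].Free;
      end;
    end;

    var
      Sums: TArray<Int64>;
      I: Integer;
    begin
      Sums := CollectSums([1000, 2000, 3000]);
      for I := 0 to High(Sums) do
        WriteLn('Thread ', I, ' sum: ', Sums[I]);
    end.
    ```

    In a VCL application you would normally avoid calling WaitFor on the main thread, because it blocks the UI until the thread is done; the OnTerminate approach from the unit above serves the same purpose there without blocking.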