I have a Delphi application that uses a thread pool to process XML data. Each task queued to the pool processes one XML record. If an exception is raised in one of the tasks, I want to prevent all remaining queued tasks from starting and executing.
{ Builds the work item for one XML record.

  lProcessXML - processor whose Execute method is called with the record.
  lXMLData    - the XML text handed to lProcessXML.Execute.

  Returns an anonymous procedure intended to be queued on a thread pool.
  When run it initialises COM for the calling thread, skips the work if
  Form2.fbTerminate is already set, and otherwise executes the processor.
  Any exception raised by Execute is not re-raised: it is converted into
  setting Form2.fbTerminate so that later work items skip themselves. }
function ProcessXMLWrapper(lProcessXML: TProcessXML; lXMLData: String): TProc;
begin
  Result := procedure
    var
      lCancelled: Boolean;
    begin
      CoInitialize(nil);
      try
        // Read the shared flag under the lock, then act on the local copy so
        // the lock is never held while the record is being processed.
        lCriticalSection.Enter;
        try
          lCancelled := Form2.fbTerminate;
        finally
          lCriticalSection.Leave;
        end;
        if lCancelled then
          Exit; // Exit early if termination is flagged (CoUninitialize still runs)

        try
          lProcessXML.Execute(lXMLData);
        except
          on E: Exception do
          begin
            // Write the flag under the same lock the readers use; the
            // original wrote it unprotected while every read was guarded.
            lCriticalSection.Enter;
            try
              Form2.fbTerminate := True; // Flag termination
            finally
              lCriticalSection.Leave;
            end;
          end;
        end;
      finally
        // Balances the CoInitialize call above on every exit path.
        CoUninitialize;
      end;
    end;
end;
In my Button1Click method, I queue tasks in a thread pool as follows:
// Create a dedicated pool and ask for a single worker thread, then queue one
// work item per entry of MyList until termination has been flagged.
ThreadPool := TThreadPool.Create;
// NOTE(review): the Boolean result of SetMaxWorkerThreads is ignored here -
// confirm the pool actually accepts a limit of 1.
ThreadPool.SetMaxWorkerThreads(1);
try
for var i := 0 to MyList.Count - 1 do
begin
// Read the shared flag under the critical section.
lCriticalSection.Enter;
try
if fbTerminate then
Break; // Stop queuing tasks if termination is flagged
finally
// Runs on the Break path as well, so the lock is always released.
lCriticalSection.Leave;
end;
// Items queued before the flag was set stay in the pool's queue; only the
// check inside the ProcessXMLWrapper work item can make them skip their work.
ThreadPool.QueueWorkItem(ProcessXMLWrapper(lProcessXML, MyList[i].msg));
end;
finally
// NOTE(review): the pool is freed right after the last item is queued and
// nothing here waits for the queued items - confirm how TThreadPool's
// destructor treats work that is still pending or running.
ThreadPool.Free;
end;
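Here’s what I want to achieve: as soon as one task raises an exception, every task that is still waiting in the queue should be skipped instead of executed.
Currently, I’m using a TCriticalSection to synchronize access to shared data and checking a condition (if fbTerminate then Exit) in ProcessXMLWrapper. However, this doesn’t prevent tasks that are already in the queue from being picked up and run by the pool.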
My Question:
After some testing, I found a solution that works. If you notice any issues or have suggestions for improvement, please let me know.
unit Unit1;
interface
uses
Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls, System.Threading,
System.SyncObjs, System.Generics.Collections, System.Diagnostics;
type
// Form of the demo: Button1Click schedules the tasks and Memo1 receives the
// log lines written by the tasks and by Button1Click.
TForm1 = class(TForm)
// Log output control.
Memo1: TMemo;
// Presumably wired to Button1Click in the .dfm - verify there.
Button1: TButton;
// Resets fTerminate, creates the pool and the tasks, and waits for them.
procedure Button1Click(Sender: TObject);
private
{ Private declarations }
// Cancellation flag: reset in Button1Click, set by a ProcessTask work item,
// and checked both before a work item starts and before the next task is
// created. Accessed from ProcessTask through the Form1 variable.
fTerminate : Boolean;
public
{ Public declarations }
end;
// Returns the anonymous procedure that is run as the task with the given id.
function ProcessTask(const TaskId: Integer): TProc;
// Unit-level variables. They are declared in the interface section, so they
// are visible to every unit that uses Unit1.
var
// Form instance that ProcessTask reads the cancellation flag and memo from.
Form1: TForm1;
// Pool created and freed inside TForm1.Button1Click.
MyThreadPool: TThreadPool;
// Unit-level integer.
I: Integer;
// Task references filled in by TForm1.Button1Click.
Tasks: TArray<ITask>;
implementation
{$R *.dfm}
{ Builds the work item for one task.

  TaskId - number used in the log lines; the value 7 simulates a failure.

  Returns an anonymous procedure that, when run on a worker thread:
    - logs a "skip" line and returns if Form1.fTerminate is already set;
    - otherwise logs a "started" line, sleeps two seconds to simulate work
      and logs a "completed" line;
    - sets Form1.fTerminate when TaskId is 7 or when an exception escapes
      the work, so that tasks run afterwards skip themselves.
  All memo updates are queued to the main thread; the procedure itself never
  touches the VCL directly. }
function ProcessTask(const TaskId: Integer): TProc;
begin
  Result := procedure
    var
      lThreadId: TThreadId;
    begin
      // Capture the id before anything else. The original assigned it only
      // after the skip check, so the "skip" log line formatted an
      // uninitialised value.
      lThreadId := TThread.Current.ThreadId;

      if Form1.fTerminate then
      begin
        // Queue with nil rather than TThread.Current: entries queued on
        // behalf of a thread object are discarded when that thread object is
        // destroyed, and the pool's workers go away when the pool is freed.
        TThread.Queue(
          nil,
          procedure
          begin
            Form1.Memo1.Lines.Add(Format('Skiping TaskId: %d ThreadId: %s', [TaskId, lThreadId.ToString]));
          end
        );
        Exit;
      end;

      try
        TThread.Queue(
          nil,
          procedure
          begin
            Form1.Memo1.Lines.Add(Format('Started TaskId: %d ThreadId: %s', [TaskId, lThreadId.ToString]));
          end
        );

        // Simulated failure: task 7 flags termination and stops. No
        // exception is actually raised here.
        if TaskId = 7 then
        begin
          Form1.fTerminate := True;
          Exit;
        end;

        Sleep(2000); // Simulate work

        TThread.Queue(
          nil,
          procedure
          begin
            Form1.Memo1.Lines.Add(Format('TaskId %d completed. ThreadId: %s.', [TaskId, lThreadId.ToString]));
          end
        );
      except
        // A real failure is turned into the cancellation flag instead of
        // propagating out of the task.
        on E: Exception do
          Form1.fTerminate := True;
      end;
    end;
end;
{ Runs the demo: creates a private thread pool, starts up to ten tasks on it
  (one every two seconds), stops creating tasks once the cancellation flag
  is set, waits for every task that was actually created and finally logs
  'All tasks completed.' and frees the pool.

  Sender - the control that triggered the event (unused).

  NOTE(review): the handler sleeps and waits on the calling thread, so
  entries queued to the main thread by the tasks can only be processed after
  it returns - confirm this is acceptable for the real application. }
procedure TForm1.Button1Click(Sender: TObject);
var
  // Local loop counter: Delphi does not accept a unit-level variable as the
  // control variable of a for loop inside a routine (E1019), so this local
  // deliberately hides the unit-level I.
  I: Integer;
begin
  fTerminate := False;
  MyThreadPool := TThreadPool.Create;
  try
    // NOTE(review): both setters return a Boolean that is ignored here -
    // confirm that the pool really ends up limited to one worker.
    MyThreadPool.SetMaxWorkerThreads(1);
    MyThreadPool.SetMinWorkerThreads(1);

    // Hold 10 tasks. Clear the array first so that slots left over from a
    // previous click do not survive as references to already-finished tasks.
    Tasks := nil;
    SetLength(Tasks, 10);

    for I := 0 to High(Tasks) do
    begin
      if Form1.fTerminate then
        Break;
      Sleep(2000);
      Tasks[I] := TTask.Create(
        ProcessTask(I + 1),
        MyThreadPool
      );
      // Start the task
      Tasks[I].Start;
    end;

    // After an early Break the remaining slots are nil; calling Wait on
    // them would raise an access violation, so only wait on created tasks.
    for I := 0 to High(Tasks) do
      if Assigned(Tasks[I]) then
        Tasks[I].Wait;
  finally
    // Plain string: the original Format call had no argument list and an
    // unbalanced parenthesis, so it did not compile.
    Form1.Memo1.Lines.Add('All tasks completed.');
    MyThreadPool.Free;
  end;
end;
end.