How can I split the workload of records across multiple threads, specifically for an Accuracer DB that has 169 records, with 7 threads for example?
I could just split the number of records into ranges and let each thread process one range, but if the user deletes or adds a record, that will not work well.
You can use OmniThreadLibrary to process records from a database in parallel without much hassle.
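I wrote an example using the Pipeline abstraction. The pipeline consists of 3 stages:
1. The first stage reads the records from the dataset and copies each one into a container object, which it passes on to the next stage.
2. The second stage processes the incoming data. It calls the DoSomethingWith procedure, which simply wastes around 100 ms to simulate the processing of the data, frees the container instance, and then adds the literal value 1 to the output queue to inform the final stage that another record has been processed. This stage is configured to run in parallel in 7 threads.
3. The last stage counts how many records the previous stage has completed.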
The example is a console application, so you can just copy/paste it and see it working live on your machine.
program Project1;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils,
  OtlCommon,
  OtlCollections,
  OtlParallel,
  System.Diagnostics,
  DB, DBClient;
type
  //auxiliary container, used to copy the database data
  //to avoid synchronization. Remember that the TDataSet "current record"
  //may cause conflicts if changed from different threads.
  TContainer = class
  private
    FName: string;
    FID: Int64;
  public
    property ID: Int64 read FID write FID;
    property Name: string read FName write FName;
  end;
//does nothing, but wastes around 100 ms "processing" each record
procedure DoSomethingWith(const AValue: TContainer);
begin
  Sleep(100);
end;
//creates a DataSet on the fly with a random number of records
function CreateDataSet: TClientDataSet;
var
  I: Integer;
begin
  Result := TClientDataSet.Create(nil);
  with Result.FieldDefs.AddFieldDef do
  begin
    Name := 'ID';
    DataType := ftLargeint;
  end;
  with Result.FieldDefs.AddFieldDef do
  begin
    Name := 'NAME';
    DataType := ftString;
  end;
  Result.CreateDataSet;
  for I := 1 to Random(1000) do
    Result.InsertRecord([I, 'Test']);
end;
var
  RecordsProcessed: Integer;
  SW: TStopwatch;
  Data: TDataSet;
begin
  IsMultiThread := True;
  Randomize;
  Writeln('wait while processing...');
  SW := TStopwatch.Create;
  SW.Start;
  try
    Data := CreateDataSet;
    try
      RecordsProcessed := Parallel.Pipeline
        .Stage(
          //stage 1: copy each record into a container and queue it
          procedure (const Input, Output: IOmniBlockingCollection)
          var
            RecData: TContainer;
          begin
            Data.First;
            while not Data.Eof do
            begin
              RecData := TContainer.Create;
              RecData.ID := Data.Fields[0].AsLargeInt;
              RecData.Name := Data.Fields[1].AsString;
              Output.Add(RecData);
              Data.Next;
            end;
          end)
        .Stage(
          //stage 2: process one container, then free it
          procedure (const Input: TOmniValue; var Output: TOmniValue)
          begin
            //process the real thing here
            DoSomethingWith(Input.AsObject as TContainer);
            Input.AsObject.Free;
            Output := 1; //another record
          end)
        .NumTasks(7) //this stage is processed by 7 parallel tasks
        .Stage(
          //stage 3: count the processed records
          procedure (const Input, Output: IOmniBlockingCollection)
          var
            Recs: Integer;
            Value: TOmniValue;
          begin
            Recs := 0;
            for Value in Input do
              Inc(Recs, Value);
            Output.Add(Recs);
          end)
        .Run.Output.Next;
      SW.Stop;
      Writeln(RecordsProcessed, ' records processed in ', SW.ElapsedMilliseconds, 'ms.');
      //Random(1000) may return 0, so avoid dividing by zero
      if RecordsProcessed > 0 then
        Writeln('Avg. ', (SW.ElapsedMilliseconds/RecordsProcessed):0:3, 'ms./record');
    finally
      Data.Free;
    end;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
  readln;
end.
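For comparison, here is a minimal sketch of the same idea without OmniThreadLibrary: one producer feeds a shared queue and 7 workers pull from it, so no fixed record ranges are needed. It is only an illustration and not part of the example above. It uses the RTL's TThreadedQueue, and the names WorkerCount, RecordCount and Sentinel, the queue depth of 32, and the Sleep(10) stand-in for the real work are all assumptions made for the sketch.

```delphi
program QueueSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  System.Classes,
  System.SyncObjs,
  System.Generics.Collections;

const
  WorkerCount = 7;   // assumption: the 7 threads from the question
  RecordCount = 169; // assumption: the 169 records from the question
  Sentinel = -1;     // assumption: marker meaning "no more work"

var
  Queue: TThreadedQueue<Integer>;
  Workers: array[0..WorkerCount - 1] of TThread;
  Processed: Integer;
  I: Integer;
begin
  Processed := 0;
  // bounded queue: PushItem blocks while 32 items are already waiting
  Queue := TThreadedQueue<Integer>.Create(32);
  try
    for I := Low(Workers) to High(Workers) do
    begin
      Workers[I] := TThread.CreateAnonymousThread(
        procedure
        var
          RecordID: Integer;
        begin
          // each worker takes the next available item, whatever it is
          while True do
          begin
            RecordID := Queue.PopItem;
            if RecordID = Sentinel then
              Break;
            Sleep(10); // stands in for the real per-record work
            TInterlocked.Increment(Processed);
          end;
        end);
      // keep the thread object alive so it can be waited on and freed here
      Workers[I].FreeOnTerminate := False;
      Workers[I].Start;
    end;

    // producer: a real program would walk the dataset here instead
    for I := 1 to RecordCount do
      Queue.PushItem(I);
    // one sentinel per worker tells each of them to stop
    for I := 1 to WorkerCount do
      Queue.PushItem(Sentinel);

    for I := Low(Workers) to High(Workers) do
    begin
      Workers[I].WaitFor;
      Workers[I].Free;
    end;
    Writeln(Processed, ' records processed');
  finally
    Queue.Free;
  end;
end.
```

Because the workers take whatever arrives next, the number of records does not have to be known in advance. The OmniThreadLibrary pipeline above gives the same dynamic distribution with far less plumbing.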
The main advantages of doing it this way, IMHO, are: