database, delphi, delphi-xe2, accuracerdb

Database Record Processing


How can I split the workload of processing database records across multiple threads? Specifically, an Accuracer DB table that has 169 records, processed by 7 threads, for example.

I could just split the records into ranges and let each thread process one range, but if a user deletes or adds a record, that will not work well.


Solution

  • You can use OmniThreadLibrary to process records from a database in parallel without much hassle.

    I wrote an example using the Pipeline abstraction. The pipeline consists of 3 stages:

    1. The first stage reads data from the database and, for each record, creates an instance of a container object that represents that data for the next stage of the pipeline.
    2. The second stage processes the incoming data.

      • Calls the DoSomethingWith procedure, which simply wastes around 100 ms to simulate processing the data.
      • Frees the container instance.
      • Adds the literal value 1 to the output queue to inform the final stage that another record has been processed.

      This stage is configured to run in parallel in 7 threads.

    3. The last stage just counts how many records have been completed by the previous stage.

    The example is a console application, so you can just copy/paste it and see it working live on your machine.

    program Project1;
    
    {$APPTYPE CONSOLE}
    
    {$R *.res}
    
    uses
      System.SysUtils,
      OtlCommon,
      OtlCollections,
      OtlParallel,
      System.Diagnostics,
      DB, DBClient;
    
    type
      //auxiliary container, used to copy the database data
      //to avoid synchronization. remember that the TDataSet "current record"
      //may cause conflicts if changed from different threads.
      TContainer = class
      private
        FName: string;
        FID: Int64;
      public
        property ID: Int64 read FID write FID;
        property Name: string read FName write FName;
      end;
    
    //does nothing, but wastes around 100ms. "processing" each record
    procedure DoSomethingWith(const AValue: TContainer);
    begin
      Sleep(100);
    end;
    
    //creates a DataSet on the fly with a random number of records
    function CreateDataSet: TClientDataSet;
    var
      I: Integer;
    begin
      Result := TClientDataSet.Create(nil);
      with Result.FieldDefs.AddFieldDef do
      begin
        Name := 'ID';
        DataType := ftLargeint;
      end;
      with Result.FieldDefs.AddFieldDef do
      begin
        Name := 'NAME';
        DataType := ftString;
      end;
      Result.CreateDataSet;
      for I := 1 to 1 + Random(1000) do //at least 1 record, avoids division by zero in the average below
        Result.InsertRecord([I, 'Test']);
    end;
    
    var
      RecordsProcessed: Integer;
      SW: TStopwatch;
      Data: TDataSet;
    begin
      IsMultiThread := True;
      Randomize;
      Writeln('wait while processing...');
      SW := TStopwatch.Create;
      SW.Start;
      try
        Data := CreateDataSet;
        try
          RecordsProcessed := Parallel.Pipeline
            .Stage(
              procedure (const Input, Output: IOmniBlockingCollection)
              var
                RecData: TContainer;
              begin
                Data.First;
                while not Data.Eof do
                begin
                  RecData := TContainer.Create;
                  RecData.ID := Data.Fields[0].AsLargeInt;
                  RecData.Name := Data.Fields[1].AsString;
                  Output.Add(RecData);
                  Data.Next;
                end;
              end)
            .Stage(
              procedure (const Input: TOmniValue; var Output: TOmniValue)
              begin
                //process the real thing here
                DoSomethingWith(Input);
                Input.AsObject.Free;
                Output := 1; //another record
              end)
            .NumTasks(7) //this stage is processed by 7 parallel tasks
            .Stage(
               procedure (const Input, Output: IOmniBlockingCollection)
               var
                 Recs: Integer;
                 Value: TOmniValue;
               begin
                 Recs := 0;
                 for Value in Input do
                   Inc(Recs, Value);
                 Output.Add(Recs);
               end)
            .Run.Output.Next;
          SW.Stop;
          Writeln(RecordsProcessed, ' records processed in ', SW.ElapsedMilliseconds, 'ms.');
          Writeln('Avg. ', (SW.ElapsedMilliseconds/RecordsProcessed):0:3, 'ms./record');
        finally
          Data.Free;
        end;
      except
        on E: Exception do
          Writeln(E.ClassName, ': ', E.Message);
      end;
      readln;
    end.
    

    The main advantages of doing it this way, IMHO, are:

    • You have a flexible mechanism to distribute the job between the multiple workers. If some record takes more time to process, the library takes care of the situation, and you can reasonably expect the total work to finish in the least possible time.
    • Your first processing thread starts as soon as the first record has been read from the database.
    • You can easily adapt it if you have to wait for more incoming records in the base table. The output queue of a stage is not marked as finished until the code in the stage procedure ends, so if at some point there is no more work to do, the downstream stages simply block, waiting for more data to process.
    • You can change the number of worker threads just by changing a parameter value!
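
    As a small variation on that last point (a sketch, not part of the original example): instead of hard-coding 7 tasks, you could size the worker pool from the machine by passing the CPUCount variable (a global declared in the System unit) to NumTasks, keeping the rest of the pipeline unchanged:

        //same middle stage as in the example above, but with the
        //number of worker tasks derived from the number of logical cores
        .Stage(
          procedure (const Input: TOmniValue; var Output: TOmniValue)
          begin
            DoSomethingWith(Input);
            Input.AsObject.Free;
            Output := 1;
          end)
        .NumTasks(CPUCount) //one worker task per logical CPU core

    For a CPU-bound processing stage this is usually a sensible default; for an I/O-bound stage (like the 100 ms sleep in the example), more tasks than cores can still pay off.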