Search code examples
multithreadingdelphiomnithreadlibrary

How do I design a "conveyor" of operations with OmniThreadLibrary?


I have a Windows Delphi application with "Start" and "Stop" menu items accessible via a notification icon. After click on "Start", I need to do the following (as I see implementation):

  1. ThreadMonitor: The first thread is waiting for the appearance of the specified file in the specified folder.

  2. ThreadParse: Once the file appears, it should be transferred to another thread (for parsing content) and continue monitoring for the next file.

  3. ThreadDB: Once all data are parsed, save them into MySQL DB. (Another background thread with active DB connection?)

  4. ThreadLog: If there are any errors in the steps 1–3, write them to a log file (another background thread?) without interrupting the steps 1–3.

That is, it turns out that something like a continuous conveyor, whose work is stopped only by pressing Stop. What should I use from a whole variety of methods of OmniThreadLibrary?


Solution

  • It would probably be best to use Parallel.BackgroundWorker for logging and Parallel.Pipeline for data processing. Here's a sketch of a solution (compiles, but is not fully implemented):

    unit PipelineDemo1;
    
    interface
    
    uses
      Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants, System.Classes, Vcl.Graphics,
      Vcl.Controls, Vcl.Forms, Vcl.Dialogs, Vcl.StdCtrls,
      OtlCommon, OtlCollections, OtlParallel;
    
    type
      TfrmPipelineDemo = class(TForm)
        btnStart: TButton;
        btnStop: TButton;
        procedure btnStartClick(Sender: TObject);
        procedure btnStopClick(Sender: TObject);
      private
        FLogger  : IOmniBackgroundWorker;
        FPipeline: IOmniPipeline;
      strict protected //asynchronous workers
        procedure Asy_LogMessage(const workItem: IOmniWorkItem);
        procedure Asy_Monitor(const input, output: IOmniBlockingCollection);
        procedure Asy_Parser(const input: TOmniValue; var output: TOmniValue);
        procedure Asy_SQL(const input, output: IOmniBlockingCollection);
      public
      end;
    
    var
      frmPipelineDemo: TfrmPipelineDemo;
    
    implementation
    
    uses
      OtlTask;
    
    {$R *.dfm}
    
    procedure TfrmPipelineDemo.Asy_LogMessage(const workItem: IOmniWorkItem);
    begin
      //log workItem.Data
    end;
    
    procedure TfrmPipelineDemo.Asy_Monitor(const input, output: IOmniBlockingCollection);
    begin
      while not input.IsCompleted do begin
        if FileExists('0.0') then
          output.TryAdd('0.0');
        Sleep(1000);
      end;
    end;
    
    procedure TfrmPipelineDemo.Asy_Parser(const input: TOmniValue; var output: TOmniValue);
    begin
      // output := ParseFile(input)
      FLogger.Schedule(FLogger.CreateWorkItem('File processed: ' + input.AsString));
    end;
    
    procedure TfrmPipelineDemo.Asy_SQL(const input, output: IOmniBlockingCollection);
    var
      value: TOmniValue;
    begin
      //initialize DB connection
      for value in input do begin
        //store value into database
      end;
      //close DB connection
    end;
    
    procedure TfrmPipelineDemo.btnStartClick(Sender: TObject);
    begin
      FLogger := Parallel.BackgroundWorker.NumTasks(1).Execute(Asy_LogMessage);
    
      FPipeline := Parallel.Pipeline
        .Stage(Asy_Monitor)
        .Stage(Asy_Parser)
        .Stage(Asy_SQL)
        .Run;
    end;
    
    procedure TfrmPipelineDemo.btnStopClick(Sender: TObject);
    begin
      FPipeline.Input.CompleteAdding;
      FPipeline := nil;
      FLogger.Terminate(INFINITE);
      FLogger := nil;
    end;
    
    end.