Search code examples
multithreadingdelphiomnithreadlibrary

How to know the state of Pipeline stages in OmniThreadLibrary?


gabr's answer to another question shows an example of using Parallel.Pipeline for data processing.
At the moment I need to know when the Pipeline was started and when all its stages are completed. I read the other gabr's answer for this problem How to monitor Pipeline stages in OmniThreadLibrary?. I tried to do it like this (modified according to the answer):

unit Unit1;

interface

uses
  Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
  Dialogs, StdCtrls, superobject,
  OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;

const
  WM_STARTED = WM_USER;
  WM_ENDED = WM_USER + 1;

type
  TForm1 = class(TForm)
    btnStart: TButton;
    btnStop: TButton;
    lbLog: TListBox;
    procedure btnStartClick(Sender: TObject);
    procedure btnStopClick(Sender: TObject);
  private
    FCounterTotal: IOmniCounter;
    FCounterProcessed: IOmniCounter;
    FIsBusy: boolean;
    FPipeline: IOmniPipeline;
    procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
    procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
  strict protected
    procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
    procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
  end;

var
  Form1: TForm1;

  procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';

implementation

uses IOUtils;

{$R *.dfm}

procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  i, cnt: integer;
  f: string;
begin
  while not input.IsCompleted do begin

    task.Comm.Send(WM_STARTED); // message is sent once every 1 min
    cnt := 0;

    for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
    begin
      output.TryAdd(f);
      Inc(cnt);
      Sleep(1000); // simulate a work
    end;
    FCounterTotal.Value := cnt;

    // I need to continously check a specified folder for new files, with
    // a period of 1 minute (60 sec) for an unlimited period of time.
    i := 60;
    repeat
      Sleep(1000); // Check if we should stop every second (if Stop button is pushed)
      if input.IsCompleted then Break;
      dec(i);
    until i < 0;
  end;
end;

procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
var
  sl: TStringList;
  ws: WideString;
begin
  sl := TStringList.Create;
  try
    sl.LoadFromFile(input.AsString);
    GetJSON_(PChar(sl.Text), ws); // output as ISuperObject ---  DLL procedure
    output := SO(ws);
//     TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
  finally
    sl.Free;
  end;
end;

procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  value: TOmniValue;
  JSON: ISuperObject;
  cnt: integer;
begin
  for value in input do begin
    JSON := value.AsInterface as ISuperObject;
    // do something with JSON

    cnt := FCounterProcessed.Increment;
    if FCounterTotal.Value = cnt then
      task.Comm.Send(WM_ENDED); // !!! message is not sent
  end;
end;

//
procedure TForm1.btnStartClick(Sender: TObject);
begin
  btnStart.Enabled := False;

  FCounterTotal := CreateCounter(-1);
  FCounterProcessed := CreateCounter(0);

  FPipeline := Parallel.Pipeline
    .Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self))
    .Stage(Async_Parse)
    .Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self))
    .Run;
end;

procedure TForm1.btnStopClick(Sender: TObject);
begin
  if Assigned(FPipeline) then begin
    FPipeline.Input.CompleteAdding;
    FPipeline := nil;
  end;

  btnStart.Enabled := True;
end;

//
procedure TForm1.WMEnded(var msg: TOmniMessage);
begin
  FIsBusy := False;
  lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]));
end;

procedure TForm1.WMStarted(var msg: TOmniMessage);
begin
  FIsBusy := True;
  lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]));
end;

end.

With task.Comm.Send(WM_STARTED) all is OK, but the line task.Comm.Send(WM_ENDED) is never executed. How do I know when the last stage has been completed? What is the correct way?


Solution

  • I give thanks to gabr whose advice use a special sentinel value helped me find a solution for my problem. This code works as expected:

    unit Unit1;
    
    interface
    
    uses
      Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
      Dialogs, StdCtrls, superobject,
      OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
    
    const
      WM_STARTED = WM_USER;
      WM_ENDED = WM_USER + 1;
    
    type
      TForm1 = class(TForm)
        btnStart: TButton;
        btnStop: TButton;
        lbLog: TListBox;
        procedure btnStartClick(Sender: TObject);
        procedure btnStopClick(Sender: TObject);
      private
        FIsBusy: boolean;
        FPipeline: IOmniPipeline;
        procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
        procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
      strict protected
        procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
        procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
        procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
      end;
    
    var
      Form1: TForm1;
    
      procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
    
    implementation
    
    uses IOUtils;
    
    {$R *.dfm}
    
    procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    var
      i: integer;
      f: string;
    begin
      while not input.IsCompleted do begin
    
        task.Comm.Send(WM_STARTED); // message is sent once every 1 min
    
        for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
        begin
          output.TryAdd(f);
          Sleep(1000); // simulate a work
        end;
        output.TryAdd(0); // to send a special 'sentinel' value
    
        // I need to continously check a specified folder for new files, with
        // a period of 1 minute (60 sec) for an unlimited period of time.
        i := 60;
        repeat
          Sleep(1000); // Check if we should stop every second (if Stop button is pushed)
          if input.IsCompleted then Break;
          dec(i);
        until i < 0;
      end;
    end;
    
    procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
    var
      sl: TStringList;
      ws: WideString;
    begin
      if input.IsInteger and (input.AsInteger = 0) then begin
        output := 0; // if we got 'sentinel' value send it to the next stage
        Exit;
      end;
    
      sl := TStringList.Create;
      try
        sl.LoadFromFile(input.AsString);
        GetJSON_(PChar(sl.Text), ws); // output as ISuperObject ---  DLL procedure
        output := SO(ws);
    //     TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
      finally
        sl.Free;
      end;
    end;
    
    procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    var
      value: TOmniValue;
      JSON: ISuperObject;
    begin
      for value in input do begin
    
        if value.IsInteger and (value.AsInteger = 0) then begin
          task.Comm.Send(WM_ENDED); // if we got 'sentinel' value
          Continue;
        end;
    
        JSON := value.AsInterface as ISuperObject;
        // do something with JSON
      end;
    end;
    
    //
    procedure TForm1.btnStartClick(Sender: TObject);
    begin
      btnStart.Enabled := False;
    
      FPipeline := Parallel.Pipeline
        .Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self))
        .Stage(Async_Parse)
        .Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self))
        .Run;
    end;
    
    procedure TForm1.btnStopClick(Sender: TObject);
    begin
      if Assigned(FPipeline) then begin
        FPipeline.Input.CompleteAdding;
        FPipeline := nil;
      end;
    
      btnStart.Enabled := True;
    end;
    
    //
    procedure TForm1.WMEnded(var msg: TOmniMessage);
    begin
      FIsBusy := False;
      lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]));
    end;
    
    procedure TForm1.WMStarted(var msg: TOmniMessage);
    begin
      FIsBusy := True;
      lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]));
    end;
    
    end.
    

    An alternative with using Exception as a sentinel (not worked yet, but I'm probably doing something wrong):

    unit Unit1;
    
    interface
    
    uses
      Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
      Dialogs, StdCtrls, superobject,
      OtlCommon, OtlCollections, OtlParallel, OtlComm, OtlTask, ExtCtrls;
    
    const
      WM_STARTED = WM_USER;
      WM_ENDED = WM_USER + 1;
    
    type
      ESentinelException = class(Exception);
    
      TForm1 = class(TForm)
        btnStart: TButton;
        btnStop: TButton;
        lbLog: TListBox;
        procedure btnStartClick(Sender: TObject);
        procedure btnStopClick(Sender: TObject);
      private
        FIsBusy: boolean;
        FPipeline: IOmniPipeline;
        procedure WMStarted(var msg: TOmniMessage); message WM_STARTED;
        procedure WMEnded(var msg: TOmniMessage); message WM_ENDED;
      strict protected
        procedure Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
        procedure Async_Parse(const input: TOmniValue; var output: TOmniValue);
        procedure Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
      end;
    
    var
      Form1: TForm1;
    
      procedure GetJSON_(const AData: PChar; var Output: WideString); stdcall; external 'my.dll';
    
    implementation
    
    uses IOUtils;
    
    {$R *.dfm}
    
    procedure TForm1.Async_Files(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    var
      i: integer;
      f: string;
    begin
      while not input.IsCompleted do begin
    
        task.Comm.Send(WM_STARTED); // message is sent once every 1 min
    
        for f in TDirectory.GetFiles(ExtractFilePath(Application.ExeName), '*.txt') do
        begin
          output.TryAdd(f);
          Sleep(1000); // simulate a work
        end;
    
        raise ESentinelException.Create('sentinel');
    
        // I need to continously check a specified folder for new files, with
        // a period of 1 minute (60 sec) for an unlimited period of time.
        i := 60;
        repeat
          Sleep(1000); // Check if we should stop every second (if Stop button is pushed)
          if input.IsCompleted then Break;
          dec(i);
        until i < 0;
      end;
    end;
    
    procedure TForm1.Async_Parse(const input: TOmniValue; var output: TOmniValue);
    var
      sl: TStringList;
      ws: WideString;
    begin
      sl := TStringList.Create;
      try
        sl.LoadFromFile(input.AsString);
        GetJSON_(PChar(sl.Text), ws); // output as ISuperObject ---  DLL procedure
        output := SO(ws);
    //     TFile.Delete(input.AsString); // For testing purposes only - Continue without Deleting Processed File
      finally
        sl.Free;
      end;
    end;
    
    procedure TForm1.Async_JSON(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    var
      value: TOmniValue;
      JSON: ISuperObject;
    begin
      for value in input do begin
    
        if value.IsException and (value.AsException is ESentinelException) then begin
          task.Comm.Send(WM_ENDED); // if we got 'sentinel' Exception
          value.AsException.Free;
        end
        else begin
          JSON := value.AsInterface as ISuperObject;
          // do something with JSON
        end;
      end;
    end;
    
    //
    procedure TForm1.btnStartClick(Sender: TObject);
    begin
      btnStart.Enabled := False;
    
      FPipeline := Parallel.Pipeline
        .Stage(Async_Files, Parallel.TaskConfig.OnMessage(Self))
        .Stage(Async_Parse)
        .Stage(Async_JSON, Parallel.TaskConfig.OnMessage(Self))
        .HandleExceptions
        .Run;
    end;
    
    procedure TForm1.btnStopClick(Sender: TObject);
    begin
      if Assigned(FPipeline) then begin
        FPipeline.Input.CompleteAdding;
        FPipeline := nil;
      end;
    
      btnStart.Enabled := True;
    end;
    
    //
    procedure TForm1.WMEnded(var msg: TOmniMessage);
    begin
      FIsBusy := False;
      lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 3 ended', [DateTimeToStr(Now)]));
    end;
    
    procedure TForm1.WMStarted(var msg: TOmniMessage);
    begin
      FIsBusy := True;
      lbLog.ItemIndex := lbLog.Items.Add(Format('%s - Pipeline stage 1 starting', [DateTimeToStr(Now)]));
    end;
    
    end.