Search code examples
postgresqldelphidevartomnithreadlibrary

Handling NOTICE events from PostgreSQL in a separate thread in Delphi with Devart's PgDAC components


I want to execute long queries in a separate thread in order to be able to abort them and also to give feedback to the users. All of this is working but I sometimes get Access Violations because, I think, the processing of the OnNotice events is not done the right way and I would like to know the proper way of doing this.

I am using Devart's PgDAC and OmniThreadLibrary on Delphi 2010.

The PostgreSQL code that I execute is a stored procedure that contains things like :

RAISE NOTICE 'ad: %',myad.name;

Here are the interesting parts of my code :

procedure TFDecomptes.FormCreate(Sender: TObject);
begin
  ThreadConnection := TPgConnection.Create(Self);
  ThreadConnection.Assign(DM.PgConnection1);
end;

This ThreadConnection is the TPgConnection that will be used to execute the query (within a separate thread).

procedure TFDecomptes.BInterruptClick(Sender: TObject);
begin
  ThreadConnection.BreakExec;
end;

This is what the "Interrupt query" button does. I'm not sure this is very "thread safe" since it is used in the main thread but does something on the TPgConnection dedicated to the query-execution thread.

procedure TFDecomptes.OmniEventMonitor1TaskMessage(const task: IOmniTaskControl; const msg: TOmniMessage);
begin
  case msg.MsgID of
    1: begin
         CalculationError:=msg.MsgData.AsString;
       end;
  end;
end;

This is where I show errors happening during the thread execution (like SQL errors or query cancellation).

procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
var s:String;
begin
  s:=Errors[Errors.Count-1].ToString;
  if copy(s,1,4)='ad: ' then begin
    delete(s,1,4);
    LAD.Caption:=s;
  end;
end;

This is the OnNotice event processing. All it is doing is modify a Label's caption.

procedure InternalExecQuery(const task: IOmniTask);
Var q:TPgSQL;
begin
  q:=Task.Param['pgsql'];
  Try
    q.Execute;
  Except
    On E:Exception do task.Comm.Send(1,e.Message);
  End;
end;

procedure TFDecomptes.StartClick(Sender: TObject);
begin
  ThreadConnection.OnNotice:=PgConnectionNotice;
  Timer1.Enabled:=True;
  CalculationTask := CreateTask(InternalExecQuery, 'CalculeDecomptes')
    .MonitorWith(OmniEventMonitor1)
    .SetParameter('pgsql', PgSQL)
    .Run;
end;

And this is how the query is run.

So the PgConnectionNotice event (running in the main thread) is attached to the ThreadConnection (used in the query-execution thread) and this is what I suspect to be generating these random access violations.

I don't know how to handle this. Should I use some kind of lock when inside PgConnectionNotice (Synchronize ?).

This is what I tried :

procedure TFDecomptes.OmniEventMonitor1TaskMessage(const task: IOmniTaskControl; const msg: TOmniMessage);
begin
  case msg.MsgID of
    1: begin
         CalculationError:=msg.MsgData.AsString;
       end;

    2: begin
         lad.caption:='Here';
       end;
  end;
end;

procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
begin
  // I am not using the passed string in this test
  CalculationTask.Comm.Send(2,Errors[Errors.Count-1].ToString);
end;

The message sent in PgConnectionNotice (with MsgId=2) is never received by OmniEventMonitor1TaskMessage.

I have tried using CalculationTask.Invoke but didn't understand how to call it in order to pass a string parameter (I don't think Delphi 2010 allows for anonymous functions).

When I tried the simpler action of cancelling the query like this, it stopped cancelling the query :

procedure TFDecomptes.DoTheInterrupt;
begin
  ThreadConnection.BreakExec;
end;

procedure TFDecomptes.BInterruptClick(Sender: TObject);
begin
  CalculationTask.Invoke(DoTheInterrupt);
end;

So I guess I shouldn't do the calls via CalculationTask. Should I store the task created in InternalExecQuery in a global variable and use that ?


Solution

  • The main problem is that I was confusing IOmniTask and IOmniTaskControl. IOmniTask is the background's interface that should be used to send messages to the main thread while IOmniTaskControl is the main thread's interface, used to talk to the background tasks.

    So using CalculationTask (which is a IOmniTaskControl) inside PgConnectionNotice is a double mistake : since PgConnectionNotice is fired from within the background thread, I was sending messages to the background thread, from the background thread, using a main thread's variable.

    So I added a global variable named RunningTask :

    Var RunningTask : IOmniTask;
    

    Set it to nil in the form's OnCreate and modify the task's code like this :

    procedure InternalExecQuery(const task: IOmniTask);
    Var q:TPgSQL;
    begin
      RunningTask := task;
      try
        q:=Task.Param['pgsql'];
        Try
          q.Execute;
        Except
          On E:Exception do task.Comm.Send(1,e.Message);
        End;
      finally
        RunningTask := Nil;
      end;
    end;
    

    And the OnNotice event now looks like :

    procedure TFDecomptes.PgConnectionNotice(Sender: TObject; Errors: TPgErrors);
    begin
      if RunningTask=Nil then
        // do nothing, old, pending notices
      else
        RunningTask.Comm.Send(2,Errors[Errors.Count-1].ToString);
    end;
    

    I know it is not clean to define a global variable although I know there will be at most one background task. I probably should have stored the IOmniTask reference within ThreadConnection because this is what Sender is in PgConnectionNotice.