How to process an asynchronous queue from within the main UI thread?


I am designing two components that asynchronously receive objects of a custom class (TMELogMessage) and store them in a thread-safe internal container. The first component (TMEFileLogger) is non-visual and, unsurprisingly, writes some info from these objects to a log file. The second component (TMELogGrid) is a visual FMX.Grid descendant that displays some info from these objects in the UI. What they do with these objects is, I think, irrelevant.

The problem I am facing is that these components do not actually know when these objects will be enqueued in their internal container, so they have to check the container themselves to see if there are any new objects that need processing, process them and remove them from the queue. Ideally I'd want this to be done when the application is not too busy, in a way similar to action updating, so as not to bog down the UI.

It is obviously wrong for a component to hook an event handler like Application.OnIdle... I could maybe subscribe to the TIdleMessage, but I'm not sure that is a good idea, as I've read that some applications may never go idle. Using an internal timer seems a bit old-school. I could maybe use a low-priority thread to poll the queue and then synchronize with the UI only when I find an object to process. I don't have other ideas though.

What is the proper way to do this in Delphi + multiplatform FireMonkey?


Solution

  • I don't like to answer my own questions, but I wanted this question to be answered, as it might be helpful to others. While Deltics' answer is useful, that is not the way I decided to go. I followed the advice in Remy's comment and encapsulated everything in a message handling class that components and forms can use. So both the TMEFileLogger and the TMELogGrid now use an instance of this new TMEMessageHandler class.

    Here is some interface code to explain what I did. Keep in mind that this is meant as a substitute for, and an enhancement of, the RTL System.Messaging unit. The problem with the RTL messaging system is that it only supports sending messages synchronously. I wanted a richer interface. This is what my message manager looks like:

      TMEMessageManager = Class
        ...
      Public
        ...
        Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal; Const ADispose: Boolean = True); Inline;
        Procedure PostDelayedMessage(Const Sender: TObject; AMessage: TMessage; Const DelayMSec: Cardinal; Const ADispose: Boolean = True); Inline;
        Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope; Const ADispose: Boolean = True); Inline;
        Procedure PostMessage(Const Sender: TObject; AMessage: TMessage; Const ADispose: Boolean = True); Inline;
        Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope; Const ADispose: Boolean = True); Inline;
        Procedure SendMessage(Const Sender: TObject; AMessage: TMessage; Const ADispose: Boolean = True); Inline;
    
        Function Subscribe(Const AMessageClass: TClass; Const AReceiver: IMEEnvelopeReceiver): Integer; Overload;
        Function Subscribe(Const AMessageClass: TClass; Const AMethod: TMessageMethod): Integer; Overload; Deprecated 'Use TMEMessageManager.Subscribe(AMessageClass, AReceiver)';
        Function Subscribe(Const AMessageClass: TClass; Const AProcedure: TMessageProcedure): Integer; Overload; Deprecated 'Use TMEMessageManager.Subscribe(AMessageClass, AReceiver)';
    
        Procedure Unsubscribe(Const AMessageClass: TClass; ID: Integer; Const Immediate: Boolean = False); Overload;
        Procedure Unsubscribe(Const AMessageClass: TClass; Const AReceiver: IMEEnvelopeReceiver; Const Immediate: Boolean = False); Overload;
        Procedure Unsubscribe(Const AMessageClass: TClass; Const AMethod: TMessageMethod; Const Immediate: Boolean = False); Overload; Deprecated;
        Procedure Unsubscribe(Const AMessageClass: TClass; Const AProcedure: TMessageProcedure; Const Immediate: Boolean = False); Overload; Deprecated;
        ...
      End;
    

    The TMEMessageEnvelope is a wrapper for messages, defined as:

    Type
      TMEMessageEnvelope = Class(TMEPersistent)
      Public
        ...
        Property Infos: TMEMessageInfos Read FInfos;
        Property Sender: TObject Read FSender;
        Property Msg: TMessage Read FMsg;
      End;
    

    Receivers that subscribe via an envelope receiver will receive both synchronous and asynchronous messages. This is the preferred subscription method. Receivers that subscribe via an object method or via a procedure will only receive synchronous messages. This is maintained for compatibility with the RTL messaging system, but is deprecated.

    The problem is that RTL messages cannot be posted as they are. Subscribed consumers just provide a procedure or an object method that consumes the message right away. To post a message so that it can be consumed later, asynchronously, it needs to be wrapped and enqueued. This way the sender is isolated from the receivers. So messages are posted (immediately or delayed) by first wrapping them in envelopes.

    Here are the base interfaces involved in this messaging system:

    Type
    
      IMEClonableMessage = Interface(IInterface)
        ['{45B223E2-DCA8-4E42-9847-6A3FCC910891}']
        Function Clone: TMessage;
      End;
    
      IMEMessageSender = Interface(IInterface)
        ['{99AFDC4A-CE30-41A3-9AA5-D49F2F1106BD}']
        Procedure PostDelayedMessage(const M: TMessage; Const DelayMSec: Cardinal);
        Procedure PostMessage(Const M: TMessage);
        Procedure SendMessage(Const M: TMessage);
      End;
    
      IMEEnvelopeSender = Interface(IInterface)
        ['{C3AEC52C-A558-40AB-B07B-3000ECDB9C0C}']
        Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal);
        Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope);
        Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope);
      End;
    
      IMEEnvelopeReceiver = Interface(IInterface)
        ['{7D464713-2F25-4666-AAF8-757AF07688C3}']
        Procedure ClearEnvelopes;
        Procedure ProcessEnvelope;
        Procedure ProcessEnvelopes;
        Function QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
        Procedure ReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
        Procedure Subscribe(Const AMessageClass: TClass);
        Procedure Unsubscribe(Const AMessageClass: TClass);
      End;
    

    The IMEClonableMessage interface is used to clone messages. Asynchronous messages must be cloned because, if there are many subscribers to the same message, each will receive and consume it at a different time, so it is best that each has its own copy.
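
    As a rough sketch of what a clonable message could look like: the real TMELogMessage is not shown in this post, so TDemoLogMessage and its Text property below are hypothetical. Only IMEClonableMessage is copied from above. Since TMessage descends from TObject, the IInterface methods are written by hand with reference counting disabled.

    ```pascal
    program ClonableMessageSketch;

    {$APPTYPE CONSOLE}

    uses
      System.SysUtils, System.Messaging;

    type
      // Interface copied from the post.
      IMEClonableMessage = interface(IInterface)
        ['{45B223E2-DCA8-4E42-9847-6A3FCC910891}']
        function Clone: TMessage;
      end;

      // Hypothetical message class, for illustration only.
      TDemoLogMessage = class(TMessage, IMEClonableMessage)
      private
        FText: string;
      protected
        function QueryInterface(const IID: TGUID; out Obj): HResult; stdcall;
        function _AddRef: Integer; stdcall;
        function _Release: Integer; stdcall;
      public
        constructor Create(const AText: string);
        function Clone: TMessage;
        property Text: string read FText;
      end;

    constructor TDemoLogMessage.Create(const AText: string);
    begin
      inherited Create;
      FText := AText;
    end;

    function TDemoLogMessage.QueryInterface(const IID: TGUID; out Obj): HResult;
    begin
      if GetInterface(IID, Obj) then
        Result := 0
      else
        Result := E_NOINTERFACE;
    end;

    function TDemoLogMessage._AddRef: Integer;
    begin
      Result := -1; // no reference counting: the owner frees the object
    end;

    function TDemoLogMessage._Release: Integer;
    begin
      Result := -1;
    end;

    function TDemoLogMessage.Clone: TMessage;
    begin
      // Each receiver gets an independent copy that it can free on its own.
      Result := TDemoLogMessage.Create(FText);
    end;

    procedure RunDemo;
    var
      Original: TDemoLogMessage;
      Clonable: IMEClonableMessage;
      Duplicate: TMessage;
    begin
      Original := TDemoLogMessage.Create('disk almost full');
      try
        if Supports(Original, IMEClonableMessage, Clonable) then
        begin
          Duplicate := Clonable.Clone;
          try
            Writeln((Duplicate as TDemoLogMessage).Text);
          finally
            Duplicate.Free;
          end;
        end;
      finally
        Clonable := nil; // drop the interface reference before freeing the object
        Original.Free;
      end;
    end;

    begin
      RunDemo;
    end.
    ```

    A manager that posts to several receivers could check for the interface with Supports, as above, and hand each receiver its own clone.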

    The other interfaces are, I think, self-explanatory.

    And finally here is the TMEMessageHandler class:

      /// <summary>Basic thread-safe class that can send and receive synchronous and asynchronous messages and envelopes.</summary>
      TMEMessageHandler = Class(TMEPersistent, IMEMessageSender, IMEEnvelopeSender, IMEEnvelopeReceiver)
      Private
        FLock:                 TObject;
        FMessageManager:       TMEMessageManager;
        FSubscriptions:        TDictionary<TClass, Integer>;
        FEnvelopes:            TObjectList<TMEMessageEnvelope>;
        FOnReceiveEnvelope:    TReceiveEnvelopeEvent;
        FAutoProcessEnvelopes: Boolean;
        Procedure _Lock;
        Procedure _Unlock;
        Procedure ClearSubscriptions;
        Function GetMessageManager: TMEMessageManager;
        Procedure SetAutoProcessEnvelopes(Const Value: Boolean);
        Procedure SetMessageManager(Const Value: TMEMessageManager);
      Protected
        Function QueryInterface(Const IID: TGuid; Out Obj): HResult; Stdcall;
        Function _AddRef: Integer; Stdcall;
        Function _Release: Integer; Stdcall;
        Procedure DoReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
        Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal);
        Procedure PostDelayedMessage(Const M: TMessage; Const DelayMSec: Cardinal);
        Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope);
        Procedure PostMessage(Const M: TMessage);
        Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope);
        Procedure SendMessage(Const M: TMessage);
        Function QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
        Procedure ReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
      Public
        Constructor Create; Override;
        Destructor Destroy; Override;
        Procedure ClearEnvelopes;
        Procedure ProcessEnvelope;
        Procedure ProcessEnvelopes;
        Procedure Subscribe(Const AMessageClass: TClass);
        Procedure Unsubscribe(Const AMessageClass: TClass);
        Property AutoProcessEnvelopes: Boolean Read FAutoProcessEnvelopes Write SetAutoProcessEnvelopes Default True;
        Property MessageManager: TMEMessageManager Read GetMessageManager Write SetMessageManager;
        Property OnReceiveEnvelope: TReceiveEnvelopeEvent Read FOnReceiveEnvelope Write FOnReceiveEnvelope;
      End;
    

    How all this works

    The TMEMessageHandler immediately delegates to the MessageManager any Subscribe and Unsubscribe calls. It will always subscribe providing itself as an IMEEnvelopeReceiver. It keeps track of subscriptions in its internal dictionary so as to be more efficient when it unsubscribes.

    It also immediately delegates any call to the Send, Post and PostDelayed methods. The TMEMessageManager:

    • Sends messages to subscribed procedures (as RTL)
    • Sends messages to subscribed object methods (as RTL)
    • Sends envelopes to subscribed receivers by calling their ReceiveEnvelope method
    • Posts envelopes (and envelope-wrapped messages) to subscribed receivers by calling their QueueEnvelope method with a cloned copy of the envelope
    • Posts delayed envelopes (and envelope-wrapped messages) to subscribed receivers by enqueuing them first in an internal lightweight thread (TMEDelayedEnvelopeDeliverer) which has the message manager itself deliver them when the delay has passed

    As a receiver, the TMEMessageHandler implements the ReceiveEnvelope by simply delegating to the OnReceiveEnvelope event-handler.

    Posted envelopes are received by the QueueEnvelope method, which adds the envelope to its thread-safe queue and then, but only if AutoProcessEnvelopes is True, uses the main thread's Queue to call its own ProcessEnvelope method (as per Remy's suggestion):

    Function TMEMessageHandler.QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
    Begin
      _Lock;
      Try
        FEnvelopes.Add(Envelope);
        Result := FEnvelopes.Count;
      Finally
        _Unlock;
      End;
      If AutoProcessEnvelopes Then
        TThread.Queue(Nil,
          Procedure
          Begin
            ProcessEnvelope;
          End);
    End;
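
    One detail worth keeping in mind about TThread.Queue: when it is called from the main thread itself, the procedure runs immediately instead of being deferred, so an envelope posted from the UI thread would be processed inside QueueEnvelope. The console sketch below is not part of the code in this post. It contrasts Queue with TThread.ForceQueue, which is only available in more recent Delphi versions. RunDemo and Trace are made-up names for the demo.

    ```pascal
    program ForceQueueSketch;

    {$APPTYPE CONSOLE}

    uses
      System.SysUtils, System.Classes;

    // Hypothetical demo routine; returns the order in which the steps ran.
    function RunDemo: string;
    var
      Trace: string;
    begin
      Trace := '';

      // On the main thread, Queue executes the procedure before returning.
      TThread.Queue(nil,
        procedure
        begin
          Trace := Trace + 'Q';
        end);
      Trace := Trace + '1';

      // ForceQueue defers the call until the main thread next synchronizes.
      TThread.ForceQueue(nil,
        procedure
        begin
          Trace := Trace + 'F';
        end);
      Trace := Trace + '2';

      // A console program has no message loop, so run pending calls by hand.
      CheckSynchronize;
      Result := Trace;
    end;

    begin
      Writeln(RunDemo);
    end.
    ```

    Run from the main thread as written, this prints Q12F: the Queue call completes before step 1, while the ForceQueue call only runs once CheckSynchronize is reached. Whether that difference matters depends on whether messages are ever posted from the UI thread.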
    

    The ProcessEnvelope method extracts the envelope from the thread-safe queue, calls the ReceiveEnvelope method (same as called by the message manager for synchronous messages) and then Frees the envelope (remember that this was a cloned copy just for this receiver):

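    Procedure TMEMessageHandler.ProcessEnvelope;
    Var
      E: TMEMessageEnvelope;
    Begin
      E := Nil;
      _Lock;
      Try
        // Test the count while holding the lock, so the check and the extraction cannot race with QueueEnvelope
        If FEnvelopes.Count > 0 Then
          E := FEnvelopes.Extract(FEnvelopes[0]);
      Finally
        _Unlock;
      End;
      If Assigned(E) Then
        Try
          E.UpdateInfo(mieReceived);
          ReceiveEnvelope(E);
        Finally
          // Free the cloned envelope even if the receiver raises an exception
          E.Free;
        End;
    End;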
    

    The ProcessEnvelopes method just calls the former as many times as necessary to empty the asynchronous message queue:

    Procedure TMEMessageHandler.ProcessEnvelopes;
    Begin
      While FEnvelopes.Count > 0 Do
        ProcessEnvelope;
    End;
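
    To try the enqueue-then-drain pattern in isolation, here is a minimal console sketch under simplifying assumptions: TDemoInbox is a hypothetical stand-in for the handler, it stores strings instead of envelopes, and it uses TMonitor as the lock. None of these names come from the actual library.

    ```pascal
    program QueueDrainSketch;

    {$APPTYPE CONSOLE}

    uses
      System.SysUtils, System.Classes, System.Generics.Collections;

    type
      // Hypothetical stand-in for the message handler.
      TDemoInbox = class
      private
        FItems: TList<string>;
        FProcessed: Integer;
      public
        constructor Create;
        destructor Destroy; override;
        procedure Enqueue(const S: string); // may be called from any thread
        procedure ProcessOne;               // runs on the main thread
        property Processed: Integer read FProcessed;
      end;

    constructor TDemoInbox.Create;
    begin
      inherited Create;
      FItems := TList<string>.Create;
    end;

    destructor TDemoInbox.Destroy;
    begin
      FItems.Free;
      inherited;
    end;

    procedure TDemoInbox.Enqueue(const S: string);
    begin
      TMonitor.Enter(FItems);
      try
        FItems.Add(S);
      finally
        TMonitor.Exit(FItems);
      end;
      // Ask the main thread to handle one item when it next synchronizes.
      TThread.Queue(nil,
        procedure
        begin
          ProcessOne;
        end);
    end;

    procedure TDemoInbox.ProcessOne;
    var
      S: string;
      Found: Boolean;
    begin
      TMonitor.Enter(FItems);
      try
        Found := FItems.Count > 0;
        if Found then
        begin
          S := FItems[0];
          FItems.Delete(0);
        end;
      finally
        TMonitor.Exit(FItems);
      end;
      if Found then
      begin
        Inc(FProcessed);
        Writeln('processed: ', S);
      end;
    end;

    procedure RunDemo;
    var
      Inbox: TDemoInbox;
      Worker: TThread;
    begin
      Inbox := TDemoInbox.Create;
      try
        Worker := TThread.CreateAnonymousThread(
          procedure
          var
            I: Integer;
          begin
            for I := 1 to 3 do
              Inbox.Enqueue('message ' + IntToStr(I));
          end);
        Worker.FreeOnTerminate := False;
        try
          Worker.Start;
          // No message loop in a console program: pump queued calls by hand.
          while Inbox.Processed < 3 do
            CheckSynchronize(10);
          Worker.WaitFor;
        finally
          Worker.Free;
        end;
      finally
        Inbox.Free;
      end;
    end;

    begin
      RunDemo;
    end.
    ```

    In a FireMonkey application the explicit CheckSynchronize loop is not needed, because the main thread handles queued calls as part of its normal message processing.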
    

    How the TMEMessageHandler is used

    Having defined TMELogMessage as an IMEClonableMessage to handle information to log, a minimal implementation for TMEFileLogger and other components looks like this:

    Type
      TMEFileLogger = Class(TMEComponent)
      Private
        ...
        FMessageHandler:    TMEMessageHandler;
      Protected
        ...
        Procedure ReceiveLogEnvelope(Const Envelope: TMEMessageEnvelope);
        Property MessageHandler: TMEMessageHandler Read FMessageHandler;
      Public
        Constructor Create(AOwner: TComponent); Override;
        Destructor Destroy; Override;
        ...
      End;
    
    Constructor TMEFileLogger.Create(AOwner: TComponent);
    Begin
      Inherited;
      ...
      FMessageHandler                  := TMEMessageHandler.Create;
      MessageHandler.OnReceiveEnvelope := ReceiveLogEnvelope;
      MessageHandler.Subscribe(TMELogMessage);
    End;
    
    Destructor TMEFileLogger.Destroy;
    Begin
      MessageHandler.Unsubscribe(TMELogMessage);
      MessageHandler.ProcessEnvelopes;
      FreeAndNil(FMessageHandler);
      ...
      Inherited;
    End;
    
    Procedure TMEFileLogger.ReceiveLogEnvelope(Const Envelope: TMEMessageEnvelope);
    Begin
      If Envelope.Msg Is TMELogMessage Then
        With Envelope.Msg As TMELogMessage Do
          ... something useful ...
    End;
    

    Sorry for the long post, but I hope this can be useful to someone else.