Search code examples
c#design-patternstask-parallel-libraryreactivex

push work to observers


I have a listener, which receives work in the form of IPayload. The listener should push this work to observers who actually do the work. This is my first crude attempt to achieve this:

public interface IObserver
{
    void DoWork(IPayload payload);
}

public interface IObservable
{
    void RegisterObserver(IObserver observer);
    void RemoveObserver(IObserver observer);
    void NotifyObservers(IPayload payload);
}

public class Observer : IObserver
{
    public void DoWork(IPayload payload)
    {
        // do some heavy lifting
    }
}

public class Listener : IObservable
{
    private readonly List<IObserver> _observers = new List<IObserver>();

    public void PushIncomingPayLoad(IPayload payload)
    {
        NotifyObservers(payload);
    }

    public void RegisterObserver(IObserver observer)
    {
        _observers.Add(observer);
    }

    public void RemoveObserver(IObserver observer)
    {
        _observers.Remove(observer);
    }

    public void NotifyObservers(IPayload payload)
    {
        Parallel.ForEach(_observers, observer =>
        {
        observer.DoWork(payload);
        });
    }
}

Is this a valid approach that follows the observer/observable pattern (i.e. pub sub?)? My understanding is that the NotifyObservers also spins up a threat for each payload. Is this correct? Any improvement suggestions very much welcome.

Please note that all observers have to finish their work before new work in the form of a payload is passed on to them - the order of 'observation' does not matter. Basically, the listener has to act like a master whilst exploiting the cores of the host as much as possibly using the TPL. IMHO this requires the explicit registration of observers with the listener/Observable.

PS:

I think Parallel.ForEach does not create a thread for each observer: Why isn't Parallel.ForEach running multiple threads? If this is true how can I ensure to create a thread for each observer?

An alternative I have in mind is this:

public async void NotifyObservers(IPayload payload)
{
    foreach (var observer in _observers)
    {
    var observer1 = observer;
    await Task.Run(() => observer1.DoWork(payload));
    }
    await Task.WhenAll();
}

Solution

  • Of course you can do it this way, but in .net that is not needed if you dont want to reinvent the wheel :-) In c# there this could be done using events.

    A short example :

      //Your Listener who has a public eventhandler where others can add them as listeners
      public class Listener{
          //your eventhandler where others "add" them as listeners
          public event EventHandler<PayLoadEventsArgs> IncomingPayload;
    
          //a method where you process new data and want to notify the others
          public void PushIncomingPayLoad(IPayload payload)
          {
              //check if there are any listeners
              if(IncomingPayload != null)
                  //if so, then notify them with the data in the PayloadEventArgs
                  IncomingPayload(this, new PayloadEventArgs(payload));
          }
      }  
    
      //Your EventArgs class to hold the data
      public class PayloadEventArgs : EventArgs{
    
          Payload payload { get; private set; }  
    
          public PayloadEventArgs(Payload payload){
              this.payload = payload;
          }
      }
    
      public class Worker{
          //add this instance as a observer
          YourListenerInstance.IncomingPayload += DoWork;
    
          //remove this instance 
          YourListenerInstance.IncomingPayload -= DoWork;
    
          //This method gets called when the Listener notifies the  IncomingPayload listeners
          void DoWork(Object sender, PayloadEventArgs e){
              Console.WriteLine(e.payload);
          }
       }
    

    EDIT: As the question asks for parallel execution, how about doing the new thread at the subscriber side? I think this is the easiest approach to achieve this.

    //Inside the DoWork method of the subscriber start a new thread
    Task.Factory.StartNew( () => 
    {
          //Do your work here
    });
    
    //If you want to make sure that a new thread is used for the task, then add the TaskCreationOptions.LongRunning parameter
    Task.Factory.StartNew( () => 
    {
          //Do your work here
    }, TaskCreationOptions.LongRunning);
    

    Hopefully this answers your question? If not, please leave a comment.