I have a listener, which receives work in the form of IPayload. The listener should push this work to observers who actually do the work. This is my first crude attempt to achieve this:
public interface IObserver
{
void DoWork(IPayload payload);
}
public interface IObservable
{
void RegisterObserver(IObserver observer);
void RemoveObserver(IObserver observer);
void NotifyObservers(IPayload payload);
}
public class Observer : IObserver
{
public void DoWork(IPayload payload)
{
// do some heavy lifting
}
}
public class Listener : IObservable
{
private readonly List<IObserver> _observers = new List<IObserver>();
public void PushIncomingPayLoad(IPayload payload)
{
NotifyObservers(payload);
}
public void RegisterObserver(IObserver observer)
{
_observers.Add(observer);
}
public void RemoveObserver(IObserver observer)
{
_observers.Remove(observer);
}
public void NotifyObservers(IPayload payload)
{
Parallel.ForEach(_observers, observer =>
{
observer.DoWork(payload);
});
}
}
Is this a valid approach that follows the observer/observable pattern (i.e. pub sub?)? My understanding is that the NotifyObservers also spins up a threat for each payload. Is this correct? Any improvement suggestions very much welcome.
Please note that all observers have to finish their work before new work in the form of a payload is passed on to them - the order of 'observation' does not matter. Basically, the listener has to act like a master whilst exploiting the cores of the host as much as possibly using the TPL. IMHO this requires the explicit registration of observers with the listener/Observable.
PS:
I think Parallel.ForEach does not create a thread for each observer: Why isn't Parallel.ForEach running multiple threads? If this is true how can I ensure to create a thread for each observer?
An alternative I have in mind is this:
public async void NotifyObservers(IPayload payload)
{
foreach (var observer in _observers)
{
var observer1 = observer;
await Task.Run(() => observer1.DoWork(payload));
}
await Task.WhenAll();
}
Of course you can do it this way, but in .net that is not needed if you dont want to reinvent the wheel :-) In c# there this could be done using events.
A short example :
//Your Listener who has a public eventhandler where others can add them as listeners
public class Listener{
//your eventhandler where others "add" them as listeners
public event EventHandler<PayLoadEventsArgs> IncomingPayload;
//a method where you process new data and want to notify the others
public void PushIncomingPayLoad(IPayload payload)
{
//check if there are any listeners
if(IncomingPayload != null)
//if so, then notify them with the data in the PayloadEventArgs
IncomingPayload(this, new PayloadEventArgs(payload));
}
}
//Your EventArgs class to hold the data
public class PayloadEventArgs : EventArgs{
Payload payload { get; private set; }
public PayloadEventArgs(Payload payload){
this.payload = payload;
}
}
public class Worker{
//add this instance as a observer
YourListenerInstance.IncomingPayload += DoWork;
//remove this instance
YourListenerInstance.IncomingPayload -= DoWork;
//This method gets called when the Listener notifies the IncomingPayload listeners
void DoWork(Object sender, PayloadEventArgs e){
Console.WriteLine(e.payload);
}
}
EDIT: As the question asks for parallel execution, how about doing the new thread at the subscriber side? I think this is the easiest approach to achieve this.
//Inside the DoWork method of the subscriber start a new thread
Task.Factory.StartNew( () =>
{
//Do your work here
});
//If you want to make sure that a new thread is used for the task, then add the TaskCreationOptions.LongRunning parameter
Task.Factory.StartNew( () =>
{
//Do your work here
}, TaskCreationOptions.LongRunning);
Hopefully this answers your question? If not, please leave a comment.