Search code examples
multithreadingobserver-pattern

Should Observers be notified in separate threads each one?


I know it sounds heavy weight, but I'm trying to solve an hypothetical situation. Imagine you have N observers of some object. Each one interested in the object state. When applying the Observer Pattern the observable object tends to iterate through its observer list invoking the observer notify()|update() method.

Now imagine that a specific observer has a lot of work to do with the state of the observable object. That will slow down the last notification, for example.

So, in order to avoid slowing down notifications to all observers, one thing we can do is to notify the observer in a separate thread. In order for that to work, I suppose that a thread for each observer is needed. That is a painful overhead we are having in order to avoid the notification slow down caused by heavy work. Worst than slowing down if thread approach is used, is dead threads caused by infinite loops. It would be great reading experienced programmers for this one.

  • What people with years on design issues think?
  • Is this a problem without a substancial solution?
  • Is it a really bad idea? why?

Example

This is a vague example in order to demonstrate and, hopefully, clarify the basic idea that I don't even tested:

class Observable(object):
    def __init__(self):
        self.queues = {}

    def addObserver(self, observer):
        if not observer in self.queues:
            self.queues[observer] = Queue()
            ot = ObserverThread(observer, self.queues[observer])
            ot.start()

    def removeObserver(self, observer):
        if observer in self.queues:
            self.queues[observer].put('die')
            del self.queues[observer]

    def notifyObservers(self, state):
        for queue in self.queues.values():
            queue.put(state)

class ObserverThread(Thread):
    def __init__(self, observer, queue):
        self.observer = observer
        self.queue = queue

    def run(self):
        running = True
        while running:
            state = self.queue.get()
            if state == 'die':
                running = False
            else:
                self.observer.stateChanged(state)

Solution

  • You're on the right track.

    It is common for each observer to own its own input-queue and its own message handling thread (or better: the queue would own the thread, and the observer would own the queue). See Active object pattern.

    There are some pitfalls however:

    • If you have 100's or 1000's of observers you may need to use a thread pool pattern
    • Note the you'll lose control over the order in which events are going to be processed (which observer handles the event first). This may be a non-issue, or may open a Pandora box of very-hard-to-detect bugs. It depends on your specific application.
    • You may have to deal with situations where observers are deleted before notifiers. This can be somewhat tricky to handle correctly.
    • You'll need to implement messages instead of calling functions. Message generation may require more resources, as you may need to allocate memory, copy objects, etc. You may even want to optimize by implementing a message pool for common message types (you may as well choose to implement a message factory that wrap such pools).
    • To further optimize, you'll probably like to generate one message and send it to all to observers (instead of generating many copies of the same message). You may need to use some reference counting mechanism for your messages.