java, multithreading, synchronization, wait, notify

wait() and notifyAll() do not work in my program


I am writing a message-oriented middleware in which subscribers and publishers communicate through queues held by a central class, MessageBroker. A publisher should put a message into a topic's queue if the queue is not full, and a subscriber should read the message from a subscribed topic's queue if the queue is not empty. The problem arises when I use wait() and notifyAll() in the subscriber's receiveMessage() method. The publisher works without any problems, but the subscribers are never woken from the wait state, so they do nothing.

Publisher method:

public synchronized void sendMessage() {
        BlockingQueue<Message> topicQueue = mb.getQueue(topic);
        if (topicQueue != null) {
            try {
                // Check that the queue has free space before sending the message
                while (topicQueue.remainingCapacity() == 0) {
                    wait();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Message m = topic.generateMessage();
            mb.publish(m);
            if (!gotActive) {
                mb.increasePublisherCounter(1);
                gotActive = true;
            }
            System.out.println(m.getContent() + " was added to the queue");
            notifyAll(); // Notify all waiting threads
        }
    }

Subscriber method:

public synchronized void receiveMessage() {
        for (Topic topic : topics) {
            BlockingQueue<Message> queue = mb.getQueue(topic);
            synchronized (queue) {
                try {
                    // Wait until the queue is no longer empty
                    while (queue.isEmpty()) {
                        queue.wait(); // Wait for a notification while the queue is empty
                    }

                    // Fetch the message from the queue
                    Message message = queue.peek();
                    if (message != null) {
                        System.out.println(name + " received message: " + message.getContent());
                        incrementProcessedCounter(topic);
                        if (topic.getSubscriberCount() == getProcessedCounter(topic)) {
                            queue.remove();
                            resetProcessedCounter(topic);
                            queue.notifyAll(); // Notify other waiting threads
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

What do I have to change in these methods to make them work? Thanks for your help. I can add more code if you need it :)


Solution

  • "... when I try to include wait() NotifyAll() in the receiveMessage() method of the subscriber. With the Publisher it works without any problems, but with the Subscriber I have the problem that they are not retrieved from the wait state, so that they do nothing."

    I have a number of questions about your code, but my first comment is that you should not need to call wait()/notify() yourself. One of the benefits of a BlockingQueue is that its implementations are thread-safe and do the blocking for you: queue.take() blocks until there is a message to consume, and queue.put() blocks until there is space in the queue. Both methods throw InterruptedException, so the caller has to declare or handle it.

    public void sendMessage() throws InterruptedException {
        BlockingQueue<Message> topicQueue = mb.getQueue(topic);
        if (topicQueue != null) {
            Message m = topic.generateMessage();
            // put() blocks until the queue has free space
            topicQueue.put(m);
            // ... counters and the like?
        }
    }
    

    and

    public void receiveMessage() throws InterruptedException {
        for (Topic topic : topics) {
            BlockingQueue<Message> queue = mb.getQueue(topic);
            // take() blocks until a message is available
            Message message = queue.take();
            // ... work with the message
        }
    }
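
    To see put() and take() doing the waiting on their own, here is a minimal, self-contained sketch. It does not use your MessageBroker, Topic or Message classes; the class BlockingQueueDemo, the helper transfer() and the String messages are made up purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BlockingQueueDemo {

    // Hypothetical helper: one producer sends `count` messages through a
    // bounded queue of size `capacity`; returns what the consumer received.
    static List<String> transfer(int count, int capacity) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put("message-" + i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), the consumer's writes to `received` are visible
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(transfer(5, 2));
    }
}
```

    Note that there is no synchronized, wait() or notifyAll() anywhere in it; the queue handles the hand-off even though the capacity (2) is smaller than the number of messages (5).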
    

    As to why your code isn't working, it's very hard to tell. Here are some comments:

    • I don't see you putting the message into the topicQueue in your sendMessage(). Maybe mb.publish(m) does that?
    • Be very careful about what you are synchronizing on. Both the publisher and the subscriber need to synchronize on the same object instance if one is waiting for the other to notify. As posted, the publisher calls wait() and notifyAll() on itself (this), while each subscriber waits on the queue object, so that notifyAll() cannot wake a subscriber blocked in queue.wait().
    • Make sure the counters and the gotActive boolean are properly synchronized if they are written and read by different threads.
    • A peek() followed by a remove() can be replaced with a single poll(), which does both and returns null if there are no messages in the queue. Again, take() waits instead.
    • Use good patterns with InterruptedException. You should at least call Thread.currentThread().interrupt(); in the catch block, but you should also return or break out of the loop rather than carry on.
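
    If you do want to keep wait()/notifyAll(), the points about the shared monitor and about InterruptedException can be sketched roughly like this. It is only an illustration under my own assumptions: SharedMonitorDemo, BoundedBuffer and sumThroughBuffer() are hypothetical names and are not part of your code.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class SharedMonitorDemo {

    // Hypothetical minimal bounded buffer: producers and consumers wait and
    // notify on the same monitor (`lock`).
    static class BoundedBuffer<T> {
        private final Object lock = new Object();
        private final Queue<T> items = new ArrayDeque<>();
        private final int capacity;

        BoundedBuffer(int capacity) {
            this.capacity = capacity;
        }

        void put(T item) throws InterruptedException {
            synchronized (lock) {
                while (items.size() == capacity) {
                    lock.wait(); // releases `lock` while waiting
                }
                items.add(item);
                lock.notifyAll(); // wakes consumers waiting on the same `lock`
            }
        }

        T take() throws InterruptedException {
            synchronized (lock) {
                while (items.isEmpty()) {
                    lock.wait();
                }
                T item = items.remove();
                lock.notifyAll(); // wakes producers waiting for free space
                return item;
            }
        }
    }

    // Sends 1..count through a buffer of capacity 1 and returns the sum
    // computed by the consumer thread.
    static int sumThroughBuffer(int count) throws InterruptedException {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(1);
        int[] sum = new int[1];

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += buffer.take();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag; the try block wraps the loop,
                // so the thread also stops consuming.
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 1; i <= count; i++) {
            buffer.put(i);
        }
        consumer.join(); // makes the consumer's writes to `sum` visible here
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumThroughBuffer(10)); // prints 55
    }
}
```

    Both put() and take() use the same lock object for synchronized, wait() and notifyAll(), which is exactly what is missing when one side waits on this and the other on the queue.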

    Hope something here helps.