Java Qpid Proton - ActiveMQ broker Cannot assign requested address: bind


I really need an answer to this question, which is why I'm editing it.

I have an Apache ActiveMQ broker embedded in my application, created with this code:

Broker.java

public class Broker {

private BrokerService broker;

public Broker(String connector) {
    this.broker = new BrokerService();
    this.broker.setUseJmx(true);
    try {
        this.broker.addConnector(connector);

    } catch (URISyntaxException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public void addConnector(String connector){
    try {
        this.broker.addConnector(connector);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public void start() {
    try {
        this.broker.start();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public BrokerService getBroker() {
    return broker;
}

public void setBroker(BrokerService broker) {
    this.broker = broker;
}

}

Here is my problem

I use the Qpid Proton library. I have one class to read data, which is nearly the example given on the Qpid website:

package messaging;

import java.io.IOException;

import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.FlowController;
import org.apache.qpid.proton.reactor.Handshaker;

public class AMQPSubscriber extends BaseHandler {

    private String broker;
    private String topic;
    private String port;

    public AMQPSubscriber(String broker, String port, String topic) {
        this.broker = broker;
        this.port = port;
        this.topic = topic;
        this.add(new Handshaker());
        this.add(new FlowController());

    }

    @Override
    public void onReactorInit(Event event) {
        try {
            event.getReactor().acceptor(broker, Integer.parseInt(port), new AMQPSubscriber(broker, port, topic));
        } catch (NumberFormatException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onDelivery(Event event) {
        System.out.println("---------Message Received--------");
        Receiver recv = (Receiver) event.getLink();
        Delivery delivery = recv.current();
        if (delivery.isReadable() && !delivery.isPartial()) {
            int size = delivery.pending();
            byte[] buffer = new byte[size];
            int read = recv.recv(buffer, 0, buffer.length);
            recv.advance();

            Message msg = Proton.message();
            msg.decode(buffer, 0, read);
            System.out.println("Subject : " + msg.getProperties().getSubject());
            System.out.println("Text : " + ((AmqpValue) msg.getBody()).getValue());
        }
    }

}

This class is called from main:

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

    Broker broker = new Broker("amqp://" + host + ":" + AMQPport);
    broker.start();

    AMQPSubscriber receiv = new AMQPSubscriber(host, "5672", topic);
    Reactor r;
    try {
        r = Proton.reactor(receiv);
        r.run();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

But when I execute this code, I get the following output:

 INFO | Loaded the Bouncy Castle security provider.
 INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\KahaDB]
 INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
 INFO | KahaDB is version 6
 INFO | Recovering from the journal @1:61115
 INFO | Recovery replayed 11 operations from the journal in 0.014 seconds.
 INFO | PListStore:[C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\tmp_storage] started
 INFO | Apache ActiveMQ 5.13.3 (localhost, ID:DESKTOP-UK0JIC4-52783-1467025817901-0:1) is starting
 INFO | Listening for connections at: amqp://127.0.0.1:5672
 INFO | Connector amqp://127.0.0.1:5672 started
 INFO | Apache ActiveMQ 5.13.3 (localhost, ID:DESKTOP-UK0JIC4-52783-1467025817901-0:1) started
 INFO | For help or more information please see: http://activemq.apache.org
 WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\KahaDB only has 7792 mb of usable space. - resetting to maximum available disk space: 7792 mb
 WARN | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost only has 7792 mb of usable space. - resetting to maximum available disk space: 7792 mb
java.net.BindException: Address already in use: bind
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Unknown Source)
    at sun.nio.ch.Net.bind(Unknown Source)
    at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
    at java.nio.channels.ServerSocketChannel.bind(Unknown Source)
    at org.apache.qpid.proton.reactor.impl.AcceptorImpl.<init>(AcceptorImpl.java:102)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.acceptor(ReactorImpl.java:477)
    at messaging.AMQPSubscriber.onReactorInit(AMQPSubscriber.java:33)
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:209)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
    at org.apache.qpid.proton.engine.impl.EventImpl.delegate(EventImpl.java:129)
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:114)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:307)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:275)
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.run(ReactorImpl.java:343)
    at messaging.Main.main(Main.java:98)

This broker works fine when I use MQTT and Paho, and I would like it to work with AMQP as well. I know the bind error means the port is already in use, but I can't figure out how to listen on a port where no data is sent.
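
The bind failure itself can be reproduced without ActiveMQ or Proton. The following is a minimal sketch using only the JDK. The class name PortBindDemo and the method secondBindFails are hypothetical names chosen for illustration, and the two ServerSocket instances merely stand in for the broker's AMQP connector and for the acceptor opened in onReactorInit; this is not the code either library runs internally.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

class PortBindDemo {

    /** Returns true if a second TCP listener on an already-bound port is rejected. */
    static boolean secondBindFails() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port; this listener stands in for the broker.
        try (ServerSocket first = new ServerSocket(0, 50, loopback)) {
            int port = first.getLocalPort();
            // A second listener on the same address and port stands in for the acceptor.
            try (ServerSocket second = new ServerSocket()) {
                second.bind(new InetSocketAddress(loopback, port));
                return false; // the bind unexpectedly succeeded
            } catch (BindException e) {
                return true; // same exception type as in the stack trace above
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("second bind rejected: " + secondBindFails());
    }
}
```

The point of the sketch is that the exception comes from opening a second listening socket, not from any data being sent or received on the port.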

Thanks for helping me.

Alexi


Solution

  • I have found a solution.

    When I add a connector to the embedded ActiveMQ broker, it is added as a TCP connector, which only allows one listener on a given port at a time.

    So I created the connector as UDP instead, like this: broker.addConnector("udp://"+host+":"+AMQPport);

    This solution works for me. I hope it helps other developers in the future.

    Cheers, Alexi
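
One reason switching the connector to UDP makes the bind error go away is that the operating system tracks TCP and UDP port numbers separately, so a UDP socket and a TCP listener can hold the same port number at once. The sketch below uses only the JDK to illustrate that; TcpUdpPortDemo and udpBindsNextToTcp are hypothetical names, and the plain sockets stand in for the connector and the acceptor. It shows only that the two binds do not collide, not that messages flow between them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

class TcpUdpPortDemo {

    /** Returns true if a UDP socket can bind the same port number as an open TCP listener. */
    static boolean udpBindsNextToTcp() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Retry a few times in case an unrelated process already holds the UDP port.
        for (int attempt = 0; attempt < 5; attempt++) {
            // Port 0 asks the OS for any free TCP port.
            try (ServerSocket tcp = new ServerSocket(0, 50, loopback)) {
                int port = tcp.getLocalPort();
                // Bind a UDP socket to the same number while the TCP listener is still open.
                try (DatagramSocket udp = new DatagramSocket(new InetSocketAddress(loopback, port))) {
                    return udp.getLocalPort() == port;
                } catch (BindException e) {
                    // UDP port taken by something else; try again with another number.
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("UDP and TCP share a port number: " + udpBindsNextToTcp());
    }
}
```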