Search code examples
javaazureservicebus

Java on ServiceBus for Windows Server- Options?


What are the options available to develop Java applications using Service Bus for Windows?

  1. Java Message Broker API - This needs ACS to work, which Service Bus for Windows Server doesn't support.
  2. AMQP - This doesn't seem to work on Service Bus for Windows Server; I keep getting this error:

    org.apache.qpid.amqp_1_0.client.Sender$SenderCreationException: Peer did not create remote endpoint for link, target:

The same code works with Azure Service Bus, so is AMQP on Service Bus for Windows Server not fully working?

Please correct me if I have missed something.

Update: To test AMQP on my local machine, this is what I did:

  1. Installed Service bus 1.1 on my local machine
  2. Took the sample mentioned here http://www.windowsazure.com/en-us/develop/java/how-to-guides/service-bus-amqp/
  3. Created a new namespace on my local machine
  4. Specified the following connection string in servicebus.properties (which is correctly referenced in the code):

    connectionfactory.SBCF = amqps://<username>:<password>@<MachineName>:5671/StringAnalyzerNS/

    queue.QUEUE = queue1

The code has been updated to use the certificates.

At runtime I get this error:

javax.jms.JMSException: Peer did not create remote endpoint for link, target: queue1
    at org.apache.qpid.amqp_1_0.jms.impl.MessageProducerImpl.<init>(MessageProducerImpl.java:77)
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createProducer(SessionImpl.java:348)
    at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl.createProducer(SessionImpl.java:63)
    at com.stringcompany.Analyzer.SimpleSenderReceiver.<init>(SimpleSenderReceiver.java:70)
    at com.stringcompany.Analyzer.SimpleSenderReceiver.main(SimpleSenderReceiver.java:95)
Caused by: org.apache.qpid.amqp_1_0.client.Sender$SenderCreationException: Peer did not create remote endpoint for link, target: queue1
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:171)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:104)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:97)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:83)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:69)
    at org.apache.qpid.amqp_1_0.client.Sender.<init>(Sender.java:63)
    at org.apache.qpid.amqp_1_0.client.Session.createSender(Session.java:74)
    at org.apache.qpid.amqp_1_0.client.Session.createSender(Session.java:66)
    at org.apache.qpid.amqp_1_0.jms.impl.MessageProducerImpl.<init>(MessageProducerImpl.java:72)
    ... 4 more
javax.jms.JMSException: Session remotely closed

With the same code, if I point to Azure Service Bus by setting the namespace and queue as below:

    connectionfactory.SBCF = amqps://<Policy name>:<Sec. Key>@<ns>.servicebus.windows.net

    queue.QUEUE = testq

This works, messages are exchanged.

Here is the code, if someone wants to try it:

package com.stringcompany.Analyzer;

//SimpleSenderReceiver.java

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Hashtable;
import java.util.Random;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

/**
 * Minimal JMS sender/receiver sample that talks to a queue through the
 * Qpid AMQP 1.0 JMS client.
 *
 * <p>The connection factory ({@code SBCF}) and the queue ({@code QUEUE}) are
 * looked up through JNDI from the {@code servicebus.properties} file referenced
 * in the constructor. When started with the argument {@code sendonly} the
 * instance only sends; otherwise it also registers itself as the
 * {@link MessageListener} for the same queue and prints every message received.
 */
public class SimpleSenderReceiver implements MessageListener {
    /** Set to {@code false} by {@link #main(String[])} when "sendonly" is requested. */
    private static boolean runReceiver = true;
    private static final Random randomGenerator = new Random();

    private Connection connection;
    private Session sendSession;
    private Session receiveSession;
    private MessageProducer sender;
    private MessageConsumer receiver;

    /**
     * Looks up the connection factory and queue via JNDI, opens the connection,
     * and creates the producer (and, unless running send-only, the consumer).
     *
     * <p>If anything fails after the connection has been created, the
     * connection is closed before the exception is rethrown so that a failed
     * setup does not leave an open connection behind.
     *
     * @throws Exception if the JNDI lookup or any JMS setup step fails
     */
    public SimpleSenderReceiver() throws Exception {
        // Configure JNDI environment
        Hashtable<String, String> env = new Hashtable<String, String>();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
        env.put(Context.PROVIDER_URL, "D:\\Java\\Azure\\workspace\\Analyzer\\src\\main\\resources\\servicebus.properties");
        Context context = new InitialContext(env);

        // Lookup ConnectionFactory
        ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
        System.out.println("cf:"+cf);

        // Create Connection
        connection = cf.createConnection();
        System.out.println("connection :"+connection);

        try {
            connection.setExceptionListener(new ExceptionListener() {
                public void onException(JMSException arg0) {
                    System.err.println(arg0);
                }
            });
            connection.start();

            // Create sender-side Session and MessageProducer
            sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            System.out.println("Session open");

            Destination queue = (Destination) context.lookup("QUEUE");
            System.out.println("queue:"+queue);

            sender = sendSession.createProducer(queue);
            System.out.println(sender.getDestination());
            System.out.println("sender:"+sender);

            if (runReceiver) {
                System.out.println("Waitng for new message");
                // Create receiver-side Session, MessageConsumer, and MessageListener.
                // The connection was already started above, so no second start() is needed.
                receiveSession = connection.createSession(false,
                        Session.CLIENT_ACKNOWLEDGE);
                receiver = receiveSession.createConsumer(queue);
                receiver.setMessageListener(this);
            }
        } catch (Exception e) {
            // Do not leak the open connection when session/producer/consumer setup fails.
            try {
                connection.close();
            } catch (JMSException closeFailure) {
                System.err.println(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Entry point. Sends one message each time a line is entered on standard
     * input; the line {@code exit} (case-insensitive) or end of input closes
     * the connection and terminates the JVM.
     *
     * @param args pass {@code sendonly} as the first argument to skip the receiver
     */
    public static void main(String[] args) {
        try {

            if ((args.length > 0) && args[0].equalsIgnoreCase("sendonly")) {
                runReceiver = false;
            }

            //System.setProperty("javax.net.debug","ssl");
            System.setProperty("javax.net.ssl.trustStore","D:\\Java\\Azure\\workspace\\Analyzer\\src\\main\\resources\\SBKeystore.keystore");
            System.setProperty("log4j.configuration","D:\\Java\\Azure\\workspace\\Analyzer\\src\\main\\resources\\log4j.properties");
            SimpleSenderReceiver simpleSenderReceiver = new SimpleSenderReceiver();
            System.out
                    .println("Press [enter] to send a message. Type 'exit' + [enter] to quit.");
            BufferedReader commandLine = new java.io.BufferedReader(
                    new InputStreamReader(System.in));

            while (true) {
                // readLine() returns null at end of input; treat that like "exit"
                // instead of dereferencing null or looping forever.
                String s = commandLine.readLine();
                if (s == null || s.equalsIgnoreCase("exit")) {
                    simpleSenderReceiver.close();
                    System.exit(0);
                } else {
                    simpleSenderReceiver.sendMessage();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends a single fixed-text message and prints the message ID reported
     * by the message object after the send.
     *
     * @throws JMSException if the message cannot be created or sent
     */
    private void sendMessage() throws JMSException {
        TextMessage message = sendSession.createTextMessage();
        message.setText("Test AMQP message from JMS");
        // The unsigned shift clears the sign bit, so the generated ID is never negative.
        long randomMessageID = randomGenerator.nextLong() >>> 1;
        message.setJMSMessageID("ID:" + randomMessageID);
        sender.send(message);
        System.out.println("Sent message with JMSMessageID = "
                + message.getJMSMessageID());
    }

    /**
     * Closes the underlying JMS connection.
     *
     * @throws JMSException if the provider fails to close the connection
     */
    public void close() throws JMSException {
        connection.close();
    }

    /**
     * Listener callback: prints the ID of the received message and
     * acknowledges it. Failures are printed rather than propagated.
     *
     * @param message the message delivered by the consumer
     */
    public void onMessage(Message message) {
        try {
            System.out.println("Received message with JMSMessageID = "
                    + message.getJMSMessageID());
            message.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

enter image description here


Solution

  • Hi, we had the same problem, and thankfully Microsoft updated their documentation to show how to do this correctly: http://msdn.microsoft.com/en-us/library/dn574799.aspx