Search code examples
javaazureamqpazure-eventhubqpid

Qpid receiver on Azure EventHub


I have already working application based on Azure EventHub. Now I need write java receiver that connects to the existing infrastructure. Existing configuration:

Event Hub > SomeName > Consumer Group > SomeGroupName

In the administrative console I cannot see any QUEUE or TOPIC definitions. Analyzing working c# code I can see that hub-name + group-name is enough to connect.

I have reconstructed url that allows me to connect over java (and connection works so far).

amqps://SomeName.servicebus.windows.net

So my questions:

1) When instead of queue /topic I specify group-name then I get exception The messaging entity 'sb://SomeName.servicebus.windows.net/SomeGroupName' could not be found. What is the model used there instead of queue/topic?

2) How to work with such infrastructure from Apache-qpid?


Solution

  • The greatest hint for resolve the question gave me following link: http://theitjourney.blogspot.com/2015/12/sendreceive-messages-using-amqp-in-java.html

    So No queue neither topic in this model. You need to connect to specific provider and specify correct EventHub as following:

    application.properties:

    connectionfactory.SBCF=amqps://<PolicyName>:<PolicyKey>@<DomainName>.servicebus.windows.net
    queue.EventHub=<EventHubName>/ConsumerGroups/$Default/Partitions/0
    

    Where: enter image description here

    After that following code allowed me to create MessageConsumer:

    Hashtable<String, String> env = new Hashtable<>();
    env.put(Context.INITIAL_CONTEXT_FACTORY,
                   "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
    env.put(Context.PROVIDER_URL, 
        getClass().getResource("/application.properties").toString());
    Context context = null;
    
    context = new InitialContext(env);
    // Look up ConnectionFactory 
    ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
    Destination queue = (Destination) context.lookup("EventHub");
    
    // Create Connection
    Connection connection = cf.createConnection();
    
    // Create receiver-side Session, MessageConsumer
    Session receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer receiver = receiveSession.createConsumer(queue);