I already have a working application based on Azure Event Hubs. Now I need to write a Java receiver that connects to the existing infrastructure. The existing configuration is:
Event Hub > SomeName > Consumer Group > SomeGroupName
In the administrative console I cannot see any queue or topic definitions. From the working C# code I can see that the hub name plus the consumer group name is enough to connect.
I have reconstructed a URL that lets me connect from Java (the connection works so far):
amqps://SomeName.servicebus.windows.net
So my questions are:
1) When I specify the group name in place of a queue/topic, I get the exception: The messaging entity 'sb://SomeName.servicebus.windows.net/SomeGroupName' could not be found.
What model is used here instead of queues/topics?
2) How do I work with this infrastructure from Apache Qpid?
The most useful hint for resolving this came from the following link: http://theitjourney.blogspot.com/2015/12/sendreceive-messages-using-amqp-in-java.html
So there is neither a queue nor a topic in this model. You connect to a specific provider and address the Event Hub through its consumer group and partition, as follows:
application.properties:
connectionfactory.SBCF=amqps://<PolicyName>:<PolicyKey>@<DomainName>.servicebus.windows.net
queue.EventHub=<EventHubName>/ConsumerGroups/$Default/Partitions/0
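One thing to watch in the connection URL: the policy key is typically a base64 string and can contain +, / and =, which as far as I know have to be percent-encoded before they go into the user-info part of the URL. A minimal sketch of building the connectionfactory.SBCF value; the class name SbcfUrl, the method build and the sample values are hypothetical and not part of any library:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class SbcfUrl {
    // Hypothetical helper: builds the value for the connectionfactory.SBCF property.
    // Policy name and key are percent-encoded so that '+', '/' and '=' in the key
    // cannot be misread as URL syntax.
    static String build(String policyName, String policyKey, String domainName) {
        try {
            String user = URLEncoder.encode(policyName, "UTF-8");
            String pass = URLEncoder.encode(policyKey, "UTF-8");
            return "amqps://" + user + ":" + pass + "@" + domainName + ".servicebus.windows.net";
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Made-up sample values, not real credentials.
        System.out.println(build("ReceiveRule", "ab+c/d=", "SomeName"));
        // prints amqps://ReceiveRule:ab%2Bc%2Fd%3D@SomeName.servicebus.windows.net
    }
}
```

The resulting string is what goes after connectionfactory.SBCF= in the properties file.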
After that, the following code allowed me to create a MessageConsumer:
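import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

// Point JNDI at the properties file shown above
Hashtable<String, String> env = new Hashtable<>();
env.put(Context.INITIAL_CONTEXT_FACTORY,
        "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory");
env.put(Context.PROVIDER_URL,
        getClass().getResource("/application.properties").toString());
Context context = new InitialContext(env);
// Look up the ConnectionFactory and the Event Hub partition destination
ConnectionFactory cf = (ConnectionFactory) context.lookup("SBCF");
Destination queue = (Destination) context.lookup("EventHub");
// Create the Connection
Connection connection = cf.createConnection();
// Create the receiver-side Session and MessageConsumer
Session receiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer receiver = receiveSession.createConsumer(queue);
// Start delivery; a JMS consumer receives nothing until the connection is started
connection.start();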
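Note that the destination above covers only partition 0. As far as I understand the addressing scheme, each partition of the Event Hub needs its own receiver address of the same form. A rough sketch that lists those addresses, assuming partition ids run from 0 to count - 1; the class name PartitionAddresses, the method forAll and the hub name "myhub" are made up for illustration, and the partition count must be taken from your own Event Hub configuration:

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAddresses {
    // Hypothetical helper: one receiver address per partition, in the same
    // <EventHubName>/ConsumerGroups/<group>/Partitions/<id> form used in the properties file.
    static List<String> forAll(String eventHubName, String consumerGroup, int partitionCount) {
        List<String> addresses = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            addresses.add(eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + p);
        }
        return addresses;
    }

    public static void main(String[] args) {
        // "myhub" and a count of 4 are assumed sample values.
        for (String address : forAll("myhub", "$Default", 4)) {
            System.out.println(address);
        }
    }
}
```

Each address can then be registered as its own queue.<name> entry in application.properties, with one MessageConsumer created per entry.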