In my application I have 3 classes:
- Company, which hires Workers for any of 3 jobs
- Workers, each can do 2 jobs
- Administrator, which receives copies of all messages in the program and can send messages to all companies, all workers or just everyone
I'm using work.companies.companyName as the key for companies and work.workers.workerName as the key for workers; both use the default exchange and queue for communication. The Administrator receives messages with the key admin on a Topic Exchange.
The problem is with the Administrator -> everyone else communication. It behaves exactly like a Direct exchange: I can give Companies and Workers any names, even ones like "#" or "company1.#", and they won't receive anything unless the Administrator explicitly sends the message with a full key like "work.companies.company1".
I would like to be able to use just, e.g., "work.companies.#" to send a message to all companies. What am I doing wrong?
Administrator.java:
public class Administrator
{
    public static void main(String[] args) throws IOException, TimeoutException
    {
        new Thread(new TopicListener("admin", ign -> {})).start();
        TopicWriter writer = new TopicWriter();
        // lots of code
TopicListener.java:
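import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class TopicListener implements Runnable
{
    private final String EXCHANGE_NAME = "space";
    private String key;
    private Consumer<String> msgHandler;

    public TopicListener(String key, Consumer<String> msgHandler)
    {
        this.key = key;
        this.msgHandler = msgHandler;
    }

    @Override
    public void run()
    {
        try
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            // Bind a fresh, server-named queue to the exchange under the given key
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, key);
            // Fully qualified to avoid a clash with java.util.function.Consumer
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("Received: \"" + msg + "\"");
                    msgHandler.accept(msg);
                }
            };
            // true = automatic acknowledgement
            channel.basicConsume(queueName, true, consumer);
        }
        catch (IOException | TimeoutException e)
        { e.printStackTrace(); }
    }
}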
TopicWriter.java:
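import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class TopicWriter
{
    private final String EXCHANGE_NAME = "space";
    private final Channel channel;

    public TopicWriter() throws IOException, TimeoutException
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        this.channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    }

    // Publishes msg to the topic exchange with key as the routing key
    public void send(String msg, String key) throws IOException
    {
        channel.basicPublish(
            EXCHANGE_NAME,
            key,
            null,
            msg.getBytes(StandardCharsets.UTF_8));
    }
}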
Company.java contains:
new Thread(new TopicListener("space.agencies." + agencyID, ign -> {})).start();
Worker.java contains:
new Thread(new TopicListener("space.carriers." + carrierID, consumer)).start();
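I found out where the problem was: I was putting the wildcards in the routing key when sending, but in RabbitMQ a Topic exchange interprets "#" and "*" only in the binding key, that is, the key the receiver passes to queueBind to say which messages it wants. The routing key passed to basicPublish is always matched literally, so "work.companies.#" used as a routing key is just an ordinary key that happens to contain "#". The wildcards belong on the receiving side: for example, each Company could bind its queue with a pattern such as "work.companies.#", and the Administrator would then publish with a concrete key under work.companies.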
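
To make the matching rule concrete, here is a minimal plain-Java sketch of how a topic exchange compares a binding key with a routing key ("*" stands for exactly one word, "#" for zero or more words). It is only an illustration of the rule, not RabbitMQ's actual implementation, and it does not use the client library; the class name TopicPatternDemo and the method matches are hypothetical names made up for this example.

```java
final class TopicPatternDemo {

    /**
     * Returns true if a message published with routingKey would be delivered
     * to a queue bound with bindingKey, following topic-exchange rules.
     * Wildcards are honoured only in bindingKey; routingKey is taken literally.
     */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: succeed only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // "#" may absorb zero or more words, so try every possible split.
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            // Pattern still expects a word, but the routing key has none left.
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            // "*" matches any single word; otherwise the words must be equal.
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // Wildcard in the binding key, concrete routing key: prints true.
        System.out.println(matches("work.companies.#", "work.companies.company1"));
        // Wildcard in the routing key, as in the question: prints false.
        System.out.println(matches("work.companies.company1", "work.companies.#"));
    }
}
```

The second call mirrors the situation in the question: a queue bound with "work.companies.company1" does not receive a message published with "work.companies.#", because the "#" in a routing key is just another word.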