Search code examples
javarabbitmqrabbitmq-exchange

RabbitMQ using Direct Exchange when Topic was specified


In my application I have 3 classes:
- Company, which hires Workers for any of 3 jobs
- Workers, each can do 2 jobs
- Administrator, which receives copies of all messages in the program and can send messages to all companies, all workers or just everyone

I'm using work.companies.companyName for companies keys and work.workers.workerName for workers keys, they both use default exchange and queue for communication. The Administrator receives messages with admin Topic Exchange.

The problem is with the Administrator -> everyone else communication. It works exactly like Direct exchange - I can get Companies and Workers any names, even like "#", "company1.#" etc. and they won't receive anything, unless in the Administrator I explicitly send the message with key like "work.companies.company1".
I would like to be able to use just e. g. "work.companies.#" to send message to all companies. What am I doing wrong?

Administrator.java:

public class Administrator
{
    public static void main(String[] args) throws IOException, TimeoutException
    {
        new Thread(new TopicListener("admin", ign -> {})).start();
        TopicWriter writer = new TopicWriter();
    // lots of code

TopicListener.java:

public class TopicListener implements Runnable
{
    private final String EXCHANGE_NAME = "space";
    private String key;
    private Consumer<String> msgHandler;

    public TopicListener(String key, Consumer<String> msgHandler)
    {
        this.key = key;
        this.msgHandler = msgHandler;
    }

    @Override
    public void run()
    {
        try
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, key);

            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("Received: \"" + msg + "\"");
                    msgHandler.accept(msg);
                }
            };

            channel.basicConsume(queueName, true, consumer);
        }
        catch (IOException | TimeoutException e)
        { e.printStackTrace(); }
    }
}

TopicWriter.java:

public class TopicWriter
{
    private final String EXCHANGE_NAME = "space";
    private final Channel channel;

    public TopicWriter() throws IOException, TimeoutException
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        this.channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    }

    public void send(String msg, String key) throws IOException
    {
        channel.basicPublish(
                EXCHANGE_NAME,
                key,
                null,
                msg.getBytes(StandardCharsets.UTF_8));
    }
}

Company.java contains:

new Thread(new TopicListener("space.agencies." + agencyID, ign -> {})).start();

Worker.java contains:

new Thread(new TopicListener("space.carriers." + carrierID, consumer)).start();

Solution

  • I found out where the problem was: I was trying to send message to everyone using Topic, where in RabbitMQ Topic is used to specify who should receive the message. The "#" or "*" should be used in the queue key declaration, not while sending the message with a given key.