I configure the rabbitTemplate like below:
@Autowired
public Sender(RabbitTemplate rabbitTemplate) {
    // Callback: did the message reach the exchange?
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            log.info("sender not send message to the right exchange" + " correlationData=" + correlationData + " ack=" + ack + " cause" + cause);
        } else {
            log.info("sender send message to the right exchange" + " correlationData=" + correlationData + " ack=" + ack + " cause" + cause);
        }
    });
    // Callback: did the message reach the correct queue? If not, the message is returned.
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {
        log.info("Sender send message failed: " + message + " " + replyCode + " " + replyText + " " + tmpExchange + " " + tmpRoutingKey);
        //try to resend msg
    });
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    rabbitTemplate.setRetryTemplate(retryTemplate);
    rabbitTemplate.setMandatory(true);
    this.rabbitTemplate = rabbitTemplate;
}
and the send method:
public void send() {
    System.out.println("sender is sending message");
    String uuid1 = UUID.randomUUID().toString();
    String uuid2 = UUID.randomUUID().toString();
    String uuid3 = UUID.randomUUID().toString();
    System.out.println("UUID="+uuid1+"---"+uuid2+"---"+uuid3);
    // the right exchange name and routing key
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "aaa.orange.bbb", "hello,world1 2", new CorrelationData(uuid1));
    // wrong exchange name (the payload means "testing a wrong exchange name")
    rabbitTemplate.convertAndSend("测试交换机名", "aaa.orange.ccc", "测试错误的交换机名", new CorrelationData(uuid2));
    // wrong exchange name and routing key (the payload means "testing a wrong queue name")
    rabbitTemplate.convertAndSend("测试交换机名", "1111111", "测试错误的队列名", new CorrelationData(uuid3));
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
My question: when I keep only
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "aaa.orange.bbb", "hello,world1 2", new CorrelationData(uuid1));
and comment out these two lines
rabbitTemplate.convertAndSend("测试交换机名", "aaa.orange.ccc", "测试错误的交换机名", new CorrelationData(uuid2));
// wrong exchange name
rabbitTemplate.convertAndSend("测试交换机名", "1111111", "测试错误的队列名", new CorrelationData(uuid3));
the confirmCallback log is "sender send message to the right exchange".
But if I send all three messages at once, the confirmCallback logs "sender not send message to the right exchange" three times. When I check the queue, the message sent to the right exchange has arrived there. How can I fix this problem?
Your question is not clear, but if you mean that you are sending to a non-existent exchange: that is considered fatal for the channel, so any confirms still pending on that channel will be lost.
Since Spring AMQP caches channels for reuse, downstream operations can cause the channel to be closed and the confirm lost.
For example:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
// In Spring AMQP 2.1 and later this class lives in org.springframework.amqp.rabbit.connection
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So48518319Application {

    public static void main(String[] args) {
        SpringApplication.run(So48518319Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            // Print each confirm and release the latch of the matching message
            template.setConfirmCallback((correlation, ack, cause) -> {
                System.out.println(correlation + ":" + ack + " " + (cause == null ? "" : cause));
                ((MyCorrelationData) correlation).getLatch().countDown();
            });
            MyCorrelationData foo = new MyCorrelationData("foo");
            MyCorrelationData bar = new MyCorrelationData("bar");
            MyCorrelationData baz = new MyCorrelationData("baz");
            // All three sends go out back to back, before any confirm is awaited
            template.convertAndSend("output", "output.foo", "foo", foo);
            template.convertAndSend("output", "output.foo", "foo", bar);
            template.convertAndSend("output", "output.foo", "foo", baz);
            if (!foo.getLatch().await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("Foo failed");
            }
            if (!bar.getLatch().await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("Bar failed");
            }
            if (!baz.getLatch().await(10, TimeUnit.SECONDS)) {
                throw new RuntimeException("Baz failed");
            }
            System.out.println("All good");
        };
    }

    public static class MyCorrelationData extends CorrelationData {

        private CountDownLatch latch = new CountDownLatch(1);

        public MyCorrelationData(String id) {
            super(id);
        }

        protected CountDownLatch getLatch() {
            return this.latch;
        }

        protected void setLatch(CountDownLatch latch) {
            this.latch = latch;
        }
    }
}
works well:
CorrelationData [id=foo]:true
CorrelationData [id=bar]:true
CorrelationData [id=baz]:true
All good
but if I change it to
template.convertAndSend("output", "output.foo", "foo", foo);
template.convertAndSend("noutput", "output.foo", "foo", bar);
template.convertAndSend("noutput", "output.foo", "foo", baz);
we get
CorrelationData [id=foo]:false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noutput' in vhost '/', class-id=60, method-id=40)
CorrelationData [id=bar]:false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noutput' in vhost '/', class-id=60, method-id=40)
CorrelationData [id=baz]:false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noutput' in vhost '/', class-id=60, method-id=40)
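Why foo fails too, even though it went to a valid exchange, can be illustrated with a toy model. This is only a sketch of the behaviour described above, not Spring AMQP's implementation: ToyChannel, publish, confirmArrives and the two scenario methods are hypothetical names, and the "broker" here knows a single exchange called "output".

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class PendingConfirmModel {

    // Toy stand-in for one cached channel; NOT a Spring AMQP or RabbitMQ class.
    static class ToyChannel {
        private final Set<String> pending = new LinkedHashSet<>();
        final List<String> callbacks = new ArrayList<>();

        // Publish a message; an unknown exchange closes the channel.
        void publish(String id, String exchange) {
            pending.add(id);
            if (!"output".equals(exchange)) {
                close();
            }
        }

        // The confirm for an earlier publish arrives from the broker.
        void confirmArrives(String id) {
            if (pending.remove(id)) {
                callbacks.add(id + ":true");
            }
        }

        // Closing the channel fails every confirm that is still pending.
        private void close() {
            for (String id : pending) {
                callbacks.add(id + ":false");
            }
            pending.clear();
        }
    }

    // Good send followed immediately by a bad send on the same channel.
    public static List<String> sendBackToBack() {
        ToyChannel channel = new ToyChannel();
        channel.publish("foo", "output");
        channel.publish("bar", "noutput"); // foo's confirm has not arrived yet
        channel.confirmArrives("foo");     // too late, foo was already failed
        return channel.callbacks;
    }

    // Good send whose confirm is received before the bad send happens.
    public static List<String> waitForConfirmFirst() {
        ToyChannel channel = new ToyChannel();
        channel.publish("foo", "output");
        channel.confirmArrives("foo");
        channel.publish("bar", "noutput");
        return channel.callbacks;
    }

    public static void main(String[] args) {
        System.out.println(sendBackToBack());      // [foo:false, bar:false]
        System.out.println(waitForConfirmFirst()); // [foo:true, bar:false]
    }
}
```

In the model, the only difference between the two scenarios is whether foo's confirm arrives before the bad send closes the channel.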
To avoid reusing the channel until the ack is received, you can use the template's invoke method; this prevents the channel from being reused for the bad sends:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
// In Spring AMQP 2.1 and later this class lives in org.springframework.amqp.rabbit.connection
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class So48518319Application {

    public static void main(String[] args) {
        SpringApplication.run(So48518319Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.setConfirmCallback((correlation, ack, cause) -> {
                System.out.println(correlation + ":" + ack + " " + (cause == null ? "" : cause));
                MyCorrelationData myCorrelation = (MyCorrelationData) correlation;
                // Record the result before releasing the latch, so the waiting thread never reads a stale value
                myCorrelation.setAck(ack);
                myCorrelation.getLatch().countDown();
            });
            MyCorrelationData foo = new MyCorrelationData("foo");
            MyCorrelationData bar = new MyCorrelationData("bar");
            MyCorrelationData baz = new MyCorrelationData("baz");
            // Each invoke() sends one message and waits for its confirm before returning
            boolean result1 = template.invoke(t -> {
                t.convertAndSend("output", "output.foo", "foo", foo);
                try {
                    if (!foo.getLatch().await(10, TimeUnit.SECONDS)) {
                        throw new RuntimeException("Foo failed");
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return foo.isAck();
            });
            boolean result2 = template.invoke(t -> {
                t.convertAndSend("noutput", "output.foo", "bar", bar);
                try {
                    if (!bar.getLatch().await(10, TimeUnit.SECONDS)) {
                        throw new RuntimeException("Bar failed");
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return bar.isAck();
            });
            boolean result3 = template.invoke(t -> {
                t.convertAndSend("noutput", "output.foo", "baz", baz);
                try {
                    if (!baz.getLatch().await(10, TimeUnit.SECONDS)) {
                        throw new RuntimeException("Baz failed");
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return baz.isAck();
            });
            System.out.println("All done: " + result1 + "," + result2 + "," + result3);
        };
    }

    public static class MyCorrelationData extends CorrelationData {

        private final CountDownLatch latch = new CountDownLatch(1);

        private volatile boolean ack;

        public MyCorrelationData(String id) {
            super(id);
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        public boolean isAck() {
            return this.ack;
        }

        public void setAck(boolean ack) {
            this.ack = ack;
        }
    }
}
with the output
CorrelationData [id=foo]:true
CorrelationData [id=bar]:false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noutput' in vhost '/', class-id=60, method-id=40)
CorrelationData [id=baz]:false channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'noutput' in vhost '/', class-id=60, method-id=40)
All done: true,false,false
But waiting for each confirm before sending the next message defeats the benefit of using publisher confirms, unless you do the sends on separate threads.
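A minimal sketch of the "separate threads" idea, using only the JDK so that it runs without a broker. sendAndAwaitConfirm is a hypothetical stand-in for a template.invoke(...) call that sends one message and waits on its latch; here it just sleeps and treats "output" as the only existing exchange. The class name and the pool sizing are also assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelConfirmedSends {

    // Hypothetical stand-in for "invoke + wait for the confirm":
    // blocks for a while, then reports whether the exchange exists.
    static boolean sendAndAwaitConfirm(String exchange, String payload) {
        try {
            Thread.sleep(200); // simulated broker round trip
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return "output".equals(exchange);
    }

    // Runs one blocking send per thread and collects the results in order.
    public static List<Boolean> sendAll(List<String> exchanges) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, exchanges.size()));
        try {
            // Start every send first so that the waits overlap
            List<CompletableFuture<Boolean>> futures = exchanges.stream()
                    .map(ex -> CompletableFuture.supplyAsync(() -> sendAndAwaitConfirm(ex, "foo"), pool))
                    .collect(Collectors.toList());
            // Only then block until each confirm result is available
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAll(Arrays.asList("output", "noutput", "noutput"))); // [true, false, false]
    }
}
```

With real sends, each task would wrap its own invoke call, so a bad send on one thread should not take down the channel another thread is still waiting on. The total wait is then roughly one round trip instead of three.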
Bottom line is "don't send messages to non-existent exchanges if you are using confirms".