And so, step by step, what do I want to do:
I receive data if an error occurs when sending to another system, then I want to send data to rabbitMQ:
@Override
public void updateAnketaIfThrowThenSendMessageInRabbit(ProfileId profileId, ChangeClientAnketaRequest anketa, String profileVersion) {
try {
anketaService.updateAnketa(profileId, anketa, profileVersion);
} catch (ClubProNotAvailableException e) {
rabbitTemplate.convertAndSend(config.getExchange(), config.getRoutingKey(), clubProNotAvailableRabbit);
Anketa a = conversionService.convert(conversionService.convert(anketa, UgAnketa.class), Anketa.class);
profileService.updateProfileAnketa(profileId, a, null);
}
}
}
Next, I want to accept these data and queues and try sending them again at a certain time interval.
For this i:
I accept messages
I'm trying to resend it:
a) If everything was successful, I delete it from the queue
b) If an error occurred, I call the stop method for the container. After a certain time I use the scheduler to call the start method for the container
@Override
public void onMessage(Message message, Channel channel) throws IOException {
ClubProNotAvailableRabbit data = null;
try {
data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
methodCall(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ClubProNotAvailableException e) {
listenerContainer.stop();
throw new ClubProNotAvailableException();
}
}
public void startContainer() {
listenerContainer.start();
}
I have encountered such problems:
The message is not delivered to the queue every time. Sometimes I have to call the convert And Send method several times.
When I got messages from the queue and an error occurred, I turn off the container, then when it turns on, the queue is empty, and when I turn off, I see this message:
2020-07-20 21:36:59.878 [INFO ] o.s.a.r.l.SimpleMessageListenerContainer - Workers not finished.
2020-07-20 21:36:59.878 [WARN ] o.s.a.r.l.SimpleMessageListenerContainer - Closing channel for unresponsive consumer: Consumer@77416991: tags=[[amq.ctag-E62UisbYdAAOQIM2bWr08w]], channel=Cached Rabbit Channel: AMQChannel(amqp://usergate_tst@10.64.177.12:5672/,35), conn: Proxy@6c60c170 Shared Rabbit Connection: SimpleConnection@5fb65b3a [delegate=amqp://usergate_tst@10.64.177.12:5672/, localPort= 59801], acknowledgeMode=MANUAL local queue size=0
How can I fix this situation?
CONTINUED QUESTION.
I corrected the code like this:
@Override
public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
ClubProNotAvailableRabbit data = null;
try {
data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
methodCall(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ClubProNotAvailableException e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
Thread.sleep(20000);
}
}
Thread.sleep here for experiment.
I expect that when I grab a message from the queue in the rabbitmq admin console, I will see it go to Unacked status, this is how it happens.
Then, when an error occurs, I call the basicReject method, and I want the status to become ready, immediately after the basicReject call line, but it becomes ready as soon as the method completes completely.
Unacked status:
Although the baseReject method has already worked.
Why is this happening? how is it supposed to work and what mechanism? why doesn't the message become immediately ready (status in console rabbit) after calling the baseReject method?
Closing channel for unresponsive consumer:
This means the listener is "stuck" in your code - you can't call stop()
from the listener itself - the container.stop()
waits for the listener to exit. You should use stop(() -> log.info("stopped container"))
instead.
You need to basicReject
in the catch case - the container won't handle it for you with MANUAL acks.
You MUST use MANUAL acks if you ack/nack the message yourself.
It's generally better to let the container take care of acking your messages.