In my application, a RabbitTemplate sends an object (EventMessage, code below) to the queue. However, in the @RabbitListener / @RabbitHandler, the EmailMessage object that the EventMessage contains is deserialized as a LinkedHashMap.
The MessageProperties, however, show that the type is EventMessage:
2020-09-23 18:39:47.712 WARN 16676 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{"type":101,"params":{"emailMessage":{"hasTo":true,"hasCc":false,"hasBcc":false,"template":null,"templateParams":null,"html":false,"toAddresses":["test@test.com"],"ccAddresses":null,"bccAddresses":null,"fromAddress":"test@mhserver.com","subject":"Test Subject","message":"An email has been sent to your registered email to reset your password","isHtml":false}}}' MessageProperties [headers={__TypeId__=in.teamnexus.mq.EventMessage}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=eventExchange, receivedRoutingKey=route.key.email, deliveryTag=1, consumerTag=amq.ctag-eCfxhUrcjiCF4lA7x8ZLNg, consumerQueue=queue.email])
Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class in.teamnexus.email.EmailMessage (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; in.teamnexus.email.EmailMessage is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @ac49aa4)
at in.teamnexus.mq.EmailQueueListener.handleEvent(EmailQueueListener.java:34) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
Following is the RabbitConfig
@Configuration
@EnableRabbit
@PropertySource("classpath:custom.properties")
public class RabbitConfig
{
@Bean
public ConnectionFactory rabbitConnectionFactory()
{
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitMQhost);
connectionFactory.setUsername(rabbitMQUsername);
connectionFactory.setPassword(rabbitMQPassword);
connectionFactory.setVirtualHost(rabbitMQVirtualHost);
connectionFactory.setPort(rabbitMQPort);
return connectionFactory;
}
@Bean
public MessageConverter messageConverter()
{
Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(new ObjectMapper());
return messageConverter;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
{
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setMessageConverter(messageConverter());
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConcurrentConsumers(20);
factory.setPrefetchCount(1);
factory.setMaxConcurrentConsumers(100);
factory.setAdviceChain(retryInterceptor());
return factory;
}
@Bean
public RetryOperationsInterceptor retryInterceptor()
{
return RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1000, 2.0, 10000)
.recoverer(new RejectAndDontRequeueRecoverer()).build();
}
@Bean
public Queue mailQueue()
{
return new Queue("queue.email", true);
}
@Bean
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
@Bean
public AmqpAdmin amqpAdmin()
{
RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory());
return rabbitAdmin;
}
@Bean
public Exchange exchange()
{
AmqpAdmin rabbitAdmin = amqpAdmin();
DirectExchange dirExchange = new DirectExchange("eventExchange", true, false);
rabbitAdmin.declareExchange(dirExchange);
rabbitAdmin.declareQueue(mailQueue());
Binding emailBinding = BindingBuilder.bind(mailQueue()).to(dirExchange).with(Constants.ROUTE_KEY_EMAIL);
rabbitAdmin.declareBinding(emailBinding);
rabbitAdmin.declareBinding(retryBinding);
return dirExchange;
}
@Bean
Publisher publisher()
{
PublisherImpl publisher = new PublisherImpl();
return publisher;
}
@Bean
EmailQueueListener emailQueueListener()
{
return new EmailQueueListener();
}
}
The EventMessage class
public class EventMessage<T> implements Serializable
{
public static final int TYPE_EMAIL = 101;
/**
*
*/
private static final long serialVersionUID = 1846120191276045453L;
@JsonProperty("type")
private int type;
@JsonProperty("params")
private Map<String, T> params;
public EventMessage()
{
}
public EventMessage(int type, Map<String, T> params)
{
this.type = type;
this.params = params;
}
// Getters Setters...
}
The PublisherImpl class
public class PublisherImpl implements Publisher
{
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public <T> EventResponse publishMessage(EventMessage<T> message, boolean async)
{
EventResponse response = new EventResponse();
if (async)
{
int resp = doPublish(message);
if (resp == EventResponse.EVENT_SUCCESS)
{
response.setStatus(EventResponse.EVENT_SUCCESS);
response.setMessage("Event Successfully published to the queue");
}
else
{
response.setStatus(EventResponse.EVENT_FAILURE);
response.setMessage("Failed to publish Event to the queue");
}
}
else
{
doSyncOp(message);
}
return response;
}
private <T> int doPublish(EventMessage<T> message)
{
String routingKey = Constants.ROUTE_KEY_EVENT;
int retVal = EventResponse.EVENT_SUCCESS;
try
{
switch (message.getType())
{
case EventMessage.TYPE_EMAIL:
{
routingKey = Constants.ROUTE_KEY_EMAIL;
this.rabbitTemplate.convertAndSend("eventExchange", routingKey, message);
break;
}
}
}
catch (AmqpException e)
{
retVal = EventResponse.EVENT_FAILURE;
logger.debug("Unable to push to the queue", e);
}
return retVal;
}
// Getters/Setters
}
The EmailQueueListener class
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "queue.email")
public class EmailQueueListener
{
Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private EmailSender emailSender;
@RabbitHandler
void handleEvent(EventMessage<EmailMessage> message)
{
logger.debug("### RabbitHandler Email: Receiving in listener:" + message);
Map<String, EmailMessage> params = message.getParams();
logger.debug("### emailMessage: " + params.get("emailMessage") + " class:" + params.get("emailMessage").getClass());
EmailMessage email = (EmailMessage) params.get("emailMessage");
emailSender.sendEmail(email);
}
// Getters/Setters
}
The exception is thrown at the line where params.get("emailMessage") is called, and the message is then retried the configured number of times. I am not sure if I am doing something wrong.
EDIT
Here is the code that publishes the message
public class EmailHelper
{
@Autowired
private Publisher publisher;
public void sendEmail(String to, String cc, String bcc, String subject, String text, boolean isHtml)
{
EmailMessage emailMessage = new EmailMessage();
emailMessage.setToAddresses(new String[] { to });
if (cc != null && !cc.isEmpty())
{
emailMessage.setCcAddresses(new String[] { cc });
}
if (bcc != null && !bcc.isEmpty())
{
emailMessage.setBccAddresses(new String[] { bcc });
}
emailMessage.setFromAddress("test@mhserver.com");
emailMessage.setSubject(subject);
emailMessage.setMessage(text);
Map<String, EmailMessage> params = new HashMap<>();
params.put("emailMessage", emailMessage);
EventMessage<EmailMessage> evtMsg = new EventMessage<>(EventMessage.TYPE_EMAIL, params);
publisher.<EmailMessage>publishMessage(evtMsg, true);
}
}
It's because EventMessage is a generic type; the default type mapper in the message converter can't handle arbitrary generic types.
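To illustrate why the failure only shows up as a ClassCastException inside the handler, here is a minimal sketch using only the JDK. The nested EventMessage and EmailMessage classes are simplified, hypothetical stand-ins for the ones in the question, and deserializeWithoutTypeInfo() merely imitates a converter that knows the raw class but not its type argument; it is not what Spring AMQP or Jackson actually run.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ErasureDemo {

    // Hypothetical stand-in for the question's EmailMessage.
    static class EmailMessage {
        String subject;
    }

    // Hypothetical stand-in for the question's EventMessage<T>.
    static class EventMessage<T> {
        Map<String, T> params;
    }

    // Imitates a converter that only knows the raw class EventMessage:
    // with no element type available, a nested JSON object ends up as a LinkedHashMap.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static EventMessage<EmailMessage> deserializeWithoutTypeInfo() {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("subject", "Test Subject");

        Map raw = new LinkedHashMap();
        raw.put("emailMessage", nested);

        EventMessage message = new EventMessage();
        message.params = raw;
        // Unchecked conversion: accepted by the compiler, not verified at runtime.
        return message;
    }

    // Reads the value as Object, so no cast is inserted and nothing fails.
    public static String valueClassName() {
        EventMessage<EmailMessage> message = deserializeWithoutTypeInfo();
        Map<String, ?> params = message.params;
        Object value = params.get("emailMessage");
        return value.getClass().getName();
    }

    // Reads the value as EmailMessage; the compiler-inserted cast fails here.
    public static boolean castFails() {
        EventMessage<EmailMessage> message = deserializeWithoutTypeInfo();
        try {
            EmailMessage email = message.params.get("emailMessage");
            return email == null; // not reached in this sketch
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(valueClassName());
        System.out.println(castFails());
    }
}
```

Because generics are erased at runtime, the message object itself is created without complaint; the problem only surfaces at the first place where a value from the map is used as an EmailMessage.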
It will work if the @RabbitListener is defined at the method level instead of the class level, because we can infer the generic type from the listener method parameter.
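A rough sketch of what the method-level variant of the listener from the question could look like. It reuses the queue name and container factory name from the question. The package names are taken from the stack trace, and the import for EmailSender is left out because its package is not shown. Treat it as an illustration rather than a drop-in replacement.

```java
package in.teamnexus.mq;

import in.teamnexus.email.EmailMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
// The import for EmailSender is omitted: its package does not appear in the question.

// No class-level @RabbitListener and no @RabbitHandler here.
public class EmailQueueListener
{
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private EmailSender emailSender;

    // With the annotation on the method, the parameter type
    // EventMessage<EmailMessage> is available to the converter as the target type.
    @RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "queue.email")
    void handleEvent(EventMessage<EmailMessage> message)
    {
        logger.debug("### Receiving in listener: {}", message);
        EmailMessage email = message.getParams().get("emailMessage");
        emailSender.sendEmail(email);
    }
}
```

The bean definition in RabbitConfig can stay as it is; only the placement of the annotation changes.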
Otherwise, you will need to create a custom type mapper for the message converter.
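One possible shape for such a type mapper is sketched below, assuming the class-level listener is kept. ConverterConfig is a hypothetical class name; in the question's setup the bean method would replace messageConverter() in RabbitConfig. The sketch assumes that every message carrying the EventMessage type id on this converter holds EmailMessage values, which matches the question but will not hold once other payload types are published.

```java
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import in.teamnexus.email.EmailMessage;
import in.teamnexus.mq.EventMessage;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class, used here only to keep the sketch self-contained.
@Configuration
public class ConverterConfig
{
    @Bean
    public MessageConverter messageConverter()
    {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(new ObjectMapper());
        converter.setJavaTypeMapper(new DefaultJackson2JavaTypeMapper()
        {
            @Override
            public JavaType toJavaType(MessageProperties properties)
            {
                Object typeId = properties.getHeaders().get("__TypeId__");
                if (EventMessage.class.getName().equals(String.valueOf(typeId)))
                {
                    // Assumption: EventMessage payloads always carry EmailMessage values.
                    return TypeFactory.defaultInstance()
                            .constructParametricType(EventMessage.class, EmailMessage.class);
                }
                // Anything else goes through the default mapping.
                return super.toJavaType(properties);
            }
        });
        return converter;
    }
}
```

Note that the overridden branch returns before the default mapper's own handling runs, so if other event types are added later, the mapper would need some way (for example the type field or an extra header) to pick the right type argument.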