Search code examples
spring-bootrabbitmqamqpspring-amqpspring-rabbit

Spring AMQP RabbitMQ Object sent as one type gets converted to Map in Listener


In my application, the RabbitTemplate sends an object (EventMessage - code below) to the queue. However in the RabbitListener and RabbitHandler the EmailMessage object that EventMessage contains gets converted as LinkedHashmap during deserialization

However the MessageProperties shows that the type is EventMessage

2020-09-23 18:39:47.712  WARN 16676 --- [ntContainer#0-1] o.s.a.r.r.RejectAndDontRequeueRecoverer  : Retries exhausted for message (Body:'{"type":101,"params":{"emailMessage":{"hasTo":true,"hasCc":false,"hasBcc":false,"template":null,"templateParams":null,"html":false,"toAddresses":["test@test.com"],"ccAddresses":null,"bccAddresses":null,"fromAddress":"test@mhserver.com","subject":"Test Subject","message":"An email has been sent to your registered email to reset your password","isHtml":false}}}' MessageProperties [headers={__TypeId__=in.teamnexus.mq.EventMessage}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=eventExchange, receivedRoutingKey=route.key.email, deliveryTag=1, consumerTag=amq.ctag-eCfxhUrcjiCF4lA7x8ZLNg, consumerQueue=queue.email])

Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class in.teamnexus.email.EmailMessage (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; in.teamnexus.email.EmailMessage is in unnamed module of loader org.springframework.boot.devtools.restart.classloader.RestartClassLoader @ac49aa4)
    at in.teamnexus.mq.EmailQueueListener.handleEvent(EmailQueueListener.java:34) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na] 

Following is the RabbitConfig

@Configuration
@EnableRabbit
@PropertySource("classpath:custom.properties")
public class RabbitConfig
{

    @Bean
    public ConnectionFactory rabbitConnectionFactory()
    {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitMQhost);
        connectionFactory.setUsername(rabbitMQUsername);
        connectionFactory.setPassword(rabbitMQPassword);
        connectionFactory.setVirtualHost(rabbitMQVirtualHost);
        connectionFactory.setPort(rabbitMQPort);
        return connectionFactory;
    }

    @Bean
    public MessageConverter messageConverter()
    {
        Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(new ObjectMapper());
        return messageConverter;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
    {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setMessageConverter(messageConverter());
        factory.setConnectionFactory(rabbitConnectionFactory());
        factory.setConcurrentConsumers(20);
        factory.setPrefetchCount(1);
        factory.setMaxConcurrentConsumers(100);
        factory.setAdviceChain(retryInterceptor());
        return factory;
    }

    @Bean
    public RetryOperationsInterceptor retryInterceptor()
    {
        return RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1000, 2.0, 10000)
                .recoverer(new RejectAndDontRequeueRecoverer()).build();
    }

    @Bean
    public Queue mailQueue()
    {
        return new Queue("queue.email", true);
    }

    @Bean
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

    @Bean
    public AmqpAdmin amqpAdmin()
    {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory());
        return rabbitAdmin;
    }

    @Bean
    public Exchange exchange()
    {
        AmqpAdmin rabbitAdmin = amqpAdmin();
        DirectExchange dirExchange = new DirectExchange("eventExchange", true, false);

        rabbitAdmin.declareExchange(dirExchange);
        rabbitAdmin.declareQueue(mailQueue());
        Binding emailBinding = BindingBuilder.bind(mailQueue()).to(dirExchange).with(Constants.ROUTE_KEY_EMAIL);
        rabbitAdmin.declareBinding(emailBinding);
        rabbitAdmin.declareBinding(retryBinding);
        return dirExchange;
    }

    @Bean
    Publisher publisher()
    {
        PublisherImpl publisher = new PublisherImpl();
        return publisher;
    }

    @Bean
    EmailQueueListener emailQueueListener()
    {
        return new EmailQueueListener();
    }

}

The EventMessage class

public class EventMessage<T> implements Serializable
{
    public static final int TYPE_EMAIL = 101;

    /**
     * 
     */
    private static final long serialVersionUID = 1846120191276045453L;
    
    @JsonProperty("type")
    private int type;
    
    @JsonProperty("params")
    private Map<String, T> params;


    public EventMessage()
    {

    }

    public EventMessage(int type, Map<String, T> params)
    {
        this.type = type;
        this.params = params;
    }
    
    // Getters Setters...
}

The PublisherImpl class

public class PublisherImpl implements Publisher
{
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public <T> EventResponse publishMessage(EventMessage<T> message, boolean async)
    {
        EventResponse response = new EventResponse();
        if (async)
        {
            int resp = doPublish(message);
            if (resp == EventResponse.EVENT_SUCCESS)
            {
                response.setStatus(EventResponse.EVENT_SUCCESS);
                response.setMessage("Event Successfully published to the queue");
            }
            else
            {
                response.setStatus(EventResponse.EVENT_FAILURE);
                response.setMessage("Failed to publish Event to the queue");
            }

        }
        else
        {
            doSyncOp(message)
        }
        return response;
    }

    private <T> int doPublish(EventMessage<T> message)
    {
        String routingKey = Constants.ROUTE_KEY_EVENT;
        int retVal = EventResponse.EVENT_SUCCESS;
        try
        {
            switch (message.getType())
            {
                case EventMessage.TYPE_EMAIL:
                {
                    routingKey = Constants.ROUTE_KEY_EMAIL;
                    this.rabbitTemplate.convertAndSend("eventExchange", routingKey, message);
                    break;
                }
            }
        }
        catch (AmqpException e)
        {
            retVal = EventResponse.EVENT_FAILURE;
            logger.debug("Unable to push to the queue", e);
        }
        return retVal;
    }

    // Getters/Setters
}

The EmailQueueListener class

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues = "queue.email")
public class EmailQueueListener
{
    Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private EmailSender emailSender;

    @RabbitHandler
    void handleEvent(EventMessage<EmailMessage> message)
    {
        logger.debug("### RabbitHandler Email: Receiving in listener:" + message);
        Map<String, EmailMessage> params = message.getParams();
        logger.debug("### emailMessage: " + params.get("emailMessage") + " class:" + params.get("emailMessage").getClass());
        EmailMessage email = (EmailMessage) params.get("emailMessage");
        emailSender.sendEmail(email);
    }

    // Getters/Setters

}

At the line where params.get("emailMessage") is called is where I get the exception and the configured amount of retries. I am not sure if I am doing something wrong.

EDIT

Here is the code that publishes the message

public class EmailHelper
{
    @Autowired
    private Publisher publisher;

    public void sendEmail(String to, String cc, String bcc, String subject, String text, boolean isHtml)
    {
        EmailMessage emailMessage = new EmailMessage();
        emailMessage.setToAddresses(new String[] { to });
        if (cc != null && !cc.isEmpty())
        {
            emailMessage.setCcAddresses(new String[] { cc });
        }
        if (bcc != null && !bcc.isEmpty())
        {
            emailMessage.setBccAddresses(new String[] { bcc });
        }
        emailMessage.setFromAddress("test@mhserver.com");
        emailMessage.setSubject(subject);
        emailMessage.setMessage(text);
        Map<String, EmailMessage> params = new HashMap<>();
        params.put("emailMessage", emailMessage);
        EventMessage<EmailMessage> evtMsg = new EventMessage<>(EventMessage.TYPE_EMAIL, params);
        publisher.<EmailMessage>publishMessage(evtMsg, true);
    }

Solution

  • It's because EventMessage is a generic type; the default type mapper in the message converter can't handle arbitrary generic types.

    It will work if the @RabbitListener is defined at the method level instead of the class level because we can infer the generic type from the listener method parameter.

    Otherwise, you will need to create a custom type mapper for the message converter.