I have 2 Spring RabbitMq configurations, one using the RabbitTemplate, one using the JmsTemplate.
The configuration with the RabbitTemplate:
Class AmqpMailIntegrationPerfTestConfig:
@Configuration
@ComponentScan(basePackages = {
"com.test.perf.amqp.receiver",
"com.test.perf.amqp.sender"
})
@EnableRabbit
public class AmqpMailIntegrationPerfTestConfig {
@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("mail", MailMessage.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}
@Bean
public RabbitTemplate myRabbitTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
@Bean
public ConnectionFactory createConnectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
return connectionFactory;
}
@Bean
Queue queue() {
return new Queue(AmqpMailSenderImpl.QUEUE_NAME, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(AmqpMailSenderImpl.TOPIC_EXCHANGE_NAME);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(AmqpMailSenderImpl.ROUTING_KEY);
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(createConnectionFactory());
}
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(createConnectionFactory());
factory.setMaxConcurrentConsumers(5);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
}
The AmqpMailSenderPerfImpl class in com.test.perf.amqp.sender package:
@Component
public class AmqpMailSenderPerfImpl implements MailSender {
public static final String TOPIC_EXCHANGE_NAME = "mails-exchange";
public static final String ROUTING_KEY = "mails";
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public boolean sendMail(MailMessage message) {
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, ROUTING_KEY, message);
return true;
}
}
The AmqpMailReceiverPerfImpl class in com.test.perf.amqp.receiver package:
@Component
public class AmqpMailReceiverPerfImpl implements ReceivedDatesKeeper {
private Logger logger = LoggerFactory.getLogger(getClass());
private Map<String,Date> datesReceived = new HashMap<String, Date>();
@RabbitListener(containerFactory = "myRabbitListenerContainerFactory", queues = AmqpMailSenderImpl.QUEUE_NAME)
public void receiveMessage(MailMessage message) {
logger.info("------ Received mail! ------\nmessage:" + message.getSubject());
datesReceived.put(message.getSubject(), new Date());
}
public Map<String, Date> getDatesReceived() {
return datesReceived;
}
}
The configuration with the JmsTemplate:
Class JmsMailIntegrationPerfTestConfig:
@Configuration
@EnableJms
@ComponentScan(basePackages = {
"com.test.perf.jms.receiver",
"com.test.jms.sender"
})
public class JmsMailIntegrationPerfTestConfig {
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
Map<String,Class<?>> typeIdMappings = new HashMap<String,Class<?>>();
typeIdMappings.put("mail", MailMessage.class);
converter.setTypeIdMappings(typeIdMappings);
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
@Bean
public ConnectionFactory createConnectionFactory(){
RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
return connectionFactory;
}
@Bean(name = "myJmsFactory")
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("10-50");
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
@Bean
public Destination jmsDestination() {
RMQDestination jmsDestination = new RMQDestination();
jmsDestination.setDestinationName("myQueue");
jmsDestination.setAmqp(false);
jmsDestination.setAmqpQueueName("mails");
return jmsDestination;
}
@Bean
public JmsTemplate myJmsTemplate(ConnectionFactory connectionFactory) {
final JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
return jmsTemplate;
}
}
The JmsMailSenderImpl class in package com.test.jms.sender:
@Component
public class JmsMailSenderImpl implements MailSender {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private JmsTemplate jmsTemplate;
@Override
public boolean sendMail(MailMessage message) {
logger.info("Sending message!");
jmsTemplate.convertAndSend("mailbox", message);
return false;
}
}
The JmsMailReceiverPerfImpl class in package com.test.perf.jms.receiver:
@Component
public class JmsMailReceiverPerfImpl implements ReceivedDatesKeeper {
private Logger logger = LoggerFactory.getLogger(getClass());
private Map<String,Date> datesReceived = new HashMap<String, Date>();
@JmsListener(destination = "mailbox", containerFactory = "myJmsFactory", concurrency = "10")
public void receiveMail(MailMessage message) {
datesReceived.put(message.getSubject(), new Date());
logger.info("Received <" + message.getSubject() + ">");
}
public Map<String, Date> getDatesReceived() {
return datesReceived;
}
}
I test the above configurations by starting 10 threads and making the respective MailSenders send 1000 mails each.
For the config with the RabbitTemplate I get: * Total throughput time of all messages: 3687ms * Time to process one message: 817ms
For the config with the JmsTemplate I get: * Total throughput time of all messages: 41653ms * Time to process one message: 67ms
This seems to indicate that the version with the JmsTemplate is not working in parallel, or at least, does not use resources optimally.
Does anybody know what could be causing this? I played around with different transaction and concurrency parameters but to no avail.
What we want is to get the same throughput time with the JmsTemplate as with the RabbitTemplate, so we can use JMS as an abstraction layer.
I can see why the consumer side is slower - the Consumer.receive()
uses a synchronous basicGet()
for each message whereas the @RabbitListener
container uses basicConsume
with a prefetch count of 250.
On the JMS sending side, you need to use a CachingConnectionFactory
there are well, otherwise a new session/producer/channel is created for each send.
It's still quite a bit slower, though, even with that; I suggest you ask on the rabbitmq-users Google group where the RabbitMQ engineers hang out. They maintain the JMS client.