I'm trying to start up a Pub/Sub setup with Spring Data Redis, and I'm able to load the publisher, but the RedisMessageListenerContainer doesn't start up automatically. I'm using Spring Data Redis 2.2.8.RELEASE along with embedded redis server (it.ozimov.embedded-redis version 0.7.2). Does anyone have any insight as to why the RedisMessageListenerContainer won't start up?
Here are my classes.
RedisListenerAutoConfiguration
@Configuration
@ConditionalOnProperty(prefix = "myproj.redis", name = "mode", havingValue = "LISTENER", matchIfMissing = true)
@ComponentScan("com.jcworx.redis.listener")
@ConditionalOnBean(type = "com.jcworx.redis.listener.RedisMessageListener")
@AutoConfigureAfter(RedisAutoConfiguration.class)
@EnableConfigurationProperties(RedisConfigurationProperties.class)
public class RedisListenerAutoConfiguration {
@Autowired
private RedisConfigurationProperties redisConfigurationProperties;
@Bean
public MessageListenerAdapter messageListenerAdapter(RedisMessageListener<?> redisMessageListener){
return new MessageListenerAdapter(redisMessageListener,"onRedisMessage");
}
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(messageListenerAdapter, new ChannelTopic(redisConfigurationProperties.getQueueName()));
return container;
}
}
SimpleRedisMessageListener
@Component
public class SimpleRedisMessageListener extends AbstractRedisMessageListener<SimpleType>{
private static final Logger LOG = LoggerFactory.getLogger(SimpleRedisMessageListener.class);
private CountDownLatch countDownLatch;
@Override
public void processRedisMsg(RedisMessage<SimpleType> redisMsg) {
LOG.info("Processing Message. trxId={}, payload={}",redisMsg.getTrxId(),redisMsg.getPayload());
Assert.notNull(countDownLatch,"Count Down Latch cannot be null.");
countDownLatch.countDown();
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
}
RedisServerConfiguration
@Configuration
@Profile("test")
public class RedisServerConfiguration {
private RedisServer redisServer;
@Autowired //redisProperties autowired from RedisAutoConfiguration
public RedisServerConfiguration(RedisProperties redisProperties){
redisServer = new RedisServer(redisProperties.getPort());
}
@PostConstruct
public void postConstruct(){
redisServer.start();
}
@PreDestroy
public void preDestroy(){
redisServer.stop();
}
}
application-test.properties
#application test resources
myproj.redis.queueName=test
spring.redis.host=localhost
spring.redis.port=6379
#set to true when you need to see the auto configuration rules
debug=true
RedisPubSubABTTest
@SpringBootTest(classes = TestRedisApp.class)
@ActiveProfiles("test")
public class RedisPubSubABTTest {
@Autowired
private RedisMessagePublisher redisMessagePublisher;
@Autowired
private SimpleRedisMessageListener simpleRedisMessageListener;
/**
* Send a message to the embedded redis queue and await the listener to respond. If it
* responds, then the countdown latch will count down to 0. Otherwise, it will time out
* and fail to respond.
* @throws InterruptedException
*/
@Test
public void messageSentAndReceived() throws InterruptedException{
//ARRANGE
SimpleType simpleType = new SimpleType();
simpleType.setFirstName("John");
simpleType.setLastName("Smith");
CountDownLatch countDownLatch = new CountDownLatch(1);
simpleRedisMessageListener.setCountDownLatch(countDownLatch);
RedisMessage<SimpleType> redisMsg = new RedisMessage.Builder<SimpleType>().TrxId(UUID.randomUUID().toString())
.payload(simpleType)
.build();
//ACT
redisMessagePublisher.publish(redisMsg);
boolean responded = countDownLatch.await(5, TimeUnit.SECONDS);
//ASSERT
Assertions.assertTrue(responded);
}
}
As it turns out, The MessageListenerAdapter
uses the RedisSerializer.string()
as the default serializer. This means that any POJO other than a String in the parameter list of the listener method will be ignored. In order to get past this, you need to invoke the setSerializer
method and pass in RedisSerializer.java()
as the argument. This will let the MessageListenerAdapter
know that the POJO is a java class and needs to be serialized/deserialized. Please note that whatever pojo that you decide to pass in MUST implement java.io.Serializable
. Please see the example below, and hopefully this helps someone else.
@Bean
public MessageListenerAdapter messageListenerAdapter(RedisMessageListener<?> redisMessageListener){
MessageListenerAdapter msgAdapter = new MessageListenerAdapter(redisMessageListener,"onRedisMessage");
msgAdapter.setSerializer(RedisSerializer.java());
return msgAdapter;
}