Search code examples
spring · spring-boot · redis · spring-data · spring-data-redis

Why isn't RedisMessageListenerContainer starting up?


I'm trying to start up a Pub/Sub setup with Spring Data Redis. I'm able to load the publisher, but the RedisMessageListenerContainer doesn't start up automatically. I'm using Spring Data Redis 2.2.8.RELEASE along with an embedded Redis server (it.ozimov.embedded-redis version 0.7.2). Does anyone have any insight as to why the RedisMessageListenerContainer won't start up?

Here are my classes.

RedisListenerAutoConfiguration

/**
 * Auto-configuration that wires up the listening side of the Redis Pub/Sub setup.
 *
 * <p>It applies when {@code myproj.redis.mode} is {@code LISTENER} (or is not set at all) and a
 * {@code com.jcworx.redis.listener.RedisMessageListener} bean is present. It is ordered after
 * {@link RedisAutoConfiguration} and scans {@code com.jcworx.redis.listener} for components.
 */
@Configuration
@ConditionalOnProperty(prefix = "myproj.redis", name = "mode", havingValue = "LISTENER", matchIfMissing = true)
@ComponentScan("com.jcworx.redis.listener")
@ConditionalOnBean(type = "com.jcworx.redis.listener.RedisMessageListener")
@AutoConfigureAfter(RedisAutoConfiguration.class)
@EnableConfigurationProperties(RedisConfigurationProperties.class)
public class RedisListenerAutoConfiguration {

    /** Supplies the queue name used as the channel topic in {@link #container}. */
    @Autowired
    private RedisConfigurationProperties redisConfigurationProperties;

    /**
     * Creates the adapter that delegates incoming messages to the listener's
     * {@code onRedisMessage} method.
     *
     * @param redisMessageListener the listener to delegate to
     * @return an adapter wrapping {@code redisMessageListener}
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(RedisMessageListener<?> redisMessageListener){
        final String handlerMethod = "onRedisMessage";
        return new MessageListenerAdapter(redisMessageListener, handlerMethod);
    }

    /**
     * Creates the listener container and registers the adapter on the configured channel.
     *
     * @param connectionFactory      connection factory handed to the container
     * @param messageListenerAdapter adapter to register as the message listener
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter){
        // The channel name comes from the bound configuration properties.
        ChannelTopic topic = new ChannelTopic(redisConfigurationProperties.getQueueName());

        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(messageListenerAdapter, topic);
        return listenerContainer;
    }
}

SimpleRedisMessageListener

/**
 * Listener for {@link SimpleType} messages that logs each message and counts down a
 * caller-supplied {@link CountDownLatch}, so a caller can wait for a message to be processed.
 */
@Component
public class SimpleRedisMessageListener extends AbstractRedisMessageListener<SimpleType>{

    private static final Logger LOG = LoggerFactory.getLogger(SimpleRedisMessageListener.class);

    /** Latch to count down per processed message; must be set before a message arrives. */
    private CountDownLatch countDownLatch;

    /**
     * @param countDownLatch the latch to count down when a message is processed
     */
    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    /**
     * @return the latch currently installed, or {@code null} if none has been set
     */
    public CountDownLatch getCountDownLatch() {
        return this.countDownLatch;
    }

    /**
     * Logs the transaction id and payload of the message, then counts the latch down once.
     *
     * @param redisMsg the received message
     */
    @Override
    public void processRedisMsg(RedisMessage<SimpleType> redisMsg) {
        LOG.info("Processing Message. trxId={}, payload={}", redisMsg.getTrxId(), redisMsg.getPayload());

        final CountDownLatch latch = this.countDownLatch;
        // Fail loudly when no latch was installed instead of hitting a bare NPE.
        Assert.notNull(latch, "Count Down Latch cannot be null.");
        latch.countDown();
    }

}

RedisServerConfiguration

/**
 * Configuration for the {@code test} profile that owns an embedded {@link RedisServer}:
 * the server is started after construction and stopped before destruction.
 */
@Configuration
@Profile("test")
public class RedisServerConfiguration {

    /** The embedded server instance managed by this configuration. */
    private RedisServer redisServer;

    /**
     * Creates the embedded server on the port taken from the given properties.
     *
     * @param redisProperties properties autowired from RedisAutoConfiguration
     */
    @Autowired
    public RedisServerConfiguration(RedisProperties redisProperties){
        final int port = redisProperties.getPort();
        this.redisServer = new RedisServer(port);
    }

    /** Starts the embedded server. */
    @PostConstruct
    public void postConstruct(){
        this.redisServer.start();
    }

    /** Stops the embedded server. */
    @PreDestroy
    public void preDestroy(){
        this.redisServer.stop();
    }
}

application-test.properties

#application test resources
# NOTE(review): presumably bound to RedisConfigurationProperties.getQueueName(), which the
# listener container uses as its channel topic -- confirm the properties prefix.
myproj.redis.queueName=test
# Redis connection settings; the port is also read by the test-profile RedisServerConfiguration
# (via RedisProperties) to start the embedded server.
spring.redis.host=localhost
spring.redis.port=6379
#set to true when you need to see the auto configuration rules
debug=true 

RedisPubSubABTTest

/**
 * Publishes a message through {@link RedisMessagePublisher} under the {@code test} profile and
 * checks that {@link SimpleRedisMessageListener} receives it.
 */
@SpringBootTest(classes = TestRedisApp.class)
@ActiveProfiles("test")
public class RedisPubSubABTTest {

    @Autowired
    private RedisMessagePublisher redisMessagePublisher;

    @Autowired
    private SimpleRedisMessageListener simpleRedisMessageListener;

    /**
     * Publishes one message and waits for the listener to count the latch down to zero.
     * If the listener does not respond within five seconds, the wait times out and the
     * assertion fails.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on the latch
     */
    @Test
    public void messageSentAndReceived() throws InterruptedException{
        // ARRANGE: install a one-shot latch on the listener before anything is published.
        CountDownLatch received = new CountDownLatch(1);
        simpleRedisMessageListener.setCountDownLatch(received);

        SimpleType person = new SimpleType();
        person.setFirstName("John");
        person.setLastName("Smith");

        String trxId = UUID.randomUUID().toString();
        RedisMessage<SimpleType> outbound = new RedisMessage.Builder<SimpleType>()
                .TrxId(trxId)
                .payload(person)
                .build();

        // ACT: publish, then block until the listener counts down or the timeout elapses.
        redisMessagePublisher.publish(outbound);
        boolean responded = received.await(5, TimeUnit.SECONDS);

        // ASSERT
        Assertions.assertTrue(responded);
    }

}

Solution

  • As it turns out, the MessageListenerAdapter uses RedisSerializer.string() as its default serializer. This means that any listener method whose parameter is a POJO other than a String will be ignored. To get past this, you need to invoke the setSerializer method and pass in RedisSerializer.java() as the argument. This lets the MessageListenerAdapter know that the POJO is a Java class that needs to be serialized/deserialized. Please note that whatever POJO you decide to pass in MUST implement java.io.Serializable. Please see the example below; hopefully this helps someone else.

    /**
     * Creates the adapter that delegates incoming messages to the listener's
     * {@code onRedisMessage} method, with {@code RedisSerializer.java()} installed as the
     * adapter's serializer in place of its default.
     *
     * <p>NOTE(review): JDK deserialization is unsafe on untrusted input -- confirm that only
     * trusted publishers can write to the subscribed channel.
     *
     * @param redisMessageListener the listener to delegate to
     * @return the configured adapter
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(RedisMessageListener<?> redisMessageListener){
        final String handlerMethod = "onRedisMessage";
        MessageListenerAdapter adapter = new MessageListenerAdapter(redisMessageListener, handlerMethod);
        adapter.setSerializer(RedisSerializer.java());
        return adapter;
    }