Tags: java, spring, spring-boot, spring-integration

How to spy on an autowired bean in Spring tests


I have a simple logging handler bean configuration, which I inject into an IntegrationFlow:

@Configuration
class LogHandlerConfiguration {

    private LoggingHandler handler;

    @Bean
    public MessageHandler kafkaSuccessHandler() {
        return getLogger(LoggingHandler.Level.INFO);
    }

    @Bean(name="kafkaFailureHandler")
    public MessageHandler kafkaFailureHandler() {
        return getLogger(LoggingHandler.Level.ERROR);
    }

    private LoggingHandler getLogger(LoggingHandler.Level level) {
        handler = new LoggingHandler(level);
        handler.setShouldLogFullMessage(Boolean.TRUE);
        return handler;
    }
}

The integration flow under test:

@Bean
IntegrationFlow kafkaFailureFlow(ExecutorChannel kafkaErrorChannel, MessageHandler kafkaFailureHandler) {
    return IntegrationFlows.from(kafkaErrorChannel)
            .transform("payload.failedMessage")
            .handle(kafkaFailureHandler)
            .get();
}

Here's my test:

@SpyBean
MessageHandler kafkaFailureHandler;

@BeforeEach
public void setup() {
    // openMocks expects the test instance, not the class
    MockitoAnnotations.openMocks(this);
}

@Test
void testFailedKafkaPublish() {

    //Dummy message
    Map<String, String> map = new HashMap<>();
    map.put("key", "value");
    // Publish Message
    Message<Map<String, String>> message = MessageBuilder.withPayload(map)
            .setHeader("X-UPSTREAM-TYPE", "alm")
            .setHeader("X-UPSTREAM-INSTANCE", "jira")
            .setHeader("X-MESSAGE-KEY", "key-1")
            .build();

    kafkaGateway.publish(message);
    // Failure handler called
    Mockito.verify(kafkaFailureHandler, Mockito.timeout(0).atLeastOnce()).handleMessage(
            ArgumentMatchers.any(Message.class));
}

We've created a generic Kafka producer/consumer configuration to which downstream apps can attach the failure and success handlers best suited to their needs. I'm not able to verify that the LoggingHandler in this case is called at least once.

The failure handler executes on an ExecutorChannel backed by a ThreadPoolTaskExecutor:

@Bean
ExecutorChannel kafkaErrorChannel(Executor threadPoolExecutor) {
    return MessageChannels.executor("kafkaErrorChannel", threadPoolExecutor).get();
}
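The threadPoolExecutor bean referenced above isn't shown in the post; a minimal sketch might look like this (the pool sizes and thread-name prefix are illustrative, not from the original configuration):

```java
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Bean
Executor threadPoolExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(2);        // illustrative sizing
    executor.setMaxPoolSize(4);
    executor.setThreadNamePrefix("kafka-error-");
    return executor;                    // Spring initializes the pool via afterPropertiesSet()
}
```

Because the handler runs on one of these pool threads rather than the test thread, any Mockito verification must wait for the asynchronous hand-off (hence the timeout-based verify later in the post).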

Failures are handled via retry advice:

@Bean
RequestHandlerRetryAdvice retryAdvice(ExecutorChannel kafkaErrorChannel) {
    RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
    retryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(kafkaErrorChannel));
    return retryAdvice;
}
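For context, retry advice like this is typically attached to the outbound Kafka endpoint, so that exhausted retries are recovered as error messages onto kafkaErrorChannel. The original post doesn't show that wiring; a hedged sketch (the flow name, channel name, topic, and KafkaTemplate parameter are assumptions) could be:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.kafka.core.KafkaTemplate;

@Bean
IntegrationFlow kafkaPublishFlow(KafkaTemplate<String, Object> kafkaTemplate,
                                 RequestHandlerRetryAdvice retryAdvice) {
    return IntegrationFlows.from("kafkaPublishChannel")   // hypothetical input channel
            .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                            .topic("events"),             // hypothetical topic
                    e -> e.advice(retryAdvice))           // retries here; exhausted failures
                                                          // go to kafkaErrorChannel via the recoverer
            .get();
}
```

With this arrangement, a publish failure retries in place, and once retries are exhausted the ErrorMessageSendingRecoverer sends an ErrorMessage to kafkaErrorChannel, which is what the kafkaFailureFlow consumes.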

I get this error when I run the test:

java.lang.IllegalStateException: No bean found for definition [SpyDefinition@44dfdd58 name = '', typeToSpy = org.springframework.messaging.MessageHandler, reset = AFTER]
    at org.springframework.util.Assert.state(Assert.java:97) ~[spring-core-5.3.4.jar:5.3.4]
    at org.springframework.boot.test.mock.mockito.MockitoPostProcessor.inject(MockitoPostProcessor.java:351) ~[spring-boot-test-2.4.3.jar:2.4.3]

Solution

  • So, I was hung up on "why doesn't @SpyBean work?" There were two problems:

    1. Both the success and failure handlers were MessageHandlers, so @SpyBean could not resolve which bean to spy by type alone.
    2. The Kafka producer's wait time was too high, i.e. 1000 ms.

    Here's what finally worked, using a named bean:

    @Bean("kafkaFailureHandler")
    public MessageHandler kafkaFailureHandler() {
        LoggingHandler handler = new LoggingHandler(LoggingHandler.Level.INFO);
        handler.setShouldLogFullMessage(Boolean.TRUE);
        return handler;
    }
    

    and then, in the test, reducing the producer's max block time as well:

    @DirtiesContext
    @SpringBootTest(classes = {KafkaHandlerConfiguration.class, SwiftalkKafkaGateway.class})
    @SpringIntegrationTest(noAutoStartup = {"kafkaFailureFlow"})
    @TestPropertySource(properties = {
            "spring.main.banner-mode=off",
            "logging.level.root=INFO",
            "logging.level.org.springframework=INFO",
            "logging.level.org.springframework.integration=DEBUG",
            "spring.kafka.producer.properties.max.block.ms=50",
            "spring.kafka.producer.bootstrap-servers=localhost:9999",
            "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
            "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
    })
    public class KafkaPublishFailureTest {
    
        private static final Logger log = LogManager.getLogger(KafkaPublishFailureTest.class);
    
        @Autowired
        SwiftalkKafkaGateway kafkaGateway;
    
        @SpyBean(name = "kafkaFailureHandler")
        MessageHandler kafkaFailureHandler;
    
        @Test
        @SuppressWarnings("all")
        void testFailedKafkaPublish() throws InterruptedException {
    
            //Dummy message
            Map<String, String> map = new HashMap<>();
            map.put("key", "value");
            // Publish Message
            Message<Map<String, String>> message = MessageBuilder.withPayload(map)
                    .setHeader("X-UPSTREAM-TYPE", "alm")
                    .setHeader("X-UPSTREAM-INSTANCE", "jira")
                    .setHeader("X-MESSAGE-KEY", "key-1")
                    .build();
    
            kafkaGateway.publish(message);
            verify(this.kafkaFailureHandler, timeout(500)).handleMessage(any(Message.class));
        }
    
    }
    

    Notice the spring.kafka.producer.properties.max.block.ms=50 property and verify(this.kafkaFailureHandler, timeout(500)).handleMessage(any(Message.class)): the producer fails fast, and the verification waits up to 500 ms for the handler to be invoked on the executor thread.