I have a simple logging handler beans configuration which I inject into an IntegrationFlow
@Configuration
class LogHandlerConfiguration {
private LoggingHandler handler;
@Bean
public MessageHandler kafkaSuccessHandler() {
return getLogger(LoggingHandler.Level.INFO);
}
@Bean(name="kafkaFailureHandler")
public MessageHandler kafkaFailureHandler() {
return getLogger(LoggingHandler.Level.ERROR);
}
private LoggingHandler getLogger(LoggingHandler.Level level) {
handler = new LoggingHandler(level);
handler.setShouldLogFullMessage(Boolean.TRUE);
return handler;
}
}
the integration flow to test
@Bean
IntegrationFlow kafkaFailureFlow(ExecutorChannel kafkaErrorChannel, MessageHandler kafkaFailureHandler) {
return IntegrationFlows.from(kafkaErrorChannel)
.transform("payload.failedMessage")
.handle(kafkaFailureHandler)
.get();
}
Here's my test
@SpyBean
MessageHandler kafkaFailureHandler;
@BeforeEach
public void setup() {
MockitoAnnotations.openMocks(KafkaPublishFailureTest.class);
}
@Test
void testFailedKafkaPublish() {
//Dummy message
Map<String, String> map = new HashMap<>();
map.put("key", "value");
// Publish Message
Message<Map<String, String>> message = MessageBuilder.withPayload(map)
.setHeader("X-UPSTREAM-TYPE", "alm")
.setHeader("X-UPSTREAM-INSTANCE", "jira")
.setHeader("X-MESSAGE-KEY", "key-1")
.build();
kafkaGateway.publish(message);
// Failure handler called
Mockito.verify(kafkaFailureHandler, Mockito.timeout(0).atLeastOnce()).handleMessage(
ArgumentMatchers.any(Message.class));
}
We've created a generic Kafka Producer, Consumer configuration to which downsteam apps can attach failure and success handler best suited to their needs. I'm not able to verify the the LoggingHandler
in this case is called atleast once.
The failureHandler
gets executed under an ExecturoeChannel
backed by ThreadPoolTaskExecutor
@Bean
ExecutorChannel kafkaErrorChannel(Executor threadPoolExecutor) {
return MessageChannels.executor("kafkaErrorChannel", threadPoolExecutor).get();
}
failures are handled via retry advice
@Bean
RequestHandlerRetryAdvice retryAdvice(ExecutorChannel kafkaErrorChannel) {
RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
retryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(kafkaErrorChannel));
return retryAdvice;
}
I get this error when I run the test
java.lang.IllegalStateException: No bean found for definition [SpyDefinition@44dfdd58 name = '', typeToSpy = org.springframework.messaging.MessageHandler, reset = AFTER]
at org.springframework.util.Assert.state(Assert.java:97) ~[spring-core-5.3.4.jar:5.3.4]
at org.springframework.boot.test.mock.mockito.MockitoPostProcessor.inject(MockitoPostProcessor.java:351) ~[spring-boot-test-2.4.3.jar:2.4.3]
So, was hung up over Why not @SpyBean
? There were two problems
@SpyBean
Here's what Finally worked, using a named bean
@Bean("kafkaFailureHandler")
public MessageHandler kafkaFailureHandler() {
LoggingHandler handler = new LoggingHandler(LoggingHandler.Level.INFO);
handler.setShouldLogFullMessage(Boolean.TRUE);
return handler;
}
and then in tests reducing the max block too
@DirtiesContext
@SpringBootTest(classes = {KafkaHandlerConfiguration.class, SwiftalkKafkaGateway.class})
@SpringIntegrationTest(noAutoStartup = {"kafkaFailureFlow"})
@TestPropertySource(properties = {
"spring.main.banner-mode=off",
"logging.level.root=INFO",
"logging.level.org.springframework=INFO",
"logging.level.org.springframework.integration=DEBUG",
"spring.kafka.producer.properties.max.block.ms=50",
"spring.kafka.producer.bootstrap-servers=localhost:9999",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
})
public class KafkaPublishFailureTest {
private static final Logger log = LogManager.getLogger(KafkaPublishFailureTest.class);
@Autowired
SwiftalkKafkaGateway kafkaGateway;
@SpyBean(name = "kafkaFailureHandler")
MessageHandler kafkaFailureHandler;
@Test
@SuppressWarnings("all")
void testFailedKafkaPublish() throws InterruptedException {
//Dummy message
Map<String, String> map = new HashMap<>();
map.put("key", "value");
// Publish Message
Message<Map<String, String>> message = MessageBuilder.withPayload(map)
.setHeader("X-UPSTREAM-TYPE", "alm")
.setHeader("X-UPSTREAM-INSTANCE", "jira")
.setHeader("X-MESSAGE-KEY", "key-1")
.build();
kafkaGateway.publish(message);
verify(this.kafkaFailureHandler, timeout(500)).handleMessage(any(Message.class));
}
}
notice the spring.kafka.producer.properties.max.block.ms=50
and verify(this.kafkaFailureHandler, timeout(500)).handleMessage(any(Message.class));