spring apache-kafka spring-integration spring-kafka

Service activator with the Kafka success channel as its input channel is not invoked on a successful Kafka send


I've configured a Kafka outbound message handler with success and failure channels, so that I can do some post-processing based on the outcome of the Kafka publish:

@Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setHeaderMapper(mapper());
    handler.setLoggingEnabled(TRUE);
    handler.setTopicExpression(
            new SpelExpressionParser()
                    .parseExpression(
                            "headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
    handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
    handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
    handler.setSendFailureChannel(kafkaFailuresChannel());
    return handler;
} 

I then wire a service activator to this success channel, which should also save the successfully sent message to a message store:

@Bean
public SubscribableChannel kafkaPublishSuccessChannel() {
    return MessageChannels.direct("kafkaSuccessChannel").get();
}

@Bean
@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public MongoDbStoringMessageHandler mongoDbOutboundGateway() {
    MongoDbStoringMessageHandler mongoHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
    mongoHandler.setMongoConverter(mongoConverter);
    mongoHandler.setLoggingEnabled(TRUE);
    SpelExpressionParser parser = new SpelExpressionParser();
    mongoHandler.setCollectionNameExpression(
            parser.parseExpression(
                    "headers['" + upstreamType + "'] + '_'+ headers['" + upstreamTypeInstance + "'] + '_' + headers['" + upstreamWebhookSource + "']"));
    return mongoHandler;
}

I expect the service activator to be invoked after a successful publish, but that does not happen:

@Test
public void testPushNotificationIsSavedToMongo(
        @Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {

    // publish message to Kafka topic
      ...
    //assert message saved in MongoDB
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key")
            .containsOnly("JRASERVER-2000");
}

The last assertion fails, and in the logs I do not see any invocation on the success channel after the producer has published to the topic.


Solution

  • As Gary said in his comment, the message is sent to the sendSuccessChannel asynchronously, on a different thread from the one running your JUnit test. It is really a callback invoked when the send Future completes in the Kafka client.

    So, to be sure that everything has landed in MongoDB after sending to Kafka, you need a more sophisticated assertion than a plain findAll(). You need to repeat that call several times over some period, so that the other threads have had time to send the message to that channel and store the document in the MongoDB collection.

    For that purpose I can suggest Awaitility, which we use in our own tests: https://github.com/awaitility/awaitility
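To illustrate the polling idea, here is a minimal sketch that uses only the JDK. It is not the Awaitility API and not the code from the question: the class name `PollingAssertionSketch`, the `waitUntil` helper, the in-memory list standing in for the MongoDB collection, and the timeout values are all hypothetical. The delayed `CompletableFuture` only imitates a success callback that arrives later on another thread.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class PollingAssertionSketch {

    /**
     * Re-checks the condition every pollInterval until it holds or the
     * timeout elapses. Returns true if the condition held in time.
     */
    public static boolean waitUntil(BooleanSupplier condition,
                                    Duration timeout,
                                    Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (deadline - System.nanoTime() > 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // One last check in case the condition became true during the final sleep.
        return condition.getAsBoolean();
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the MongoDB collection.
        List<String> store = new CopyOnWriteArrayList<>();

        // Hypothetical stand-in for the send-success callback:
        // it runs later, on a thread other than the test thread.
        CompletableFuture.runAsync(
                () -> store.add("JRASERVER-2000"),
                CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS));

        // Checking right away races with the callback; polling does not.
        System.out.println("present immediately: " + store.contains("JRASERVER-2000"));

        boolean saved = waitUntil(
                () -> store.contains("JRASERVER-2000"),
                Duration.ofSeconds(5),
                Duration.ofMillis(50));
        System.out.println("present after polling: " + saved);
    }
}
```

With Awaitility on the test classpath, the same wait is usually written around the existing assertion, for example `await().atMost(10, SECONDS).untilAsserted(() -> assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_some-project")).extracting("key").containsOnly("JRASERVER-2000"));`. The 10 second limit is an arbitrary choice here; pick one that suits your broker and test environment.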