apache-kafka, kafka-consumer-api, apache-kafka-streams, spring-kafka

Kafka: Consumer API: Regression test fails if run in a group (sequentially)


I have implemented a Kafka application using the Consumer API, and I have two regression tests implemented with the Streams API:

  1. To test the happy path: the test produces data into the input topic that the application listens to; the application consumes it and produces data into the output topic, which the test then consumes and validates against the expected output.
  2. To test the error path: the flow is the same as above, except that this time the application produces data into its error topic, and the test consumes from that error topic and validates against the expected error output. (A minimal sketch of this round trip follows the list.)
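
For orientation, here is a minimal sketch of that round trip, assuming an embedded broker and String payloads for brevity (the topic names, types, and method shape below are placeholders, not the project's actual code):

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import static org.junit.Assert.assertEquals;

public class RoundTripSketch {

    void roundTrip(KafkaTemplate<String, String> inputProducer,
                   Consumer<String, String> outputConsumer) {
        // Test-side producer feeds the topic the application listens to.
        inputProducer.send("input-topic", "key", "payload");

        // Test-side consumer waits for whatever the application wrote to its
        // output topic; the test then asserts on the record it got back.
        ConsumerRecord<String, String> reply =
                KafkaTestUtils.getSingleRecord(outputConsumer, "output-topic");
        assertEquals("expected-payload", reply.value());
    }
}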

My code and the regression-test code live in the same project, under the expected directory structure. In both tests, the data should be picked up by the same listener on the application side.

The problem is:

When I execute the tests individually (manually), each test passes. However, if I execute them together sequentially (for example, via gradle clean build), only the first test passes. The second test fails after its test-side consumer polls for data and, after some time, gives up without finding any.

Observation:

From debugging, it looks like everything works perfectly the first time (test-side and application-side producers and consumers). During the second test, however, the application-side consumer does not seem to receive any data (the test-side producer appears to be producing, but I cannot say that for sure), and hence no data is produced into the error topic.

What I have tried so far:

After investigating, my understanding is that the tests run into race conditions, and I found suggestions like the following to avoid that (a sketch combining them follows the list):

  • use @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
  • tear down the broker after each test (please see the .destroy() calls on the brokers)
  • use different topic names for each test
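
Combined, the three suggestions look roughly like this (a sketch only; the class and topic names here are illustrative, not the project's constants):

import org.junit.After;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

// Fresh Spring context per test method, per-class topic names, and an
// explicit broker teardown after each test.
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(partitions = 1,
        topics = {"errtest-input", "errtest-error"}) // unique per test class
public class IsolatedFailurePathTest {

    @Autowired
    private EmbeddedKafkaBroker broker;

    @After
    public void tearDown() {
        broker.destroy(); // so the next test class starts against a clean broker
    }
}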

I applied all of them and still could not resolve the issue.

I am providing the code here for perusal. Any insight is appreciated.

Code for the 1st test (testing the error path):

@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        topics = {
                AdapterStreamProperties.Constants.INPUT_TOPIC,
                AdapterStreamProperties.Constants.ERROR_TOPIC
        },

        brokerProperties = {
                "listeners=PLAINTEXT://localhost:9092",
                "port=9092",
                "log.dir=/tmp/data/logs",
                "auto.create.topics.enable=true",
                "delete.topic.enable=true"
        }
)
public class AbstractIntegrationFailurePathTest {
    private final int retryLimit = 0;

    @Autowired
    protected EmbeddedKafkaBroker embeddedFailurePathKafkaBroker;

    //To produce data
    @Autowired
    protected KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate;


    //To read from output error
    @Autowired
    protected Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer;


    //Service to execute notification-preference
    @Autowired
    protected AdapterStreamProperties projectProperties;

    // consumeFromAnEmbeddedTopic asserts that the consumer gets partitions
    // assigned; with retryLimit = 0, a failure is swallowed after one attempt.
    protected void subscribe(Consumer consumer, String topic, int attempt) {
        try {
            embeddedFailurePathKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
        } catch (ComparisonFailure ex) {
            if (attempt < retryLimit) {
                subscribe(consumer, topic, attempt + 1);
            }
        }
    }
}


@TestConfiguration
public class AdapterStreamFailurePathTestConfig {
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${spring.kafka.adapter.application-id}")
    private String applicationId;

    @Value("${spring.kafka.adapter.group-id}")
    private String groupId;

    //Producer of records that the program consumes
    @Bean
    public Map<String, Object> sendEmailCmdProducerConfigs() {
        Map<String, Object> results = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        results.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.KEY_SERDE.serializer().getClass());
        results.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.INPUT_VALUE_SERDE.serializer().getClass());
        return results;
    }

    @Bean
    public ProducerFactory<PreferredMediaMsgKey, SendEmailCmd> inputProducerFactory() {
        return new DefaultKafkaProducerFactory<>(sendEmailCmdProducerConfigs());
    }

    @Bean
    public KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate() {
        return new KafkaTemplate<>(inputProducerFactory());
    }


    //Consumer of the error output, generated by the program
    @Bean
    public Map<String, Object> outputErrorConsumerConfig() {
        Map<String, Object> props = KafkaTestUtils.consumerProps(
                applicationId, Boolean.TRUE.toString(), embeddedKafkaBroker);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.KEY_SERDE.deserializer().getClass()
                        .getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.ERROR_VALUE_SERDE.deserializer().getClass()
                        .getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer() {
        DefaultKafkaConsumerFactory<PreferredMediaMsgKey, ErrorCmd> rpf =
                new DefaultKafkaConsumerFactory<>(outputErrorConsumerConfig());
        return rpf.createConsumer(groupId, "notification-failure");
    }


}


@RunWith(SpringRunner.class)
@SpringBootTest(classes = AdapterStreamFailurePathTestConfig.class)
@ActiveProfiles(profiles = "errtest")
public class ErrorPath400Test extends AbstractIntegrationFailurePathTest {

    @Autowired
    private DataGenaratorForErrorPath400Test datagen;

    @Mock
    private AdapterHttpClient httpClient;

    @Autowired
    private ErroredEmailCmdDeserializer erroredEmailCmdDeserializer;

    @Before
    public void setup() throws InterruptedException {

        Mockito.when(httpClient.callApi(Mockito.any()))
                .thenReturn(
                        new GenericResponse(
                                400,
                                TestConstants.ERROR_MSG_TO_CHK));
        Mockito.when(httpClient.createURI(Mockito.any(), Mockito.any(), Mockito.any())).thenCallRealMethod();

        inputProducerTemplate.send(
                projectProperties.getInputTopic(),
                datagen.getKey(),
                datagen.getEmailCmdToProduce());
        System.out.println("producer: " + projectProperties.getInputTopic());

        subscribe(outputErrorConsumer, projectProperties.getErrorTopic(), 0);
    }

    @Test
    public void testWithError() throws InterruptedException, InvalidProtocolBufferException, TextFormat.ParseException {

        ConsumerRecords<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd> records;
        List<ConsumerRecord<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd>> outputListOfErrors = new ArrayList<>();

        int attempt = 0;
        int expectedRecords = 1;
        // Poll until the expected number of records has arrived or the
        // attempts are exhausted.
        do {
            records = KafkaTestUtils.getRecords(outputErrorConsumer);
            records.forEach(outputListOfErrors::add);
            attempt++;
        } while (attempt < expectedRecords && outputListOfErrors.size() < expectedRecords);

        // Verify the recipient event stream size
        Assert.assertEquals(expectedRecords, outputListOfErrors.size());

        // Validate output

    }

    @After
    public void tearDown() {
        outputErrorConsumer.close();
        embeddedFailurePathKafkaBroker.destroy();
    }

}
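
An aside on the polling loop above: KafkaTestUtils.getRecords(consumer) blocks for a default timeout before giving up, which matches the "polls for data and after some time gives up" behavior described earlier. Passing an explicit timeout makes that wait visible in the test itself; inside the loop above, the call could become the following (the 10-second value is illustrative, not from the original code):

// Poll with an explicit timeout instead of the default, so a missing record
// fails faster and the wait is visible in the test code.
records = KafkaTestUtils.getRecords(outputErrorConsumer, 10_000L);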

The 2nd test is almost the same in structure, except this time the test-side consumer consumes from the application-side output topic (instead of the error topic), and I named the consumers, broker, producer, and topics differently. For example:

@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        topics = {
                AdapterStreamProperties.Constants.INPUT_TOPIC,
                AdapterStreamProperties.Constants.OUTPUT_TOPIC
        },
        brokerProperties = {
                "listeners=PLAINTEXT://localhost:9092",
                "port=9092",
                "log.dir=/tmp/data/logs",
                "auto.create.topics.enable=true",
                "delete.topic.enable=true"
        }
)
public class AbstractIntegrationSuccessPathTest {
    private final int retryLimit = 0;

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafkaBroker;

    //To produce data
    @Autowired
    protected KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> sendEmailCmdProducerTemplate;

    //To read from output regular topic
    @Autowired
    protected Consumer<PreferredMediaMsgKey, NotifiedEmailCmd> outputConsumer;

    //Service to execute notification-preference
    @Autowired
    protected AdapterStreamProperties projectProperties;

    protected void subscribe(Consumer consumer, String topic, int attempt) {
        try {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
        } catch (ComparisonFailure ex) {
            if (attempt < retryLimit) {
                subscribe(consumer, topic, attempt + 1);
            }
        }
    }
}
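
One detail worth noting when reading the two configurations side by side: both pin the embedded broker to localhost:9092 via brokerProperties, so sequentially created brokers compete for the same port. A common spring-kafka-test pattern is to let each broker pick a free port and read the address from the property the broker publishes; a sketch, not the project's actual configuration:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.test.context.EmbeddedKafka;

// Sketch: without fixed "listeners"/"port" brokerProperties, the embedded
// broker binds to a random free port, exposed via spring.embedded.kafka.brokers.
@EmbeddedKafka(partitions = 1,
        topics = {AdapterStreamProperties.Constants.INPUT_TOPIC,
                  AdapterStreamProperties.Constants.OUTPUT_TOPIC})
public class RandomPortSuccessPathTest {

    // Pass this wherever a bootstrap.servers value is needed.
    @Value("${spring.embedded.kafka.brokers}")
    private String brokerAddresses;
}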

Please let me know if I should provide any more information.


Solution

  • In my case the consumer was not closed properly. I had to do the following to resolve it:

    @After
    public void tearDown() {
      // shutdown hook to correctly close the streams application
      Runtime.getRuntime().addShutdownHook(new Thread(outputConsumer::close));
    }
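
As a side note on that fix: a shutdown hook only runs when the JVM exits, so it defers the close rather than performing it between tests. If the goal is to release the consumer (and the broker) before the next test class starts, closing it directly in the teardown is a more conventional variant; a sketch assuming the same fields as above and kafka-clients 2.0+ for Consumer.close(Duration):

import java.time.Duration;

@After
public void tearDown() {
    // Close with a bounded timeout so a wedged consumer cannot hang the build,
    // then destroy the broker so the next test class starts clean.
    outputConsumer.close(Duration.ofSeconds(5));
    embeddedKafkaBroker.destroy();
}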