Tags: unit-testing, exception, apache-kafka, apache-kafka-streams, spring-cloud-config

Kafka Streams - Unit test for ProductionExceptionHandler implementation


I am developing a Kafka Streams application that uses Spring Cloud Stream for configuration. In order to handle RecordTooLargeException, I have implemented a custom ProductionExceptionHandler as below.

public class CustomProductionExceptionHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

}
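As an aside, the handler's branching logic can be unit-tested directly, without involving any producer at all. A minimal sketch, assuming the CustomProductionExceptionHandler above is on the test classpath along with JUnit 5 and kafka-clients (the test class name and topic/value literals are illustrative):

```java
// Direct unit test of the handler's branching logic (no producer involved).
// Assumes JUnit 5 and kafka-clients are on the test classpath, and that
// CustomProductionExceptionHandler (defined above) is visible to this test.
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
import org.junit.jupiter.api.Test;

class CustomProductionExceptionHandlerTest {

    private final CustomProductionExceptionHandler handler = new CustomProductionExceptionHandler();
    private final ProducerRecord<byte[], byte[]> record =
            new ProducerRecord<>("some-topic", "some-value".getBytes());

    @Test
    void continuesOnRecordTooLarge() {
        // RecordTooLargeException should be swallowed and processing continued
        assertEquals(ProductionExceptionHandlerResponse.CONTINUE,
                handler.handle(record, new RecordTooLargeException("record too large")));
    }

    @Test
    void failsOnAnyOtherException() {
        // any other exception should still fail the stream thread
        assertEquals(ProductionExceptionHandlerResponse.FAIL,
                handler.handle(record, new RuntimeException("boom")));
    }
}
```

This does not exercise the end-to-end producer path, but it pins down the CONTINUE/FAIL decision independently of the MockProducer limitation described below.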

I have added the following property in my application.yml

spring.kafka:
  streams:
    properties:
      default.production.exception.handler: "com.fd.acquisition.product.availability.product.exception.StreamsRecordProducerErrorHandler"

The code works fine: the default fail-fast behaviour is overridden and processing continues.

I am trying to write unit tests to simulate this behaviour. I am making use of TopologyTestDriver with the following configuration.

    Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "TopologyTestDriver");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    streamsConfiguration.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "200");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (10 * 1024));
    streamsConfiguration.put(StreamsConfig.SEND_BUFFER_CONFIG, 10);
    streamsConfiguration.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, 10);

    final String tempDirectory = Files.createTempDirectory("kafka-streams").toAbsolutePath().toString();
    streamsConfiguration.setProperty(StreamsConfig.STATE_DIR_CONFIG, tempDirectory);
    streamsConfiguration.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
            CustomProductionExceptionHandler.class);

Ideally, pushing a record larger than max.request.size should trigger a RecordTooLargeException. But TopologyTestDriver uses MockProducer instead of KafkaProducer, so the ensureValidRecordSize() method is never called.

Is there any way to override this behaviour, or any other way to simulate a RecordTooLargeException?


Solution

  • I think this is basically a feature missing from MockProducer. So the first option would be to contribute a PR back to Kafka that adds the missing check to MockProducer; it looks like it should be fairly easy to add. Alternatively, you can create an issue and hope somebody from the community will pick it up.

    Either way, you will have to wait for the next Kafka release to get the feature, so you have to decide whether you can live without the unit test until then. There could be ways to hack around this (use reflection to make TopologyTestDriver.producer accessible/non-final and replace it with an "improved" version of MockProducer), but that is probably going to be harder than just contributing to Kafka.
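    For completeness, the reflection hack could look roughly like the sketch below. Note the heavy assumptions: the class name SizeCheckingMockProducer is made up, the size check only approximates what KafkaProducer.ensureValidRecordSize() does (it ignores headers and protocol overhead), and the private field name "producer" inside TopologyTestDriver is an internal, unsupported detail that may differ between Kafka versions.

    ```java
    // Hypothetical sketch: a MockProducer subclass that mimics the
    // max.request.size check, plus a reflection helper to install it
    // inside a TopologyTestDriver. Internal details may vary by version.
    import java.lang.reflect.Field;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.MockProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.RecordTooLargeException;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.streams.TopologyTestDriver;

    public class SizeCheckingMockProducer extends MockProducer<byte[], byte[]> {

        private final int maxRequestSize;

        public SizeCheckingMockProducer(int maxRequestSize) {
            super(true, new ByteArraySerializer(), new ByteArraySerializer());
            this.maxRequestSize = maxRequestSize;
        }

        @Override
        public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) {
            // rough approximation of KafkaProducer.ensureValidRecordSize()
            int size = (record.key() == null ? 0 : record.key().length)
                     + (record.value() == null ? 0 : record.value().length);
            if (size > maxRequestSize) {
                RecordTooLargeException e = new RecordTooLargeException(
                        "The message is " + size + " bytes when serialized, which is larger than "
                                + maxRequestSize);
                if (callback != null) {
                    // fail the send callback so the production exception
                    // handler path can be exercised
                    callback.onCompletion(null, e);
                }
                throw e;
            }
            return super.send(record, callback);
        }

        // Swap the driver's internal MockProducer via reflection. The internal
        // RecordCollector may already hold a reference to the original producer,
        // so this alone may not be sufficient on every Kafka version.
        public static void install(TopologyTestDriver driver, int maxRequestSize) throws Exception {
            Field f = TopologyTestDriver.class.getDeclaredField("producer");
            f.setAccessible(true);
            f.set(driver, new SizeCheckingMockProducer(maxRequestSize));
        }
    }
    ```

    Again, this pokes at unsupported internals, so expect it to break on upgrades; contributing the check to MockProducer itself remains the cleaner option.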