Tags: java, apache-kafka-streams, spring-cloud-stream, confluent-schema-registry, embedded-kafka

Problem creating tests with Spring Cloud Stream Kafka Streams using EmbeddedKafka with MockSchemaRegistryClient


I'm trying to figure out how I can test my Spring Cloud Stream Kafka Streams application.

The application looks like this:

Stream 1: Topic1 > Topic2
Stream 2: Topic2 + Topic3 joined > Topic4
Stream 3: Topic4 > Topic5
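
For illustration, the three streams could be declared as Spring Cloud Stream functional beans roughly like this (a minimal sketch only; the class name, the Long key type, the join window, and the String values standing in for the Avro record classes are all placeholders):

import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;

@org.springframework.context.annotation.Configuration
public class Configuration {

    @Bean
    public Function<KStream<Long, String>, KStream<Long, String>> stream1() {
        // Stream 1: topic1 -> topic2 (placeholder transformation)
        return input -> input.mapValues(String::toUpperCase);
    }

    @Bean
    public BiFunction<KStream<Long, String>, KStream<Long, String>, KStream<Long, String>> stream2() {
        // Stream 2: topic2 joined with topic3 -> topic4 (the window size is an assumption)
        return (topic2, topic3) -> topic2.join(topic3,
                (left, right) -> left + right,
                JoinWindows.of(Duration.ofMinutes(5)));
    }

    @Bean
    public Function<KStream<Long, String>, KStream<Long, String>> stream3() {
        // Stream 3: topic4 -> topic5 (placeholder transformation)
        return input -> input.mapValues(String::trim);
    }
}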

I tried different approaches, like the TestChannelBinder, but that approach only works with simple functions, not with Kafka Streams and Avro.

I decided to use EmbeddedKafka with the MockSchemaRegistryClient. I can produce to a topic and consume from that same topic again (topic1), but I'm not able to consume from the stream's output topic (topic2).

In my test application.yaml I put the following configuration (I'm only testing the first stream for now; I want to extend it once this works):

spring.application.name: processingapp
spring.cloud:
  function.definition: stream1 # not now ;stream2;stream3
  stream:
    bindings:
      stream1-in-0:
        destination: topic1
      stream1-out-0:
        destination: topic2
    kafka:
      binder:
        min-partition-count: 1
        replication-factor: 1
        auto-create-topics: true
        auto-add-partitions: true
      bindings:
        default:
          consumer:
            autoRebalanceEnabled: true
            resetOffsets: true
            startOffset: earliest
        stream1-in-0:
          consumer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
        stream1-out-0:
          producer:
            keySerde: io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde
            valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      streams:
        binder:
          configuration:
            schema.registry.url: mock://localtest
            specific.avro.reader: true
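
For reference, the mock://localtest URL makes every Confluent (de)serializer configured with it share a single in-memory registry for the scope localtest. A minimal sketch of how that shared instance can be reached from test code (the wrapper class is my own; MockSchemaRegistry itself ships with the schema registry client):

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;

class MockRegistryAccess {

    // Returns the shared in-memory registry behind "mock://localtest",
    // e.g. to pre-register or inspect schemas from a test.
    static SchemaRegistryClient localTestRegistry() {
        return MockSchemaRegistry.getClientForScope("localtest");
    }
}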

My test looks like the following:

@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    private static final String INPUT_TOPIC = "topic1";

    private static final String OUTPUT_TOPIC = "topic2";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 1, INPUT_TOPIC, OUTPUT_TOPIC);

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @org.junit.Test
    public void testSendReceive() throws IOException {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        senderProps.put("key.serializer", LongSerializer.class);
        senderProps.put("value.serializer", SpecificAvroSerializer.class);
        senderProps.put("schema.registry.url", "mock://localtest");
        AvroFileParser fileParser = new AvroFileParser();
        DefaultKafkaProducerFactory<Long, Test1> pf = new DefaultKafkaProducerFactory<>(senderProps);
        KafkaTemplate<Long, Test1> template = new KafkaTemplate<>(pf, true);
        Test1 test1 = fileParser.parseTest1("src/test/resources/mocks/test1.json");

        template.send(INPUT_TOPIC, 123456L, test1);
        System.out.println("produced");
        
        Map<String, Object> consumer1Props = KafkaTestUtils.consumerProps("testConsumer1", "false", embeddedKafka.getEmbeddedKafka());
        consumer1Props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer1Props.put("key.deserializer", LongDeserializer.class);
        consumer1Props.put("value.deserializer", SpecificAvroDeserializer.class);
        consumer1Props.put("schema.registry.url", "mock://localtest");
        DefaultKafkaConsumerFactory<Long, Test1> cf = new DefaultKafkaConsumerFactory<>(consumer1Props);

        Consumer<Long, Test1> consumer1 = cf.createConsumer();
        consumer1.subscribe(Collections.singleton(INPUT_TOPIC));
        ConsumerRecords<Long, Test1> records = consumer1.poll(Duration.ofSeconds(10));
        consumer1.commitSync();

        System.out.println("records count?");
        System.out.println("" + records.count());

        Test1 fetchedTest1;
        fetchedTest1 = records.iterator().next().value();
        assertThat(records.count()).isEqualTo(1);
        System.out.println("found record");
        System.out.println(fetchedTest1.toString());

        Map<String, Object> consumer2Props = KafkaTestUtils.consumerProps("testConsumer2", "false", embeddedKafka.getEmbeddedKafka());
        consumer2Props.put("key.deserializer", StringDeserializer.class);
        consumer2Props.put("value.deserializer", TestAvroDeserializer.class);
        consumer2Props.put("schema.registry.url", "mock://localtest");

        DefaultKafkaConsumerFactory<String, Test2> consumer2Factory = new DefaultKafkaConsumerFactory<>(consumer2Props);
        Consumer<String, Test2> consumer2 = consumer2Factory.createConsumer();
        consumer2.subscribe(Collections.singleton(OUTPUT_TOPIC));
        ConsumerRecords<String, Test2> records2 = consumer2.poll(Duration.ofSeconds(30));
        consumer2.commitSync();
        

        if (records2.iterator().hasNext()) {
            System.out.println("has next");
        } else {
            System.out.println("has no next");
        }
    }
}

I receive the following exception when trying to consume and deserialize from topic2:

Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 0
Caused by: java.io.IOException: Cannot get schema from schema registry!
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndIdFromRegistry(MockSchemaRegistryClient.java:193) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaBySubjectAndId(MockSchemaRegistryClient.java:249) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getSchemaById(MockSchemaRegistryClient.java:232) ~[kafka-schema-registry-client-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:307) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:86) ~[kafka-avro-serializer-6.2.0.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) ~[kafka-avro-serializer-6.2.0.jar:na]
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:54) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:895) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1008) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:812) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.1.jar:na]

No message is consumed.

So I tried to override the SpecificAvroDeserializer, register the schemas directly, and use this deserializer:

public class TestAvroDeserializer<T extends org.apache.avro.specific.SpecificRecord>
        extends SpecificAvroDeserializer<T> implements Deserializer<T> {

    private final KafkaAvroDeserializer inner;

    public TestAvroDeserializer() throws IOException, RestClientException {
        MockSchemaRegistryClient mockedClient = new MockSchemaRegistryClient();

        Schema.Parser parser = new Schema.Parser();
        Schema test2Schema = parser.parse(new File("./src/main/resources/avro/test2.avsc"));
        mockedClient.register("test2-value", test2Schema , 1, 0);
        inner = new KafkaAvroDeserializer(mockedClient);
    }

    /**
     * For testing purposes only.
     */
    TestAvroDeserializer(final SchemaRegistryClient client) throws IOException, RestClientException {
        MockSchemaRegistryClient mockedClient = new MockSchemaRegistryClient();

        Schema.Parser parser = new Schema.Parser();
        Schema test2Schema = parser.parse(new File("./src/main/resources/avro/test2.avsc"));
        mockedClient.register("test2-value", test2Schema , 1, 0);

        inner = new KafkaAvroDeserializer(mockedClient);
    }
}

It doesn't work with this deserializer either. Does anyone have experience with how to write these tests with EmbeddedKafka and the MockSchemaRegistry? Or is there another approach I should use?

I'd be very glad if someone could help. Thank you in advance.


Solution

  • I found an appropriate way to integration-test my topology.

    I use the TopologyTestDriver from the kafka-streams-test-utils package.

    Include this dependency in Maven:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <scope>test</scope>
    </dependency>
    

    For the application described in the question, setting up the TopologyTestDriver looks like the following. The driver runs the whole topology in-process, so no embedded broker is needed, and the mock:// URL keeps the schema registry in-memory; the code is written sequentially just to show how it works.

        @Test
        void test() {
            keySerde.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), true);
            valueSerdeTopic1.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), false);
            valueSerdeTopic2.configure(Map.of(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas"), false);
    
            final StreamsBuilder builder = new StreamsBuilder();
    
            Configuration config = new Configuration(); // class where you declare your spring cloud stream functions
            KStream<String, Topic1> input = builder.stream("topic1", Consumed.with(keySerde, valueSerdeTopic1));
    
            KStream<String, Topic2> output = config.stream1().apply(input);
            output.to("topic2");
    
            Topology topology = builder.build();
            Properties streamsConfig = new Properties();
    
            streamsConfig.putAll(Map.of(
                org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver",
                    org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ignored",
                    KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://schemas",
                    org.apache.kafka.streams.StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, PrimitiveAvroSerde.class.getName(),
                    org.apache.kafka.streams.StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName()
            ));
    
            TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamsConfig);
            
            TestInputTopic<String, Topic1> inputTopic = testDriver.createInputTopic("topic1", keySerde.serializer(), valueSerdeTopic1.serializer());
            TestOutputTopic<String, Topic2> outputTopic = testDriver.createOutputTopic("topic2", keySerde.deserializer(), valueSerdeTopic2.deserializer());
            inputTopic.pipeInput("key", topic1AvroModel); // Write to the input topic which applies the topology processor of your spring-cloud-stream app
            KeyValue<String, Topic2> outputRecord = outputTopic.readKeyValue(); // Read from the output topic
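            // Close the driver when the test is done so its state stores are released.
            testDriver.close();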
        }
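
    The serdes and the sample record used above are assumed to be fields of the test class. A minimal sketch (the field names mirror the test; Topic1 and Topic2 are the generated Avro classes):

        import io.confluent.kafka.streams.serdes.avro.PrimitiveAvroSerde;
        import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

        class Stream1TopologyTest {

            private final PrimitiveAvroSerde<String> keySerde = new PrimitiveAvroSerde<>();
            private final SpecificAvroSerde<Topic1> valueSerdeTopic1 = new SpecificAvroSerde<>();
            private final SpecificAvroSerde<Topic2> valueSerdeTopic2 = new SpecificAvroSerde<>();

            private Topic1 topic1AvroModel; // built from the generated Avro class, e.g. via Topic1.newBuilder()
        }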
    

    If you write more tests, I recommend abstracting the setup code so you don't repeat yourself in each test. I highly recommend this example from the spring-cloud-stream-samples repository; it led me to the solution of using the TopologyTestDriver.
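
    One possible shape for that shared setup, assuming JUnit 5 (the class and method names here are my own):

        import org.apache.kafka.streams.TopologyTestDriver;
        import org.junit.jupiter.api.AfterEach;
        import org.junit.jupiter.api.BeforeEach;

        abstract class TopologyTestBase {

            protected TopologyTestDriver testDriver;

            // Each test class builds its own topology and streams configuration.
            protected abstract TopologyTestDriver createDriver();

            @BeforeEach
            void setUpDriver() {
                testDriver = createDriver();
            }

            @AfterEach
            void tearDownDriver() {
                testDriver.close(); // release state stores between tests
            }
        }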