Tags: java, spring, apache-kafka, spring-kafka, spring-cloud-contract

Spring Cloud Contracts - No consumer set up for topic [...]


I am developing an API compatibility check using Spring Cloud Contract, and I set everything up as the documentation describes. However, I ran into a problem: java.lang.IllegalStateException: No consumer set up for topic [testSyncTopic]. The exception is thrown in the KafkaStubMessages class, so I assume the problem is related to the library. My setup consists of two separate Maven projects, each of which is both a consumer and a producer (on separate topics). The contracts live in a separate repository.

Here is the scenario I am working on: there are two modules, A and B. Module A sends messages t1 and t2 to Kafka topics T1 and T2 and receives messages t3 and t4 from topics T3 and T4. Module B receives from T1 and T2 and sends to T3 and T4.

All consumer tests pass in both modules, but the producer tests fail with the error from the title.

I suspect the cause is an error in stub creation, so that the proper listeners are not set up.

I tried different Kafka configurations, but I do not believe that is the cause. I also checked the Spring Cloud Contract configuration, and everything seems to work: the proper stub JARs are generated. Unfortunately, Google has not been much help with this.

If you need any more information, feel free to ask. I have been working on this for several days now and would really appreciate your help.

EDIT: Added stack trace and relevant code snippets

Stack trace:

java.lang.IllegalStateException: No consumer set up for topic [testSyncTopic]

at org.springframework.cloud.contract.verifier.messaging.kafka.Receiver.receive(KafkaStubMessages.java:110)
at org.springframework.cloud.contract.verifier.messaging.kafka.KafkaStubMessages.receive(KafkaStubMessages.java:80)
at org.springframework.cloud.contract.verifier.messaging.kafka.KafkaStubMessages.receive(KafkaStubMessages.java:42)
at com.comarch.fsm.dispatcher.rest.ContractBaseTest.setup(ContractBaseTest.groovy:56)

Base test class configuration:

@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers", brokerProperties = ["log.dir=target/embedded-kafka"])
@AutoConfigureStubRunner
abstract class BaseTestConfig extends Specification {
}

My contract definition:

Pattern customDateTime() {
    Pattern.compile('([0-9]{4})-(1[0-2]|0[1-9])-(0[1-9]|[12][0-9]|3[01])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])Z')
}

Contract.make {
    label("sync")
    input {
        triggeredBy("sync()")
    }
    outputMessage {
        sentTo("testSyncTopic")
        body(
                syncStart: $(customDateTime())
        )
    }
}
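
A date-time pattern like the one in this contract can be checked in isolation before it goes into a contract. The sketch below is plain Java with no Spring involved; the class name and the sample timestamps are made up for illustration. It shows that the day group has to include `3[01]`: a group of only `0[1-9]|[12][0-9]` rejects timestamps on the 30th and 31st. It also shows that the string the producer serializes for syncStart has to match the pattern exactly, so a value with fractional seconds, for example, would not match.

```java
import java.util.regex.Pattern;

public class DateTimePatternCheck {

    // Date-time pattern whose day group accepts 01 through 31.
    static final Pattern CUSTOM_DATE_TIME = Pattern.compile(
            "([0-9]{4})-(1[0-2]|0[1-9])-(0[1-9]|[12][0-9]|3[01])"
            + "T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])Z");

    // Same pattern with a day group that stops at 29, kept only for comparison.
    static final Pattern WITHOUT_DAY_30_31 = Pattern.compile(
            "([0-9]{4})-(1[0-2]|0[1-9])-(0[1-9]|[12][0-9])"
            + "T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])Z");

    // True only if the whole timestamp matches the pattern.
    static boolean matches(Pattern pattern, String timestamp) {
        return pattern.matcher(timestamp).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches(CUSTOM_DATE_TIME, "2020-03-15T10:30:00Z"));
        System.out.println(matches(CUSTOM_DATE_TIME, "2020-03-31T23:59:59Z"));
        System.out.println(matches(WITHOUT_DAY_30_31, "2020-03-31T23:59:59Z"));
        System.out.println(matches(CUSTOM_DATE_TIME, "2020-03-15T10:30:00.123Z"));
    }
}
```

Running main prints true, true, false, false.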

ContractBaseTest class:

abstract class ContractBaseTest extends BaseTestConfig {

    @Autowired
    private KafkaService kafkaService;

    def synchronizeData() {
        kafkaService.sendKafkaMessage("testSyncTopic", null, new SyncDto(new Date()));
    }
}

Solution

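  • Why does your base test class have @AutoConfigureStubRunner? It should have @AutoConfigureMessageVerifier. @AutoConfigureStubRunner belongs on the consumer side, where it runs the producer's stubs; the base class for the generated producer tests needs @AutoConfigureMessageVerifier. It seems you're mixing up the consumer and the producer sides.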
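
    Applied to the base class from the question, the change could look roughly like the sketch below. It is an untested outline, not a verified fix. KafkaService, SyncDto and the topic name are taken from the question, and the package name is hypothetical. Declaring the topic in @EmbeddedKafka follows the sample further down, on the assumption that the verifier only listens on topics the embedded broker knows about. The method is named sync() because the generated test calls the contract's triggeredBy("sync()") expression on the base class. It needs spring-cloud-contract-verifier and spring-kafka-test on the test classpath; a Spock project would keep "extends Specification" and Groovy syntax.

    ```java
    package com.example; // hypothetical package; KafkaService and SyncDto are the question's own classes

    import java.util.Date;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
    import org.springframework.kafka.test.context.EmbeddedKafka;

    // On JUnit 4, also add @RunWith(SpringRunner.class) as in the sample.
    @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
    // Producer side: replaces @AutoConfigureStubRunner from the question.
    @AutoConfigureMessageVerifier
    // Assumption: the topic from the contract is declared here, as the sample does with topic1.
    @EmbeddedKafka(partitions = 1, topics = "testSyncTopic",
            bootstrapServersProperty = "spring.kafka.bootstrap-servers")
    public abstract class ContractBaseTest {

        @Autowired
        private KafkaService kafkaService;

        // The name has to match triggeredBy("sync()") in the contract.
        public void sync() {
            kafkaService.sendKafkaMessage("testSyncTopic", null, new SyncDto(new Date()));
        }
    }
    ```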

    Please check the example of a producer with Kafka here: https://github.com/spring-cloud-samples/spring-cloud-contract-samples/blob/master/producer_kafka . For readability, I'll copy it here.

    THE PRODUCER

    The base class: https://github.com/spring-cloud-samples/spring-cloud-contract-samples/blob/master/producer_kafka/src/test/java/com/example/BaseClass.java

    package com.example;
    
    import org.junit.runner.RunWith;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    // remove::start[]
    import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    // remove::end[]
    import org.springframework.test.context.ActiveProfiles;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
    // remove::start[]
    @AutoConfigureMessageVerifier
    @EmbeddedKafka(partitions = 1, topics = {"topic1"})
    // remove::end[]
    @ActiveProfiles("test")
    public abstract class BaseClass {
    
        @Autowired
        Controller controller;
    
        public void trigger() {
            this.controller.sendFoo("example");
        }
    }
    

    Here is the controller:

    package com.example;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.common.Foo1;
    
    /**
     * @author Gary Russell
     * @since 2.2.1
     */
    @RestController
    public class Controller {
    
        @Autowired
        private KafkaTemplate<Object, Object> template;
    
        @PostMapping(path = "/send/foo/{what}")
        public void sendFoo(@PathVariable String what) {
            this.template.send("topic1", new Foo1(what));
        }
    
    }
    

    Here is the production configuration (application.yml):

    spring:
      kafka:
        producer:
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    logging.level.org.springframework.cloud.contract: debug
    

    Here is the test configuration (application-test.yml):

    spring:
      kafka:
        bootstrap-servers: ${spring.embedded.kafka.brokers}
        consumer:
          properties:
            "key.serializer": "org.springframework.kafka.support.serializer.JsonSerializer"
            "key.deserializer": "org.springframework.kafka.support.serializer.JsonDeserializer"
          group-id: groupId
    

    And here is the contract (https://github.com/spring-cloud-samples/spring-cloud-contract-samples/blob/master/producer_kafka/src/test/resources/contracts/shouldSendFoo.groovy):

    import org.springframework.cloud.contract.spec.Contract
    
    Contract.make {
        label("trigger")
        input {
            triggeredBy("trigger()")
        }
        outputMessage {
            sentTo("topic1")
            body([
                    foo: "example"
            ])
        }
    }
    

    THE CONSUMER

    Now for the consumer (https://github.com/spring-cloud-samples/spring-cloud-contract-samples/tree/master/consumer_kafka):

    package com.example;
    
    import org.assertj.core.api.BDDAssertions;
    import org.awaitility.Awaitility;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    // remove::start[]
    import org.springframework.cloud.contract.stubrunner.StubTrigger;
    import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
    import org.springframework.cloud.contract.stubrunner.spring.StubRunnerProperties;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    // remove::end[]
    import org.springframework.test.context.ActiveProfiles;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
    // remove::start[]
    @AutoConfigureStubRunner(ids = "com.example:beer-api-producer-kafka", stubsMode = StubRunnerProperties.StubsMode.LOCAL)
    @EmbeddedKafka(topics = "topic1")
    // remove::end[]
    @ActiveProfiles("test")
    public class ApplicationTests {
    
        // remove::start[]
        @Autowired
        StubTrigger trigger;
        @Autowired
        Application application;
    
        @Test
        public void contextLoads() {
            this.trigger.trigger("trigger");
    
            Awaitility.await().untilAsserted(() -> {
                BDDAssertions.then(this.application.storedFoo).isNotNull();
                BDDAssertions.then(this.application.storedFoo.getFoo()).contains("example");
            });
        }
        // remove::end[]
    
    }