
Spring Kafka not working with portable implementation


I ask for your patience if this is a trivial question. I have googled it, and I have not found the answer.

I'm following a tutorial on event-driven development with Spring. At this point, the goal is to have a broker-agnostic (portable) event-driven implementation.

For this, I have two Spring Boot projects: one publishes messages to topics and the other consumes them.

Both projects are meant to work with RabbitMQ and Kafka without any code changes (or at least that is the intention).

With RabbitMQ everything works fine, but with Kafka it does not. The problem seems to be that the Kafka producer adds the project name as a prefix to the topic, and the consumer does not know about this prefix.

Configuration:

Producer project application.yml (only relevant parts)

spring.cloud.stream:
  bindings:
    output-products:
      destination: products
      producer:
        required-groups: auditGroup
---
spring.config.activate.on-profile: kafka

spring.cloud.stream.kafka.binder.brokers: kafka
management.health.rabbit.enabled: false
spring.cloud.stream.defaultBinder: kafka
spring.zipkin.sender.type: kafka
spring.kafka.bootstrap-servers: kafka:9092

Consumer project application.yml (only relevant parts)

spring.cloud.stream.bindings.input:
  destination: products
  group: productsGroup
---
spring.config.activate.on-profile: kafka

spring.cloud.stream.kafka.binder.brokers: kafka
management.health.rabbit.enabled: false
spring.cloud.stream.defaultBinder: kafka
spring.zipkin.sender.type: kafka
spring.kafka.bootstrap-servers: kafka:9092

For the producer, I have created an interface and declared it as a bean:

public interface MessageSources {

    String OUTPUT_PRODUCTS = "output-products";
    String OUTPUT_RECOMMENDATIONS = "output-recommendations";
    String OUTPUT_REVIEWS = "output-reviews";

    @Output(OUTPUT_PRODUCTS)
    MessageChannel outputProducts();

    @Output(OUTPUT_RECOMMENDATIONS)
    MessageChannel outputRecommendations();

    @Output(OUTPUT_REVIEWS)
    MessageChannel outputReviews();
}

Later, I use this bean to publish to the topic:

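@EnableBinding(ProductCompositeIntegration.MessageSources.class)
@Component
public class ProductCompositeIntegration {
    private final MessageSources messageSources; // binding proxy, injected by Spring
    @Autowired
    public ProductCompositeIntegration(MessageSources messageSources) {
        this.messageSources = messageSources;
    }
    public Product createProduct(Product body) {
        // Publish a CREATE event on the "output-products" channel.
        messageSources.outputProducts().send(MessageBuilder.withPayload(new Event(CREATE, body.getProductId(), body)).build());
        return body;
    }
}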

For the consumer binding, I use Sink.class (remember, I want a portable solution):

@EnableBinding(Sink.class)
public class MessageProcessor {

    private static final Logger log = LoggerFactory
            .getLogger(MessageProcessor.class);

    private final ProductService productService;

    @Autowired
    public MessageProcessor(ProductService productService) {
        this.productService = productService;
    }

    @StreamListener(target = Sink.INPUT)
    public void process(Event<Integer, Product> event) {

        log.info("Process message created at {}...", event.getEventCreatedAt());

        switch (event.getEventType()) {
....

With everything in place and configured for RabbitMQ, this works fine. But when I try it with Kafka, I get this error:

Dispatcher has no subscribers for channel 'product-composite-1.output-products'

Where product-composite is the name of the producer project.
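
To make the error message easier to read, here is a minimal sketch that splits the quoted channel name at the first dot. It rests on an assumption that is not confirmed anywhere in this post: that the name is built as an application context id followed by the channel name (which would make the prefix a property of the in-process channel rather than of the Kafka topic). The class and method names are hypothetical.

```java
// Hypothetical helper for reading the error message; not part of any Spring API.
final class ChannelNameParts {

    // Splits "contextId.channelName" at the first dot.
    // Assumption: the part before the dot is the application context id.
    static String[] split(String fullChannelName) {
        int dot = fullChannelName.indexOf('.');
        if (dot < 0) {
            // No prefix present: the whole string is the channel name.
            return new String[] {"", fullChannelName};
        }
        return new String[] {
            fullChannelName.substring(0, dot),
            fullChannelName.substring(dot + 1)
        };
    }

    public static void main(String[] args) {
        String[] parts = split("product-composite-1.output-products");
        System.out.println("context id:   " + parts[0]);
        System.out.println("channel name: " + parts[1]);
    }
}
```

Under that assumption, the second part matches the OUTPUT_PRODUCTS constant from the interface above, not the destination "products".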

For reference, this is the list of topics that were created automatically:

bash-4.4# kafka-topics.sh  --zookeeper zookeeper:2181 --list
__consumer_offsets
error.products.productsGroup
error.recommendations.recommendationsGroup
error.reviews.reviewsGroup
products
recommendations
reviews
zipkin
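
The listing can be checked mechanically for a prefixed topic. The sketch below copies the topic names from the output above and tests whether a destination appears under its plain name or only behind some "<prefix>." segment. The class and method names are hypothetical, and the check is purely a string comparison on the listing.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical check on the topic listing; it does not talk to Kafka.
final class TopicListingCheck {

    // Topic names copied from the kafka-topics.sh output above.
    static final List<String> LISTING = Arrays.asList(
            "__consumer_offsets",
            "error.products.productsGroup",
            "error.recommendations.recommendationsGroup",
            "error.reviews.reviewsGroup",
            "products",
            "recommendations",
            "reviews",
            "zipkin");

    // True when the listing contains the destination exactly as configured.
    static boolean hasPlainTopic(List<String> topics, String destination) {
        return topics.contains(destination);
    }

    // True when some topic ends with ".<destination>", i.e. carries a prefix.
    static boolean hasPrefixedTopic(List<String> topics, String destination) {
        return topics.stream().anyMatch(t -> t.endsWith("." + destination));
    }

    public static void main(String[] args) {
        System.out.println("plain 'products' topic:    " + hasPlainTopic(LISTING, "products"));
        System.out.println("prefixed 'products' topic: " + hasPrefixedTopic(LISTING, "products"));
    }
}
```

For this listing the first check is true and the second is false: the broker holds a plain "products" topic and no topic named after the producer project.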

So it seems that the Kafka library, with auto-configuration active, is not able to connect to the topics configured through:

spring.cloud.stream.bindings.<messagechannel>.destination: <topicname>
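
As a rough illustration of what this property is expected to do, the sketch below models the lookup with a plain map. It is a simplified, hypothetical model and not the binder's real code. It assumes the documented Spring Cloud Stream behaviour that a binding without a destination falls back to the binding name; "unconfigured-binding" is a made-up name used only to show that fallback.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of binding-to-destination lookup.
final class DestinationLookup {

    // Returns the configured destination for a binding, or the binding
    // name itself when no destination property is present.
    static String resolveDestination(Map<String, String> properties, String bindingName) {
        String key = "spring.cloud.stream.bindings." + bindingName + ".destination";
        return properties.getOrDefault(key, bindingName);
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>();
        // Values taken from the producer and consumer application.yml above.
        properties.put("spring.cloud.stream.bindings.output-products.destination", "products");
        properties.put("spring.cloud.stream.bindings.input.destination", "products");

        // Different binding names on each side, same topic.
        System.out.println(resolveDestination(properties, "output-products")); // products
        System.out.println(resolveDestination(properties, "input"));           // products
        // No destination configured: the binding name is used instead.
        System.out.println(resolveDestination(properties, "unconfigured-binding"));
    }
}
```

With the configuration shown earlier, both sides should therefore end up on the same "products" topic, independent of the channel names used in the code.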

Solution

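  • The problem was solved when I upgraded the library versions: Spring Boot from 2.4.0 to 2.4.2, the io.spring.dependency-management plugin from 1.0.10.RELEASE to 1.0.11.RELEASE, and Spring Cloud from the 2020.0.0-M5 milestone to the 2020.0.0 release. The two Gradle build files are otherwise identical: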

    NOT WORKING

    plugins {
        id 'org.springframework.boot' version '2.4.0'
        id 'io.spring.dependency-management' version '1.0.10.RELEASE'
        id 'java'
    }
    
    group = 'com.example'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '1.8'
    
    repositories {
        mavenCentral()
        maven { url 'https://repo.spring.io/milestone' }
    }
    
    ext {
        set('springCloudVersion', "2020.0.0-M5")
    }
    
    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-amqp'
        implementation 'org.springframework.boot:spring-boot-starter-web'
        implementation 'org.springframework.cloud:spring-cloud-stream'
        implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
        implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
        implementation 'org.springframework.kafka:spring-kafka'
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
        testImplementation 'org.springframework.amqp:spring-rabbit-test'
        testImplementation 'org.springframework.kafka:spring-kafka-test'
    }
    
    dependencyManagement {
        imports {
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
        }
    }
    
    test {
        useJUnitPlatform()
    }
    

    WORKING

    plugins {
        id 'org.springframework.boot' version '2.4.2'
        id 'io.spring.dependency-management' version '1.0.11.RELEASE'
        id 'java'
    }
    
    group = 'com.example'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '1.8'
    
    repositories {
        mavenCentral()
        maven { url 'https://repo.spring.io/milestone' }
    }
    
    ext {
        set('springCloudVersion', "2020.0.0")
    }
    
    dependencies {
        implementation 'org.springframework.boot:spring-boot-starter-amqp'
        implementation 'org.springframework.boot:spring-boot-starter-web'
        implementation 'org.springframework.cloud:spring-cloud-stream'
        implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
        implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
        implementation 'org.springframework.kafka:spring-kafka'
        testImplementation 'org.springframework.boot:spring-boot-starter-test'
        testImplementation 'org.springframework.amqp:spring-rabbit-test'
        testImplementation 'org.springframework.kafka:spring-kafka-test'
    }
    
    dependencyManagement {
        imports {
            mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
        }
    }
    
    test {
        useJUnitPlatform()
    }
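
The non-working build pulled in a milestone of the Spring Cloud release train. As a minimal sketch, assuming the usual Spring naming where -M<n> marks a milestone, -RC<n> a release candidate and -SNAPSHOT a snapshot build, a check like the following flags such pre-release versions. The class and method names are hypothetical.

```java
// Hypothetical helper; the qualifier patterns are an assumption about naming conventions.
final class VersionQualifier {

    // True when the version ends with a milestone, release-candidate
    // or snapshot qualifier such as "-M5", "-RC1" or "-SNAPSHOT".
    static boolean isPreRelease(String version) {
        return version.matches(".*-(M\\d+|RC\\d+|SNAPSHOT)");
    }

    public static void main(String[] args) {
        // Versions taken from the two build files above.
        System.out.println("2020.0.0-M5 pre-release? " + isPreRelease("2020.0.0-M5")); // true
        System.out.println("2020.0.0 pre-release?    " + isPreRelease("2020.0.0"));    // false
    }
}
```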