Tags: spring-cloud, spring-kafka, spring-cloud-stream

How to use "Kafka Streams Binder" with "Functional Style" and DI?


The Kafka Streams binder documentation (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model) shows an example in which the input topic is set with the property spring.cloud.stream.bindings.process_in.destination.

Now I want to inject a dependency into the function bean, e.g.:

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(JavaMailSender mailSender) {...}

When the application (based on Spring Boot) starts, the property spring.cloud.stream.bindings.process_in.destination is ignored, and the application subscribes to the default topic "input" instead of the configured one.

EDIT: Here is the Kotlin code (without imports)

Mailer.kt:

@Configuration
class Mailer {
    @Bean
    fun sendMail(/*mailSender: JavaMailSender*/) = Consumer<KStream<Any, Mail>> { input ->
        input.foreach { _, mail -> println("mail = $mail") }
    }
}

Mail.kt:

data class Mail(var from: String = "", var to: String = "", var subject: String = "", var body: String = "")

Application.kt:

@SpringBootApplication
class Application

fun main(args: Array<String>) {
    runApplication<Application>(*args)
}

application.yml:

spring.cloud.stream:
  bindings.sendMail_in.destination: mail
  kafka.binder.configuration.listeners: PLAINTEXT://localhost:9092

Solution

  • The binder had a few issues that prevented beans from being autowired into a function/consumer bean, which is why the binding property was not applied. The latest snapshot fixes these problems, so please make sure you are using it (3.0.0.BUILD-SNAPSHOT). Here is a sample application that works with the scenario you provided.
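To pick up the snapshot mentioned above, the build has to resolve artifacts from Spring's snapshot repository. The following is a minimal sketch only, assuming a Gradle build with the Kotlin DSL (the question does not say which build tool is used) and assuming the binder is pulled in directly rather than through a BOM. The repository URL and the artifact coordinates are the usual ones for Spring Cloud Stream's Kafka Streams binder; the version is the one named in the answer.

```kotlin
// build.gradle.kts (fragment, assumes the Kotlin and Spring Boot plugins are already applied)
repositories {
    mavenCentral()
    // Snapshot builds are not published to Maven Central.
    maven { url = uri("https://repo.spring.io/snapshot") }
}

dependencies {
    // Kafka Streams binder at the snapshot version named in the answer.
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.0.0.BUILD-SNAPSHOT")
}
```

With a fixed binder version on the classpath, the commented-out mailSender parameter in Mailer.kt can be restored, and Spring should inject it when it creates the sendMail bean.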