
EnableBinding, Output, Input deprecated since version 3.1 of Spring Cloud Stream


Since version 3.1, the main API for working with queues is deprecated. The class comment says:

Deprecated as of 3.1 in favor of functional programming model

I searched the web extensively for a solution but didn't find a solid end-to-end explanation of how I should migrate.

Looking for examples for:

  1. read from queue
  2. write to queue

If there are several ways to do this (as I have seen on the web), I'd be glad for an explanation of each option and its typical use case as well.


Solution

    1. I'm assuming you are already familiar with the main concepts, so I will focus on the migration.
    2. I'm using Kotlin for the demo code to reduce verbosity.

    First, some references which may help:

    • Here is the initial relevant doc: link
    • This is an explanation for the naming scheme in the new functional format: link
    • This is a more detailed explanation with some more advanced scenarios: link

    TL;DR

    Instead of relying on annotation-based configuration, Spring now detects beans of type Consumer, Function, and Supplier and defines your streams for you.

    Input/Consumer

    Whereas before you had code looking like this:

    interface BindableGradesChannel {
        @Input
        fun gradesChannel(): SubscribableChannel
    
        companion object {
            const val INPUT = "gradesChannel"
        }
    }
    

    and the usage was similar to:

    @Service
    @EnableBinding(BindableGradesChannel::class)
    class GradesListener {
        private val log = LoggerFactory.getLogger(GradesListener::class.java)
        
        @StreamListener(BindableGradesChannel.INPUT)
        fun listen(grade: Grade) {
            log.info("Received $grade")
            // do something
        }
    }
    

    Now the binding interface is no longer needed, and the listener can be written like so:

    @Service
    class GradesListener {
        private val log = LoggerFactory.getLogger(GradesListener::class.java)
    
        @Bean
        fun gradesChannel(): Consumer<Grade> {
            return Consumer { listen(grade = it) }
        }
        
        fun listen(grade: Grade) {
            log.info("Received $grade")
            // do something
        }
    }
    

    Notice how the Consumer bean replaced both @StreamListener and @Input.

    As for configuration, if your application.yml previously looked like this:

    spring:
      cloud:
        stream:
          bindings:
            gradesChannel:
              destination: GradesExchange
              group: grades-updates
              consumer:
                concurrency: 10
                max-attempts: 3
    

    it should now look like this:

    spring:
      cloud:
        stream:
          bindings:
            gradesChannel-in-0:
              destination: GradesExchange
              group: grades-updates
              consumer:
                concurrency: 10
                max-attempts: 3
    

    Notice how gradesChannel was replaced by gradesChannel-in-0. Binding names follow the pattern <function name>-in-<index> for inputs and <function name>-out-<index> for outputs; for the full naming convention, see the naming convention link at the top.
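
    To make the pattern concrete, here is a minimal Kotlin sketch of how the default binding names are composed. The bindingName helper and the Direction enum are hypothetical names used only for illustration; they are not part of Spring Cloud Stream, which derives these names internally.

```kotlin
// Illustrative only: mirrors the default naming scheme, not a Spring API.
enum class Direction(val token: String) { IN("in"), OUT("out") }

// Builds "<functionName>-in-<index>" or "<functionName>-out-<index>".
fun bindingName(functionName: String, direction: Direction, index: Int = 0): String {
    require(index >= 0) { "index must be non-negative" }
    return "$functionName-${direction.token}-$index"
}

fun main() {
    println(bindingName("gradesChannel", Direction.IN))    // gradesChannel-in-0
    println(bindingName("studentsChannel", Direction.OUT)) // studentsChannel-out-0
}
```

    The index only matters for functions with several inputs or outputs; for a plain Consumer, Supplier, or Function it is always 0.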

    Some details:

    1. If you have more than one such bean in your application, you need to list the ones to bind in the spring.cloud.function.definition property, separated by semicolons.
    2. You can give your bindings custom names, so if you'd like to continue using gradesChannel you can set spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel and use gradesChannel everywhere else in the configuration.
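
    As a sketch of the first point, assuming a Spring Boot application that contains both the gradesChannel and studentsChannel beans from this answer, the property could be supplied as a default at startup. DemoApplication and FUNCTION_DEFINITION are hypothetical names; the same key can equally be set in application.yml, which takes precedence over defaults supplied this way.

```kotlin
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.builder.SpringApplicationBuilder

// Hypothetical application class, used only for this illustration.
@SpringBootApplication
class DemoApplication

// Names of the functional beans to bind, separated by ';'.
const val FUNCTION_DEFINITION = "gradesChannel;studentsChannel"

fun main(args: Array<String>) {
    SpringApplicationBuilder(DemoApplication::class.java)
        // Registered as a default property, so external configuration can override it.
        .properties("spring.cloud.function.definition=$FUNCTION_DEFINITION")
        .run(*args)
}
```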

    Output/Supplier

    The concept here is similar. You replace config and code looking like this:

    interface BindableStudentsChannel {
        @Output
        fun studentsChannel(): MessageChannel
    }
    

    and

    @Service
    @EnableBinding(BindableStudentsChannel::class)
    class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
        fun publish(message: Message<Student>) {
            studentsChannel.studentsChannel().send(message)
        }
    }
    

    can now be replaced by:

    @Service
    class StudentsQueueWriter {
        @Bean
        fun studentsChannel(): Supplier<Student> {
            return Supplier { Student("Adam") }
        }
    }
    

    As you can see, there is a major difference: when is it called, and by whom?

    Before, we could trigger it manually, but now Spring triggers it every second (by default). This is fine for use cases such as publishing sensor data every second, but not when you want to send a message in response to an event. Besides using a Function, Spring offers two alternatives:

    StreamBridge - link

    Using StreamBridge, you can define the target explicitly like so:

    @Service
    class StudentsQueueWriter(private val streamBridge: StreamBridge) {
        fun publish(message: Message<Student>) {
            streamBridge.send("studentsChannel-out-0", message)
        }
    }
    

    This way you don't define the target channel as a bean, but you can still send the message. The downside is that the binding name is hard-coded in your class.
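
    A slightly fuller sketch, assuming the same studentsChannel-out-0 binding: StreamBridge.send returns a Boolean indicating whether the message was sent, and the binding name can be kept in a single constant. The StudentsPublisher class, the Student data class, and the "source" header are hypothetical and only serve the illustration.

```kotlin
import org.slf4j.LoggerFactory
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Service

// Hypothetical payload type for the example.
data class Student(val name: String)

@Service
class StudentsPublisher(private val streamBridge: StreamBridge) {
    private val log = LoggerFactory.getLogger(StudentsPublisher::class.java)

    // Wraps the payload in a Message, sends it, and reports the outcome.
    fun publish(student: Student): Boolean {
        val message = MessageBuilder.withPayload(student)
            .setHeader("source", "registration") // hypothetical header
            .build()
        val sent = streamBridge.send(STUDENTS_OUT, message)
        if (!sent) log.warn("Could not send {}", student)
        return sent
    }

    companion object {
        // Single place where the binding name is written down.
        const val STUDENTS_OUT = "studentsChannel-out-0"
    }
}
```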

    Reactor API - link

    The other way is to use a reactive mechanism such as Sinks.Many and return it as a Flux. With this approach, your code will look similar to:

    @Service
    class StudentsQueueWriter {
        val students: Sinks.Many<Student> = Sinks.many().multicast().onBackpressureBuffer()
        @Bean
        fun studentsChannel(): Supplier<Flux<Student>> {
            return Supplier { students.asFlux() }
        }
    }
    

    and the usage may be similar to:

    class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
        fun newStudent() {
            studentsQueueWriter.students.tryEmitNext(Student("Adam"))
        }
    }
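
    Note that tryEmitNext does not throw on failure; it returns a Sinks.EmitResult that the caller can inspect. The following standalone sketch uses only Reactor (no Spring binding) to show this. The Student data class and the emit helper are assumptions made for illustration, and the manual subscribe call stands in for the binder.

```kotlin
import reactor.core.publisher.Sinks

// Hypothetical payload type for the example.
data class Student(val name: String)

// Emits a student and reports whether the sink accepted it.
fun emit(sink: Sinks.Many<Student>, student: Student): Boolean {
    val result: Sinks.EmitResult = sink.tryEmitNext(student)
    if (result.isFailure) {
        println("Could not emit $student: $result")
    }
    return result.isSuccess
}

fun main() {
    val students: Sinks.Many<Student> = Sinks.many().multicast().onBackpressureBuffer()
    val received = mutableListOf<Student>()

    // Stand-in for the binder, which subscribes to the Flux returned by the Supplier.
    students.asFlux().subscribe { received.add(it) }

    emit(students, Student("Adam"))
    println(received)
}
```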