Since version 3.1 the major API for working with queues is deprecated. In the class comment it says:
Deprecated as of 3.1 in favor of functional programming model
I searched the web extensively but couldn't find a solid end-to-end explanation of how I should migrate.
Looking for examples for:
If there are several ways to do this (as I saw on the web), I'd appreciate an explanation of each option and its typical use case.
First, some references which may help:
TL;DR
Instead of relying on annotation-based configuration, Spring now detects beans of type Consumer/Function/Supplier and uses them to define your streams for you.
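To make the three shapes concrete, here is a minimal plain-Kotlin sketch with no Spring involved. The fields of Grade and Student and the gradeToStudent function are hypothetical, chosen only for illustration; in a real application each function would be exposed as a bean.

```kotlin
import java.util.function.Consumer
import java.util.function.Function
import java.util.function.Supplier

// Hypothetical payload types; the fields are assumptions made for this sketch.
data class Grade(val student: String, val score: Int)
data class Student(val name: String)

// Collects whatever the consumer receives, so the effect is visible.
val received = mutableListOf<Grade>()

// Consumer: input only (takes the place of an @Input channel plus @StreamListener).
fun gradesChannel(): Consumer<Grade> = Consumer<Grade> { grade -> received.add(grade) }

// Function: input and output (consume one message, produce another).
fun gradeToStudent(): Function<Grade, Student> =
    Function<Grade, Student> { grade -> Student(grade.student) }

// Supplier: output only (a source of messages).
fun studentsChannel(): Supplier<Student> = Supplier<Student> { Student("Adam") }

fun main() {
    gradesChannel().accept(Grade("Adam", 90))
    println(received)
    println(gradeToStudent().apply(Grade("Eve", 75)))
    println(studentsChannel().get())
}
```

The explicit import of java.util.function.Function matters in Kotlin, because kotlin.Function is imported by default and would otherwise be picked up instead.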
Input/Consumer
Whereas before you had code looking like this:
interface BindableGradesChannel {
    @Input
    fun gradesChannel(): SubscribableChannel
    companion object {
        const val INPUT = "gradesChannel"
    }
}
and the usage was similar to:
@Service
@EnableBinding(BindableGradesChannel::class)
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)
    @StreamListener(BindableGradesChannel.INPUT)
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}
Now the interface definition is no longer needed, and the listener can be written like so:
@Service
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)
    @Bean
    fun gradesChannel(): Consumer<Grade> {
        return Consumer { listen(grade = it) }
    }
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}
Notice how the Consumer bean replaced both the @StreamListener and the @Input.
As for configuration, if you previously had an application.yml looking like so:
spring:
  cloud:
    stream:
      bindings:
        gradesChannel:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3
now it should be like so:
spring:
  cloud:
    stream:
      bindings:
        gradesChannel-in-0:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3
Notice how gradesChannel was replaced by gradesChannel-in-0. To understand the full naming convention, please see the naming convention link at the top.
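As a quick illustration of that convention, binding names follow the pattern functionName-in-index for inputs and functionName-out-index for outputs. The helper below is a hypothetical sketch that only builds the string; it is not part of Spring.

```kotlin
// Direction of a binding relative to the function: input or output.
enum class Direction(val tag: String) { IN("in"), OUT("out") }

// Hypothetical helper: builds a binding name as <functionName>-<in|out>-<index>.
// The index is 0 unless the function has several inputs or outputs.
fun bindingName(functionName: String, direction: Direction, index: Int = 0): String =
    "$functionName-${direction.tag}-$index"

fun main() {
    println(bindingName("gradesChannel", Direction.IN))     // gradesChannel-in-0
    println(bindingName("studentsChannel", Direction.OUT))  // studentsChannel-out-0
}
```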
Some details:
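If you have more than one such bean in your application, you need to set the spring.cloud.function.definition property.
If you'd like to keep using the name gradesChannel, you can set spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel and then use gradesChannel everywhere in the configuration.
Output/Supplier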
The concept here is similar. Code that looked like this:
interface BindableStudentsChannel {
    @Output
    fun studentsChannel(): MessageChannel
}
and
@Service
@EnableBinding(BindableStudentsChannel::class)
class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
    fun publish(message: Message<Student>) {
        studentsChannel.studentsChannel().send(message)
    }
}
can now be replaced by:
@Service
class StudentsQueueWriter {
    @Bean
    fun studentsChannel(): Supplier<Student> {
        return Supplier { Student("Adam") }
    }
}
As you can see, there is a major difference: when is it called, and by whom?
Previously we could trigger it manually, but now it is triggered by Spring every second (by default). This is fine for use cases such as publishing sensor data every second, but not when you want to send a message in response to an event. Besides using a Function for whatever reason, Spring offers 2 alternatives:
StreamBridge
Using StreamBridge, you can define the target explicitly like so:
@Service
class StudentsQueueWriter(private val streamBridge: StreamBridge) {
    fun publish(message: Message<Student>) {
        streamBridge.send("studentsChannel-out-0", message)
    }
}
This way you don't define the target channel as a bean, but you can still send the message. The downside is that you have some explicit configuration in your class.
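One possible way to soften that downside is to pass the binding name in rather than hard-coding it. The sketch below is plain Kotlin so it runs without Spring: Sender is a hypothetical stand-in for StreamBridge (whose send(bindingName, data) also returns a Boolean), and the default binding name is taken from the example above.

```kotlin
// Hypothetical stand-in for StreamBridge, so the sketch runs without Spring.
fun interface Sender {
    fun send(bindingName: String, payload: Any): Boolean
}

// Hypothetical payload type; the field is illustrative.
data class Student(val name: String)

// The binding name is injected (for example from a property) instead of hard-coded.
class StudentsQueueWriter(
    private val sender: Sender,
    private val bindingName: String = "studentsChannel-out-0",
) {
    fun publish(student: Student): Boolean = sender.send(bindingName, student)
}

fun main() {
    // A fake sender that records what was sent, which is also handy in unit tests.
    val sent = mutableListOf<Pair<String, Any>>()
    val writer = StudentsQueueWriter(Sender { name, payload -> sent.add(name to payload) })
    writer.publish(Student("Adam"))
    println(sent)
}
```

In a real service you would inject StreamBridge itself and could read the binding name from configuration; the narrow interface here is only an assumption made to keep the example self-contained.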
Reactor API
The other way is to use a reactive mechanism such as Sinks.Many and return its Flux from the Supplier. With this approach your code will look similar to:
@Service
class StudentsQueueWriter {
    // The sink's element type must match the Flux<Student> returned below
    val students: Sinks.Many<Student> = Sinks.many().multicast().onBackpressureBuffer()
    @Bean
    fun studentsChannel(): Supplier<Flux<Student>> {
        return Supplier { students.asFlux() }
    }
}
and the usage may be similar to:
class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
    fun newStudent() {
        studentsQueueWriter.students.tryEmitNext(Student("Adam"))
    }
}
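One thing worth noting about tryEmitNext is that it does not throw on failure; it returns a Sinks.EmitResult that you may want to check. Below is a minimal sketch of that, assuming reactor-core is on the classpath. The emitStudent helper and the Student field are hypothetical, and the manual subscribe call only stands in for the binder, which subscribes to the Flux in a real application.

```kotlin
import reactor.core.publisher.Sinks

// Hypothetical payload type; the field is illustrative.
data class Student(val name: String)

// Hypothetical helper: emits a student and reports whether the sink accepted it.
fun emitStudent(sink: Sinks.Many<Student>, student: Student): Boolean {
    val result = sink.tryEmitNext(student)
    if (result.isFailure) {
        // For example FAIL_OVERFLOW or FAIL_TERMINATED; log, retry, or drop here.
        println("Could not emit $student: $result")
    }
    return result.isSuccess
}

fun main() {
    val students: Sinks.Many<Student> = Sinks.many().multicast().onBackpressureBuffer()
    val received = mutableListOf<Student>()
    // Stand-in for the binder, which subscribes to the Flux returned by the Supplier bean.
    students.asFlux().subscribe { received.add(it) }
    emitStudent(students, Student("Adam"))
    println(received)
}
```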