I am trying to use Kotlin suspend functions as my service activator handler method. The service activator is set up in the following way:
interface WorkflowStep<T, E> {
@ServiceActivator
suspend fun handle(message: Message<T>): E {
// Pre-processing of the message that is repeated before every workflow step
return stepLogic(message)
}
suspend fun stepLogic(message: Message<T>): E
}
class ExampleStep() : WorkflowStep<String, String> {
override suspend fun stepLogic(message: Message<String>): String {
// Processing
}
}
I tried using the service activator from an IntegrationFlow
like this handle(exampleStep, "handle")
. The issue is that the message that is received in the handle
suspend function has the following payload type: class reactor.core.publisher.MonoOnErrorResume
instead of String
. When declaring a @ServiceActivator
method with an input parameter of Message<String>
type instead of Message<T>
it receives the correct type, so I assume this is related to the generic types used. Is there a way to achieve what I am trying to do here?
Are you sure that with Message<String>
you receive a value of that Mono
, but not a Mono.toString()
? There is just no way in the framework to assume your expectations between Mono
type and String
, so you probably need to add extra step in between the producer of that Mono
and this @ServiceActivator
. The easiest way is to mark your producer as an async = true
and that Mono
will be subscribed and its value emitted to the output channel.
UPDATE
Thank you for more info about your problem.
So, I wrote this simple application:
@SpringBootApplication
class So76960998Application
fun main(args: Array<String>) {
runApplication<So76960998Application>(*args)
}
@Configuration
class Configuration {
@Bean
fun testFlow(exampleStep: ExampleStep) =
integrationFlow {
handle(exampleStep, "handle") { async(true) }
}
@Bean
fun runApplication(@Qualifier("testFlow.input") inputChannel: MessageChannel) =
ApplicationRunner {
println(MessagingTemplate()
.convertSendAndReceive(inputChannel, "test", String::class.java))
}
interface WorkflowStep<T, E> {
@ServiceActivator
suspend fun handle(message: Message<T>): E {
// Pre-processing of the message that is repeated before every workflow step
return stepLogic(message)
}
suspend fun stepLogic(message: Message<T>): E
}
@Service
class ExampleStep : WorkflowStep<String, String> {
override suspend fun stepLogic(message: Message<String>): String {
println(message)
return message.payload.uppercase()
}
}
}
It works as expected and I got a TEST
in logs after using that convertSendAndReceive()
.
Pay attention, please, to the { async(true) }
configuration of the handle()
method in the integration flow.
The point is that suspend
functions are treated as reactive internally by the framework and therefore that MonoOnErrorResume
is produced as reply. When service activator is marked as async
, then this Mono
is subscribed by the framework and its value is emitted to the output channel on demand.