
Spring Integration service activator suspend handler function with generics


I am trying to use Kotlin suspend functions as my service activator handler method. The service activator is set up in the following way:

interface WorkflowStep<T, E> {

    @ServiceActivator
    suspend fun handle(message: Message<T>): E {
        // Pre-processing of the message that is repeated before every workflow step
        return stepLogic(message)
    }

    suspend fun stepLogic(message: Message<T>): E
}

class ExampleStep : WorkflowStep<String, String> {

    override suspend fun stepLogic(message: Message<String>): String {
        // Processing
        TODO("step-specific processing")
    }
}

I wired the service activator into an IntegrationFlow with handle(exampleStep, "handle"). The problem is that the message received by the handle suspend function has a payload of type reactor.core.publisher.MonoOnErrorResume instead of String. When I declare the @ServiceActivator method with a Message<String> parameter instead of Message<T>, it receives the correct type, so I assume the issue is related to the generic types. Is there a way to achieve what I am trying to do here?


Solution

  • Are you sure that with Message<String> you receive the value produced by that Mono, and not the result of Mono.toString()? The framework has no way to convert between a Mono and a String on its own, so you probably need an extra step between the producer of that Mono and this @ServiceActivator. The easiest way is to mark the producer as async = true; the framework then subscribes to the Mono and emits its value to the output channel.
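
    With annotation-based configuration, the same flag can be set on the annotation itself. The following is only a sketch: the class name UppercaseStep and the channel names stepInput and stepOutput are hypothetical, and it needs Spring Integration on the classpath. Note that the async attribute of @ServiceActivator is a String.

    ```kotlin
    import org.springframework.integration.annotation.ServiceActivator
    import org.springframework.messaging.Message
    import org.springframework.stereotype.Service

    // Hypothetical step used only for illustration.
    @Service
    class UppercaseStep {

        // Hypothetical channel names. async = "true" asks the framework to
        // subscribe to the reactive reply instead of forwarding it as the payload.
        @ServiceActivator(inputChannel = "stepInput", outputChannel = "stepOutput", async = "true")
        suspend fun handle(message: Message<String>): String =
            message.payload.uppercase()
    }
    ```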

    UPDATE

    Thank you for more info about your problem.

    So, I wrote this simple application:

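    import org.springframework.beans.factory.annotation.Qualifier
    import org.springframework.boot.ApplicationRunner
    import org.springframework.boot.autoconfigure.SpringBootApplication
    import org.springframework.boot.runApplication
    import org.springframework.context.annotation.Bean
    import org.springframework.context.annotation.Configuration
    import org.springframework.integration.annotation.ServiceActivator
    import org.springframework.integration.core.MessagingTemplate
    import org.springframework.integration.dsl.integrationFlow
    import org.springframework.messaging.Message
    import org.springframework.messaging.MessageChannel
    import org.springframework.stereotype.Service

    @SpringBootApplication
    class So76960998Application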
    
    fun main(args: Array<String>) {
        runApplication<So76960998Application>(*args)
    }
    
    @Configuration
    class Configuration {
    
        @Bean
        fun testFlow(exampleStep: ExampleStep) =
                integrationFlow {
                    handle(exampleStep, "handle") { async(true) }
                }
    
        @Bean
        fun runApplication(@Qualifier("testFlow.input") inputChannel: MessageChannel) =
                ApplicationRunner {
                    println(MessagingTemplate()
                            .convertSendAndReceive(inputChannel, "test", String::class.java))
                }
    
        interface WorkflowStep<T, E> {
    
            @ServiceActivator
            suspend fun handle(message: Message<T>): E {
                // Pre-processing of the message that is repeated before every workflow step
                return stepLogic(message)
            }
    
            suspend fun stepLogic(message: Message<T>): E
        }
    
        @Service
        class ExampleStep : WorkflowStep<String, String> {
    
            override suspend fun stepLogic(message: Message<String>): String {
                println(message)
                return message.payload.uppercase()
            }
    
        }
        
    }
    

    It works as expected: TEST shows up in the output after the convertSendAndReceive() call.

    Note the { async(true) } configuration passed to handle() in the integration flow.

    The point is that the framework treats suspend functions as reactive internally, which is why a MonoOnErrorResume is produced as the reply. When the service activator is marked as async, the framework subscribes to this Mono and emits its value to the output channel on demand.
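
    The difference between the two modes can be modeled without Spring or Reactor. The sketch below is only an analogy and not the framework's implementation: DeferredReply is a hypothetical stand-in for a Mono, and syncReply and asyncReply are hypothetical stand-ins for a handler without and with async. It uses only the Kotlin standard library.

    ```kotlin
    import kotlin.coroutines.Continuation
    import kotlin.coroutines.CoroutineContext
    import kotlin.coroutines.EmptyCoroutineContext
    import kotlin.coroutines.startCoroutine

    // Hypothetical stand-in for a deferred reply such as a Mono:
    // the wrapped suspend block does not run until subscribe() is called.
    class DeferredReply<T>(private val block: suspend () -> T) {
        fun subscribe(onValue: (T) -> Unit) {
            block.startCoroutine(object : Continuation<T> {
                override val context: CoroutineContext = EmptyCoroutineContext
                override fun resumeWith(result: Result<T>) = onValue(result.getOrThrow())
            })
        }
    }

    // Same shape as the step logic in the example above.
    suspend fun stepLogic(payload: String): String = payload.uppercase()

    // Without async: the wrapper itself is passed downstream as the payload.
    fun syncReply(payload: String): Any = DeferredReply { stepLogic(payload) }

    // With async: the wrapper is subscribed and its value is passed downstream.
    // stepLogic never actually suspends here, so the callback runs before subscribe() returns.
    fun asyncReply(payload: String): String {
        var value: String? = null
        DeferredReply { stepLogic(payload) }.subscribe { value = it }
        return checkNotNull(value)
    }

    fun main() {
        println(syncReply("test") is String) // false: downstream sees the wrapper
        println(asyncReply("test"))          // TEST: downstream sees the value
    }
    ```

    In this model, the non-async path is what puts a wrapper object into the payload, which mirrors the MonoOnErrorResume seen in the question.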