Search code examples
springspring-bootspring-integrationmessagingenterprise-integration

PayloadTypeRouter directly sending into a Transformer without channel in between?


I need to use a PayloadTypeRouter and want to send the routed message directly into a Transformer, Filter or ServiceActivator. All should be configured with Kotlin DSL (or Java DSL).

Currently, a nice looking part of code looks like this:

    @Bean
    fun routeAzureC2DMessage() = integrationFlow {
        channel(IoTHubChannelNames.IOTHUB_TO_DEVICE_CHANNEL)
        route<IotHubC2DRequestMessage<IotHubC2DRequest>> {
            when (it.payload) {
                is IotHubDesiredPropertyUpdate -> IoTHubChannelNames.DESIRED_PROPERTY_UPDATE
                is IotHubMessageToDevice -> IoTHubChannelNames.MESSAGE_TO_DEVICE
            }
        }
    }

and then continued by (one side of the routing)

    @Bean
    fun processMessageToDevice() = integrationFlow {
        channel(IoTHubChannelNames.MESSAGE_TO_DEVICE)
        filter(StructureFilter())
        transform(MessageTransformer())
        channel(SharedChannelNames.CLOUD2DEVICE)
    }

I would like to get rid of the unneeded channel IoTHubChannelNames.MESSAGE_TO_DEVICE. I tried several approaches and in another part of the project I figured out something like this (Java DSL)

IntegrationFlows
            .from(channelName)
            .route({ message: IotHubMessage -> message.javaClass }) { router: RouterSpec<Class<*>?, MethodInvokingRouter?> ->
                router
                    .subFlowMapping(DeviceToCloudMessage::class.java) {
                        it.handle(gateway, "handleD2CMessage")
                    }
                    .subFlowMapping(DeviceTwinUpdateMessage::class.java) {
                        it.handle(gateway, "handleDeviceTwinReportedProperty")
                    }
            }
            .get()

Are subFlowMappings the only way to get rid of the channels in between? I would like a solution where I can still use when (it.payload) and then instead of the channel/channel name could return a new integrationFlow or some other form of Flow definition.


Solution

  • The only solution at the moment is really via API:

    inline fun <reified P, T> route(
            crossinline function: (P) -> T,
            crossinline configurer: KotlinRouterSpec<T, MethodInvokingRouter>.() -> Unit) {
    

    What you are asking with that when(...) is syntax is out of support at the moment.

    Feel free to raise a GH issue on the matter and share as much as possible details what is that from Kotlin perspective and how it could be used in Spring Integration DSL.

    UPDATE

    On the other hand it is not so bad with current Kotlin support:

                route<Int, Boolean>({ it % 2 == 0 }) {
                    subFlowMapping(true) { handle<Int> { p, _ -> p * 2 } }
                    subFlowMapping(false) { handle<Int> { p, _ -> p * 3 } }
                }
    

    So, the argument of the route() method is a when() and subFlowMapping() is is with -> output as an integrationFlow builder result. So, probably we won't pursue a Kotlin when() which won't give us too much gain unless we lose subFlowMapping in favor of -> operator...

    UPDATE 2

    After more thinking about this and looking to a possible solution, I have to retract my request for this when()-kind feature request.

    The main problem that IntegrationFlow together with all its configuration must be registered in the application context upfront, before usage. What you are asking with the when() in the original router function is not what the framework can detect and process for you. This function is not a part of the framework to take responsibility for the produced result.

    Well, we could check for the IntegrationFlow return to make a decision how to call, but there is no guarantee that the flow you are going to return from that function is a registered bean. And when we really register it and try to use in such a function, it is not a difference with what we have so far with channels returning from that function and their mapping in some flow bean somewhere.

    We can register IntegrationFlow as bean automatically in some designed for this instructions like subFlowMapping(). Anyway it is done only once, on configuration phase. But it is not so good to do that when end-user code returns flows at runtime. It is better to return channels or some other keys where we have a mapping for existing flows.

    I personally prefer do not ignore a MessageChannel abstraction and use it when ever I need to distribute a logic between different flows. The code in a single flow looks more cleaner when it is linear and represents a single logic unit of work. The other flows might be reused in other logic though. What I would only need is to point to those flows input channels from other places!

    Nevertheless my main point is: we must register and IntegrationFlow before we are going to send a message to it.