
Serializing custom Spring Integration MessageSources that produce lists of messages


Java 11 and Spring Integration 5.x here. I am trying to create a custom MessageSource<Widget> that can be applied to an integration flow like so:

@Bean
public IntegrationFlow myFlow(WidgetMessageSource widgetMessageSource) {
    return IntegrationFlows.from(widgetMessageSource)                 
        .get();
}

This WidgetMessageSource queries a remote system and fetches Widget instances. I then need to wrap each one in a Message<Widget> and place it on my flow:

public class WidgetMessageSource implements MessageSource<Widget> {
    
    @Override
    public Message<Widget> receive() {

        List<Widget> widgets = fetchStagedWidgets();
        // How to produce a List<Message<Widget>> and not just a Message<Widget>???

    }

    private List<Widget> fetchStagedWidgets() {
        // connect to remote system and pull down 0+ widgets
        return where_the_magic_happens(); // TODO
    }

}

My problem is, when fetchStagedWidgets is called, 0+ Widget instances will be returned. If no Widget instances are fetched, then nothing should happen (no messages are placed on the flow).

But if Widget instances are fetched, I need each Widget to appear in its own Message<Widget>. So I have a cardinality mismatch between what will happen when fetchStagedWidgets is called and what the MessageSource API expects (1 message per call).

What are my options here? The only thing I can think of is serializing what is sitting behind the fetchStagedWidgets call so that it will only ever return 0 or 1 Widget instances, but there's actually quite a lot of complexity (services + infrastructure) required to get fetchStagedWidgets to behave that way. Any other options here, using the MessageSource API or some other facility within Spring Integration? Thanks in advance!


Solution

  • For Spring Integration (and messaging in general), it doesn't matter what payload you have in the message: the infrastructure that dispatches messages might look into the headers, but in most cases it doesn't care about the payload.

    So you are free to return a single message with a List as its payload. You could produce something like Message<List<Message<Widget>>>, but does that really make sense, and does it give any advantage over Message<List<Widget>>? Why wrap every item in the list in its own message?

    The framework has examples of this in its DB-specific message sources. JdbcPollingChannelAdapter, for example, returns a List of mapped records. The same goes for R2dbcMessageSource, which produces a Flux of mapped records.

    You probably want a .split() downstream to break the List<Widget> payload into individual messages, one per Widget.
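
    As a rough sketch of that, assuming WidgetMessageSource is changed to implement MessageSource<List<Widget>> and is registered as a bean, the flow could look something like this. The class name WidgetFlowConfig, the five-second poller and the println handler are placeholders of mine, not something from your setup:

    ```java
    import java.util.List;

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.Pollers;

    // Widget is the class from the question, assumed to live in the same package.
    @Configuration
    public class WidgetFlowConfig {

        @Bean
        public IntegrationFlow myFlow(MessageSource<List<Widget>> widgetMessageSource) {
            return IntegrationFlows
                    // Each poll yields at most one Message<List<Widget>>.
                    // The polling interval is an arbitrary placeholder.
                    .from(widgetMessageSource, e -> e.poller(Pollers.fixedDelay(5000)))
                    // The default splitter emits one message per element of the List payload.
                    .split()
                    // Placeholder endpoint: each message here carries a single Widget.
                    .handle(message -> System.out.println(message.getPayload()))
                    .get();
        }
    }
    ```

    If receive() returns null when nothing was fetched, no message is produced for that poll, which covers the "nothing should happen" case.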

    Technically, I also don't see a reason for a custom MessageSource. A plain Supplier<List<Widget>> that calls fetchStagedWidgets() fits your current requirements.
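
One detail if you go the Supplier route: a polling source treats a null result as "no message", so a small adapter that turns an empty batch into null keeps empty polls off the flow entirely. The following is a minimal plain-Java sketch of such an adapter. The names WidgetSupplierSketch and nullWhenEmpty are made up for illustration and are not part of Spring Integration, and it is generic so that it assumes nothing about Widget:

```java
import java.util.List;
import java.util.function.Supplier;

final class WidgetSupplierSketch {

    private WidgetSupplierSketch() {
    }

    /**
     * Wraps a fetch call so that an empty (or null) batch is reported as null.
     * A non-empty batch is passed through unchanged.
     */
    static <T> Supplier<List<T>> nullWhenEmpty(Supplier<List<T>> fetch) {
        return () -> {
            List<T> batch = fetch.get();
            return (batch == null || batch.isEmpty()) ? null : batch;
        };
    }
}
```

You could then pass nullWhenEmpty(this::fetchStagedWidgets) to IntegrationFlows.from(supplier, e -> e.poller(...)) and follow it with .split(). If I remember correctly, later 5.x releases prefer IntegrationFlows.fromSupplier(...) for the same purpose, so check which one your version offers.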