Java 11 and Spring Integration 5.x here. I am trying to create a custom MessageSource<Widget> that can be applied to an integration flow like so:
@Bean
public IntegrationFlow myFlow(WidgetMessageSource widgetMessageSource) {
    return IntegrationFlows.from(widgetMessageSource)
            .get();
}
This WidgetMessageSource queries a remote system and fetches Widget instances, each of which I then need to wrap in a Message<Widget> and place on my flow:
public class WidgetMessageSource implements MessageSource<Widget> {

    @Override
    public Message<Widget> receive() {
        List<Widget> widgets = fetchStagedWidgets();
        // How to produce a List<Message<Widget>> and not just a Message<Widget>???
    }

    private List<Widget> fetchStagedWidgets() {
        // connect to remote system and pull down 0+ widgets
        return where_the_magic_happens(); // TODO
    }
}
My problem is that when fetchStagedWidgets is called, 0+ Widget instances will be returned. If no Widget instances are fetched, then nothing should happen (no messages are placed on the flow). But if Widget instances are fetched, I need each Widget to appear in its own Message<Widget>. So I have a cardinality mismatch between what will happen when fetchStagedWidgets is called and what the MessageSource API expects (1 message per call).
What are my options here? The only thing I can think of is serializing what is sitting behind the fetchStagedWidgets call so that it will only ever return 0 or 1 Widget instances, but there's actually quite a lot of complexity (services + infrastructure) required to get fetchStagedWidgets to behave that way. Any other options here, using the MessageSource API or some other facility within Spring Integration? Thanks in advance!
For Spring Integration (and messaging in general), it doesn't matter what payload the message carries: the infrastructure that dispatches messages might look into the headers, but in most cases it doesn't care about the payload.
So you are free to return a single message with a List as its payload. You could produce something like Message<List<Message<Widget>>>, but does that really make sense, and does it give any advantage over Message<List<Widget>>? Why would one wrap every item in the list in its own message?
The framework has examples of this in its DB-specific message sources. For example, JdbcPollingChannelAdapter returns a List of mapped records. The same happens with R2dbcMessageSource, which produces a Flux of mapped records.
You probably need to look into a .split() to deal with the List<Widget> payload downstream: the splitter emits each element of the list as its own message.
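To make the batch-then-split idea concrete, here is a minimal sketch in plain Java. The Message and MessageSource interfaces below are stand-ins that only mirror the shape of the Spring types so the example compiles without the framework; the Widget id field, the BatchSourceSketch wrapper, and the split helper are all hypothetical names used for illustration. In a real flow, the splitting would be done by .split() rather than by hand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class BatchSourceSketch {

    // Stand-ins mirroring the shape of Spring's Message and MessageSource (assumption:
    // only the parts needed for this sketch are reproduced).
    interface Message<T> { T getPayload(); }
    interface MessageSource<T> { Message<T> receive(); }

    // Hypothetical domain type from the question.
    static final class Widget {
        final String id;
        Widget(String id) { this.id = id; }
    }

    // One poll yields at most one message, whose payload is the whole batch.
    static final class WidgetBatchSource implements MessageSource<List<Widget>> {
        private final Supplier<List<Widget>> fetcher; // stands in for fetchStagedWidgets()

        WidgetBatchSource(Supplier<List<Widget>> fetcher) { this.fetcher = fetcher; }

        @Override
        public Message<List<Widget>> receive() {
            List<Widget> widgets = fetcher.get();
            if (widgets == null || widgets.isEmpty()) {
                return null; // nothing fetched: no message for this poll
            }
            List<Widget> payload = List.copyOf(widgets);
            return () -> payload;
        }
    }

    // Roughly what a downstream splitter does: one list message in, one message per element out.
    static <T> List<Message<T>> split(Message<List<T>> batch) {
        List<Message<T>> out = new ArrayList<>();
        for (T item : batch.getPayload()) {
            out.add(() -> item);
        }
        return out;
    }
}
```

The point of the design is that the source keeps its one-message-per-call contract, and the fan-out to one message per Widget happens downstream.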
Technically, I also don't see a reason for the custom MessageSource. A plain Supplier<List<Widget>> will fit your current requirement of calling that fetchStagedWidgets() method.
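A minimal sketch of the Supplier variant, again in plain Java so it compiles on its own. WidgetClient, SupplierSketch, and the Widget id field are hypothetical; the only assumption about the framework is that a polled supplier returning null produces no message, which is worth verifying against your Spring Integration version.

```java
import java.util.List;
import java.util.function.Supplier;

final class SupplierSketch {

    // Hypothetical domain type from the question.
    static final class Widget {
        final String id;
        Widget(String id) { this.id = id; }
    }

    // Hypothetical client for the remote system; wraps the question's fetchStagedWidgets().
    interface WidgetClient {
        List<Widget> fetchStagedWidgets();
    }

    // Adapts the fetch call to a Supplier. An empty batch is mapped to null so that
    // a polling endpoint can treat it as "nothing to send" (assumption, see lead-in).
    static Supplier<List<Widget>> stagedWidgets(WidgetClient client) {
        return () -> {
            List<Widget> widgets = client.fetchStagedWidgets();
            return (widgets == null || widgets.isEmpty()) ? null : widgets;
        };
    }
}
```

In a Spring Integration 5.x flow, a supplier like this could presumably be passed to IntegrationFlows.from(...) together with a poller configuration and followed by .split(); later 5.x releases also offer a fromSupplier(...) variant.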