I need to call three different HTTP endpoints that each return the same domain object, and gather the three responses into a single object. I am using the Spring Integration scatter-gather pattern for this, but I am lost on how to aggregate the results into a single object. Below is my scatter-gather config file:
<scatter-gather input-channel="inputDistribution" output-channel="outputDistribution" gather-channel="gatherChannel" send-timeout="1000"
gather-timeout="1000">
<scatterer apply-sequence="true">
<recipient channel="distribution1Channel"/>
<recipient channel="distribution2Channel"/>
<recipient channel="distribution3Channel"/>
</scatterer>
<gatherer release-strategy="customReleaseStrategy" ref="customReleaseStrategy"/>
</scatter-gather>
<channel id="gatherChannel">
<queue/>
</channel>
<bridge input-channel="distribution1Channel" output-channel="serviceChannel1"/>
<bridge input-channel="distribution2Channel" output-channel="serviceChannel2"/>
<bridge input-channel="distribution3Channel" output-channel="serviceChannel3"/>
<service-activator input-channel="serviceChannel1" output-channel="gatherChannel" ref="firstService" method="process"/>
<service-activator input-channel="serviceChannel2" output-channel="gatherChannel" ref="secondService" method="process"/>
<service-activator input-channel="serviceChannel3" output-channel="gatherChannel" ref="thirdService" method="process"/>
<beans:bean id="firstService" class="com.test.scattergather.service.FirstService"/>
<beans:bean id="secondService" class="com.test.scattergather.service.SecondService"/>
<beans:bean id="thirdService" class="com.test.scattergather.service.ThirdService"/>
<beans:bean id="customReleaseStrategy" class="com.test.scattergather.strategy.CustomReleaseStrategy"/>
Below is my service activator code, which calls an endpoint and gets a domain object:
public class FirstService {
@ServiceActivator
public Dummy process() {
//In actual scenario, call api endpoint and get this object. This object will be populated from API response.
Dummy dummy = new Dummy();
dummy.setID(1);
return dummy;
}}
Similarly, secondService and thirdService populate a Dummy object by calling an endpoint.
Below is the code for the ReleaseStrategy, where I check whether all three responses have been received:
public class CustomReleaseStrategy implements ReleaseStrategy {
@Override
public boolean canRelease(MessageGroup group) {
// Release the group once all three replies have arrived.
return group.getMessages().size() == 3;
}}
I am using the test below to send the message:
this.inputDistribution.send(new GenericMessage<>("foo"),1000);
Message<?> outputMessage = this.outputDistribution.receive(10000);
And I get this error:
15:24:11.579 [task-scheduler-2] DEBUG org.springframework.integration.handler.LoggingHandler - bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'org.springframework.integration.scattergather.ScatterGatherHandler#0.gatherer'; defined in: 'class path resource [com/test/scattergather/scattergather/config/scatter-gather-context.xml]'; from source: ''gatherer'']; nested exception is org.springframework.expression.AccessException: Unable to access property 'payload' through getter method, failedMessage=GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}], headers={id=19b6097a-2114-e896-4c00-49f818a6d9e6, timestamp=1674595451578}] for original GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}]
15:24:11.584 [task-scheduler-2] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'org.springframework.integration.scattergather.ScatterGatherHandler#0.gatherer'; defined in: 'class path resource [com/test/scattergather/scattergather/config/scatter-gather-context.xml]'; from source: ''gatherer'']; nested exception is org.springframework.expression.AccessException: Unable to access property 'payload' through getter method, failedMessage=GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:158)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:475)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:461)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.expression.AccessException: Unable to access property 'payload' through getter method
at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:708)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:204)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:104)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:91)
at org.springframework.expression.spel.ast.MethodReference.getArguments(MethodReference.java:164)
at org.springframework.expression.spel.ast.MethodReference.getValueRef(MethodReference.java:81)
at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:70)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:91)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:376)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:154)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:637)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:630)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:616)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:587)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:479)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:363)
at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:92)
at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.aggregatePayloads(MethodInvokingMessageGroupProcessor.java:90)
at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:94)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:898)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:574)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:541)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 17 more
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:704)
... 41 more
Caused by: java.lang.IllegalStateException: Invalid method parameter for payload: was expecting collection.
at org.springframework.util.Assert.state(Assert.java:76)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$ParametersWrapper.getPayload(MessagingMethodInvokerHelper.java:1384)
... 46 more
15:24:11.586 [task-scheduler-2] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - postSend (sent=true) on channel 'bean 'errorChannel'', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'org.springframework.integration.scattergather.ScatterGatherHandler#0.gatherer'; defined in: 'class path resource [com/test/scattergather/scattergather/config/scatter-gather-context.xml]'; from source: ''gatherer'']; nested exception is org.springframework.expression.AccessException: Unable to access property 'payload' through getter method, failedMessage=GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}], headers={id=19b6097a-2114-e896-4c00-49f818a6d9e6, timestamp=1674595451578}] for original GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}]
15:24:12.545 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.545 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 5, missCount = 1]
15:24:12.553 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.554 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 6, missCount = 1]
15:24:12.558 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.558 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 7, missCount = 1]
15:24:12.559 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.559 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 8, missCount = 1]
15:24:12.564 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.564 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 9, missCount = 1]
15:24:12.569 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.570 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 10, missCount = 1]
15:24:12.572 [Test worker] DEBUG org.springframework.test.context.support.AbstractDirtiesContextTestExecutionListener - After test method: context [DefaultTestContext@305e95a4 testClass = ScatterGatherConfigTest, testInstance = com.test.scattergather.scattergather.config.ScatterGatherConfigTest@67b8d45, testMethod = configDistributionTest@ScatterGatherConfigTest, testException = org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'org.springframework.integration.scattergather.ScatterGatherHandler#0', and its 'requiresReply' property is set to true., failedMessage=GenericMessage [payload=foo, headers={id=8a2e124d-ae9a-00b0-dc3e-f434849499c1, timestamp=1674595451468}], mergedContextConfiguration = [MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]], attributes = map['org.springframework.test.context.event.ApplicationEventsTestExecutionListener.recordApplicationEvents' -> false]], class annotated with @DirtiesContext [true] 
with mode [AFTER_CLASS], method annotated with @DirtiesContext [false] with mode [null].
No reply produced by handler 'org.springframework.integration.scattergather.ScatterGatherHandler#0', and its 'requiresReply' property is set to true.
org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'org.springframework.integration.scattergather.ScatterGatherHandler#0', and its 'requiresReply' property is set to true., failedMessage=GenericMessage [payload=foo, headers={id=8a2e124d-ae9a-00b0-dc3e-f434849499c1, timestamp=1674595451468}]
at app//org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:146)
at app//org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at app//org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at app//org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at app//org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at app//org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at app//org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at app//com.test.scattergather.scattergather.config.ScatterGatherConfigTest.configDistributionTest(ScatterGatherConfigTest.java:41)
at java.base@11.0.16/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base@11.0.16/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base@11.0.16/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base@11.0.16/java.lang.reflect.Method.invoke(Method.java:566)
How can I resolve this error, and how can I aggregate the results from all the scatter channels?
Can you please show the whole stack trace in your question, properly formatted? For a custom aggregation function, see the ref attribute of the <gatherer>. I see you already use it, but you point it to the wrong type, ReleaseStrategy. That ref has to be an implementation of MessageGroupProcessor. See some of the out-of-the-box types. I'm not sure why a DefaultAggregatingMessageGroupProcessor, which produces a collection of payloads, doesn't work for you...
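To illustrate the aggregation side, here is a minimal sketch of a plain POJO aggregator. The stack trace's "was expecting collection" message suggests that when the ref bean is not a MessageGroupProcessor, the framework looks for a method that accepts a collection of payloads, so a method taking List<Dummy> should fit. The Dummy class below is only a stand-in for the question's domain class, and DummyResult, DummyAggregator and the bean id dummyAggregator are hypothetical names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in for com.test.scattergather.domain.Dummy; its real fields are not shown in the question.
class Dummy {
    private int id;

    public int getID() { return id; }

    public void setID(int id) { this.id = id; }
}

// Hypothetical holder for the combined result of the three calls.
class DummyResult {
    private final List<Dummy> items;

    DummyResult(List<Dummy> items) {
        // Defensive copy so the result does not change if the caller's list does.
        this.items = Collections.unmodifiableList(new ArrayList<>(items));
    }

    public List<Dummy> getItems() { return items; }
}

// Hypothetical POJO aggregator: receives the payloads of all released messages
// in the group and folds them into one object.
class DummyAggregator {
    public DummyResult aggregate(List<Dummy> payloads) {
        return new DummyResult(payloads);
    }
}
```

Assuming the gatherer accepts the same ref and method attributes as a regular aggregator, the wiring would be something like ref="dummyAggregator" method="aggregate" on the <gatherer>, with release-strategy left pointing at customReleaseStrategy. Since apply-sequence="true" already sets sequenceSize=3 on the scattered messages, the default sequence-based release strategy may make the custom one unnecessary.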