Search code examples
spring-integration

Spring Integration DSL - Trigger sub-flow and ignore response


I'm new to Spring Integration and trying to figure this out. I have the following flow:

@Bean
IntegrationFlow incomingFlow(
    @Qualifier("auditTrailFlow") IntegrationFlow auditTrailFlow,
    MyTransform myTransform
) {
  return IntegrationFlow
      .from("channel.incoming")
      .route("headers['vendor']", mapping -> mapping
          .subFlowMapping("vendor1", subFlowMapping -> subFlowMapping
              .publishSubscribeChannel(messageTaskExecutor, pubSubConfig -> pubSubConfig
                  .subscribe(subFlow -> subFlow
                      .enrichHeaders(headerEnricherSpec -> {
                        headerEnricherSpec.header("action", "request");
                        headerEnricherSpec.header("extSys", "vendor1");
                      })
                      .to(auditTrailFlow))
                  .subscribe(subFlow -> subFlow
                      .gateway(searchTypeFlow())
                      .gateway("channel.send.message")
                      .transform(myTransform)
                      .publishSubscribeChannel(messageTaskExecutor, p -> p
                          .subscribe(s -> s
                              .enrichHeaders(headerEnricherSpec -> {
                                headerEnricherSpec.header("action", "response");
                                headerEnricherSpec.header("extSys", "vendor1");
                              })
                              .to(auditTrailFlow))
                          .subscribe(s -> s.<PayoffQuote>handle((payload, headers) -> payload))))))
          .subFlowMapping("vendor2", subFlowMapping -> subFlowMapping.to(otherFlow())))
      .get();
}

The first subscribe under the first publishSubscribeChannel is the flow that I just want to kick off and I don't care what happens in that flow. Is there a way to trigger that flow so its return value is ignored? Seems like I'd have to do it outside of the pubSub flow.

The same flow is used again in the second publishSubscribeChannel and, again, I don't care what happens in that flow. I just want to kick that flow off (can even be async) and continue on with the main flow.

EDIT

Maybe I'm trying to fit a square peg in to a round hole. Let me try to illustrate what I'm trying to do without code:

  • Receive an incoming HTTP request as an ObjectRequest which happens via a @RestController
  • Send message to a gateway in order to trigger a flow based off of the ObjectRequest which should return an ObjectResponse
  • The flow needs to do the following:
  • Save the OjbectRequest to a database (this can happen asyncronously and I don't want the response from this action)
  • Transform ObjectRequest into ObjectMessage in order to send a JMS message and receive the JMS response. The JMS response will produce an ObjectMessageResponse
  • Transform ObjectMessageResponse into the ObjectResponse which the HTTP method is waiting to receive.
  • Also save ObjectResponse to a database (this can happen asyncronously and I don't want the response from this action)

UPDATE WITH FINAL ANSWER ASSISSTED BY ARTEM BILAN

@Bean
IntegrationFlow incomingFlow() {
  return IntegrationFlow
      .from("channel.incoming")
      .wireTap(auditTrailFlow) // this is a separate flow that starts from an executor channel and saves request object to a database.
      .gateway(searchTypeFlow)  // separate flow that transforms the request object into a message object
      .gateway("channel.jms")  // separate flow that sends/receives JMS message and converts it to a message response object.
      .route("headers['vendor']", mapping -> mapping
          .subFlowMapping("vendor1", subFlowMapping -> subFlowMapping
              .transform(myTransform)))))  // transforms the message response object into the response object that the HTTP process is waiting on.
          .subFlowMapping("vendor2", subFlowMapping -> subFlowMapping
              .transform(myTransform)) // transforms the message response object into the response object that the HTTP process is waiting on.
      .bridge()
      .wireTap(auditTrailFlow) // this saves the response object to the database just like above. 
      .get();
}

Solution

  • To send-and-forget from the router you need to use a MessageChannel-based configuration. See channelMapping() instead of subFlowMapping(). The last one is indeed designed for request-reply and return back to the main flow. You need to design your logic the way that you divide it into separate flows. Then in the router you map to their input channels and really forget. Your kickOffAsync() looks like a wireTap() in the IntegrationFlow terms.

    EDIT

    Thank you for sharing your business requirements!

    So, now we are able to understand what is going on and can work out some solution for you.

    So, since you say that DB saving can be as reused component, then let's extract it into its specific business component. And since you say it support to be as an async operation, then we do like this:

    @Bean
    IntegrationFlow saveToDbFlow(Executor executor, DataSource dataSource) {
         return IntegrationFlow.from(MessageChannels.executor(executor).id("toDbChannel"))
           .handle(new JdbcMessageHandler(dataSource, "INSERT INTO..."))
           .get();
    }
    

    Or you can use JPA: it is not clear how you are going to save different object types like the mentioned ObjectRequest and ObjectResponse.

    Since from your description this looks more like an audit, the Wire-Tap pattern is what you need to follow: https://www.enterpriseintegrationpatterns.com/patterns/messaging/WireTap.html

    In your main flow you just need to add .wireTap("toDbChannel") - and the message at that flow point is going to be send to that channel and its processing on the subscriber is going to happen asynchronously.

    See also Spring Integration docs for this pattern implementation: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-wiretap

    You latest edit in the question is very close to what I have in mind to show you. Only that the problem you need to replace .channel(auditTrailFlow) with a wireTap(). Sure! If you wish you still can use a flow reference instead, but to make it as an async you need that ExecutorChannel in that flow for auditing.

    Another remark that you need to add a bridge() after router() before wireTap() in the end of your flow. The point is that wireTap() is a ChannelInterceptor which is added to the current channel and its send() operation. In your case that would be an input channel for a route(). But it sounds like you want to intercept really a result of the router and exactly that one to send back to the HTTP request. The bridge() will ensure for a new MessageChannel after the route() and will reply back to the replyChannel in headers.