Below is a Spring IntegrationFlow that I am trying to get working with Spring Integration 6 and Google PubSub 1.123. However, AUTO acknowledgment is not working, apparently because routeToRecipients is a one-way message handler. Is routeToRecipients a one-way MessageHandler, or am I doing something wrong? Is there a way to pass the message on to the main flow?
    public IntegrationFlow processEvent() {
        return IntegrationFlow.from(Function.class, gateway -> gateway.beanName("onMessage"))
                .transform(Transformers.fromJson(Alert.class))
                // One boolean header per target store, derived from the configured `store` value
                .enrichHeaders(h -> h
                        .headerFunction(POSTGRES, t -> POSTGRES.equalsIgnoreCase(store) || ALL.equalsIgnoreCase(store))
                        .headerFunction(BIGTABLE, t -> BIGTABLE.equalsIgnoreCase(store) || ALL.equalsIgnoreCase(store))
                        .headerFunction(NOSUPPORT,
                                t -> !BIGTABLE.equalsIgnoreCase(store) && !ALL.equalsIgnoreCase(store)
                                        && !POSTGRES.equalsIgnoreCase(store)))
                .log(LoggingHandler.Level.DEBUG, "Message Routed to DB store", t -> t.toString())
                .routeToRecipients(r -> r
                        // The Postgres recipient is selected by the POSTGRES header (the original checked BIGTABLE here)
                        .recipientMessageSelectorFlow(m -> m.getHeaders().get(POSTGRES, Boolean.class),
                                c -> c.channel(postgresRouteChannel()))
                        .recipientMessageSelectorFlow(m -> m.getHeaders().get(BIGTABLE, Boolean.class),
                                c -> c.channel(bigtableRouteChannel()))
                        .recipientMessageSelectorFlow(m -> m.getHeaders().get(NOSUPPORT, Boolean.class),
                                c -> c.channel(noSupportRouteChannel())))
                .get();
    }
I need a way to handle acknowledgement of the PubSub messages.
Why is that a Function? What do you produce from your flow? It really looks like your logic is one-way and only distributes messages. Wouldn't a Consumer work for you instead?
That is probably the reason auto-ack does not happen: the Function waits for a reply, but you don't produce one. We could arrange for a reply as well, but first we need to understand what you are going to produce as a reply.
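To make the Function versus Consumer point concrete, here is a minimal plain-Java sketch of the two contracts. It is only a model, not Spring Integration's implementation: the class name GatewayContractSketch, the in-memory `delivered` list standing in for the recipient channels, and the explicit reply timeout are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

public class GatewayContractSketch {

    // Hypothetical stand-in for the recipient channels: records what was routed.
    public static final List<String> delivered = new ArrayList<>();

    // One-way handler: hands the payload to its recipients and returns nothing.
    public static void routeOneWay(String payload) {
        delivered.add(payload);
    }

    // Request-reply entry point: sends the payload, then waits for a reply.
    // The flow ends in a one-way handler, so nothing ever arrives on replyChannel.
    public static Function<String, Optional<String>> requestReply(long timeoutMillis) {
        return payload -> {
            BlockingQueue<String> replyChannel = new ArrayBlockingQueue<>(1);
            routeOneWay(payload);
            try {
                // poll() returns null once the timeout elapses without a reply
                return Optional.ofNullable(replyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        };
    }

    // One-way entry point: returns as soon as the payload has been handed off.
    public static Consumer<String> oneWay() {
        return GatewayContractSketch::routeOneWay;
    }

    public static void main(String[] args) {
        Optional<String> reply = requestReply(200).apply("alert-1");
        System.out.println("Function reply present: " + reply.isPresent()); // false
        oneWay().accept("alert-2");
        System.out.println("Delivered: " + delivered); // [alert-1, alert-2]
    }
}
```

In both cases the payload reaches the recipients, but only the Consumer call completes normally. For the flow in the question, the analogous change would presumably be to start from Consumer.class instead of Function.class; whether that restores AUTO acknowledgment depends on the binder and is not verified here.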