Search code examples
spring-integrationgoogle-cloud-pubsub

Spring Integration, Google Pubsub Auto ack not working


Following is an Spring IntegrationFlow i am trying to make work using Spring integration 6 and Google PubSub 1.123. However, AUTO acknowledgment is not working reason being the routeToRecpient is one way message handle. Is routeToRecipents one-way MessageHandle or am i doing something worng. is there a way to pass on the message to main flow ?

public IntegrationFlow processEvent() {
        return IntegrationFlow.from(Function.class, gateway -> gateway.beanName("onMessage"))
                .transform(Transformers.fromJson(Alert.class))
                .enrichHeaders(h -> h
                        .headerFunction(POSTGRES, t -> POSTGRES.equalsIgnoreCase(store) || ALL.equalsIgnoreCase(store))
                        .headerFunction(BIGTABLE, t -> BIGTABLE.equalsIgnoreCase(store) || ALL.equalsIgnoreCase(store))
                        .headerFunction(NOSUPPORT,
                                t -> !BIGTABLE.equalsIgnoreCase(store) && !ALL.equalsIgnoreCase(store)
                                        && !POSTGRES.equalsIgnoreCase(store)))
                .log(LoggingHandler.Level.DEBUG, "Message Routed to DB store", t -> t.toString())
                
                .routeToRecipients(r -> r
                        .recipientMessageSelectorFlow(m -> m.getHeaders().get(BIGTABLE, Boolean.class),
                                c -> c.channel(postgresRouteChannel()))
                        .recipientMessageSelectorFlow(m -> m.getHeaders().get(BIGTABLE, Boolean.class),
                                c -> c.channel(bigtableRouteChannel()))
                        .recipientMessageSelectorFlow(m -> m.getHeaders().get(NOSUPPORT, Boolean.class),
                                c -> c.channel(noSupportRouteChannel())))

                .get();

Need a way to handle the acknowledgement to PubSub messages


Solution

  • Why is that a Function? What do you produce from your flow? It really looks like your logic is one way and distribution. Isn't Consumer works for your instead?

    That's probably indeed a reason why auto-ack is not happened: the Function waits for a reply, but you don't produce one. We may do that as well, but need to understand what you are going to produce as a reply.