Tags: java-8, spring-integration, spring-integration-dsl, spring-cloud-gcp

Unable to attach subscriber to BroadcastingDispatcher in Spring Integration


Following the example in "Bootiful GCP: Integration with Google Cloud Pub/Sub (4/8)", I am trying to build a flow that reads from a Google Pub/Sub subscription and writes to another topic.

After starting my application with DEBUG logging enabled, I can see that messages are arriving from Google Pub/Sub, but they are not being consumed. The log shows:

o.s.i.dispatcher.BroadcastingDispatcher : No subscribers, default behavior is ignore

Any help on this is much appreciated.

Here is the main part of my code:

public class PubsubRouteBuilderService {

    private final PubSubTemplate pubSubTemplate; // injected via Spring

    public PubsubRouteBuilderService(PubSubTemplate pubSubTemplate) {
        this.pubSubTemplate = pubSubTemplate;
    }

    public synchronized boolean buildRoute(PubsubRouteModel pubsubRouteModel) {
        log.info("Building route for: {}", pubsubRouteModel);
        buildPubsubRoute(pubsubRouteModel);
        // some unrelated logic
        return true;
    }

    private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {

        final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
            RouteBuilderFactory
                    .messageChannelAdapter(
                            RouteBuilderFactory.getMessageChannel(),
                            pubSubTemplate,
                            pubsubRouteModel.getFromSub()))
            .handle(
                    message -> {
                        log.info("consumed new message: [" + message.getPayload() + "]");
                        AckReplyConsumer consumer = message.getHeaders()
                                .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
                        consumer.ack();
                    })
            .get();

        standardIntegrationFlow.start();
    }
}

Here are the relevant methods from RouteBuilderFactory:

public static MessageChannel getMessageChannel() {
    return MessageChannels.publishSubscribe().get();
}

public static PubSubInboundChannelAdapter messageChannelAdapter(MessageChannel inputChannel, PubSubTemplate pubSubTemplate, String channelName) {
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, channelName);
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);
    return adapter;
}

Solution

  • Your code doesn't seem to be based on that blog post at all...

    private void buildPubsubRoute(PubsubRouteModel pubsubRouteModel) {
    
        final StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(
            RouteBuilderFactory
                    .messageChannelAdapter(
                            RouteBuilderFactory.getMessageChannel(),
                            pubSubTemplate,
                            pubsubRouteModel.getFromSub()))
            .handle(
                    message -> {
                        log.info("consumed new message: [" + message.getPayload() + "]");
                        AckReplyConsumer consumer = message.getHeaders()
                                .get(GcpPubSubHeaders.ORIGINAL_MESSAGE, AckReplyConsumer.class);
                        consumer.ack();
                    })
            .get();
    
        standardIntegrationFlow.start();
    }
    

    You can't just "start" some arbitrary IntegrationFlow object; it must be managed by Spring (declared as a @Bean, or registered at runtime through IntegrationFlowContext).

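    The framework builds infrastructure behind the scenes to make all this work, including subscribing the handler to the flow's channel. A flow that is only built and started by hand never goes through that step, so its publish-subscribe channel has no subscribers, which is what the log message reports.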
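To make the symptom concrete, here is a toy model in plain Java of a publish-subscribe channel that ignores messages when nothing is subscribed. It is only a sketch: `ToyPubSubChannel`, `subscribe`, and `send` are hypothetical names invented for illustration and are not Spring Integration's classes or its actual dispatch logic. The `subscribe` call stands in for the wiring that Spring would perform for a managed flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: hypothetical class, NOT part of Spring Integration.
class ToyPubSubChannel {

    private final List<Consumer<String>> subscribers = new ArrayList<>();

    // Stands in for the wiring the framework does for a managed flow.
    // A flow that is built and started by hand never reaches this step.
    public void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // Returns true if at least one subscriber received the payload.
    public boolean send(String payload) {
        if (subscribers.isEmpty()) {
            // Mirrors the wording of the DEBUG line quoted in the question.
            System.out.println("No subscribers, default behavior is ignore");
            return false;
        }
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
        return true;
    }

    public static void main(String[] args) {
        ToyPubSubChannel channel = new ToyPubSubChannel();
        List<String> consumed = new ArrayList<>();

        channel.send("first");            // nobody is subscribed: dropped
        channel.subscribe(consumed::add); // the "wiring" step
        channel.send("second");           // now delivered

        System.out.println(consumed);     // prints [second]
    }
}
```

The first message is lost in the same way as the Pub/Sub messages in the question: the adapter delivers into the channel, but the handler was never attached to it.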