Starting the first job throws "Dispatcher has no subscribers for channel" when multiple DirectChannels are declared


I have a Spring Batch application that runs its jobs successfully, but I get an exception once I declare multiple DirectChannels.

The exception occurs when I start "firstJob". Here is the stack trace:

`Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'IFRSGoodBookService-1.secondReplies'.
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:499)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:354)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:283)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:247)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:299)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
    at io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493)
    at io.micrometer.observation.Observation.observeWithContext(Observation.java:603)
    at io.micrometer.observation.Observation.observe(Observation.java:492)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$200(AmqpInboundChannelAdapter.java:69)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:397)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:360)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1663)
    ... 14 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 50 common frames omitted`

This is my FlowConfig class which declares the channels:

@Configuration
@RequiredArgsConstructor
class FlowConfig {

    private final QueueConfig queueConfig;

    @Bean
    public DirectChannel firstRequests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel firstReplies() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel secondRequests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel secondReplies() {
        return new DirectChannel();
    }

    @Bean("secondManagerInBoundFlow")
    @Profile("manager")
    public IntegrationFlow secondManagerInBoundFlow() {
        return queueConfig.getInboundAdapter(true, secondReplies());
    }

    @Bean("secondWorkerInBoundFlow")
    @Profile("worker")
    public IntegrationFlow secondInBoundFlow() {
        return queueConfig.getInboundAdapter(false, secondRequests());
    }

    @Bean("secondManagerOutboundFlow")
    @Profile("manager")
    public IntegrationFlow secondManagerOutboundFlow() {
        return queueConfig.getOutboundAdapter(true, secondRequests());
    }

    @Bean("secondWorkerOutboundFlow")
    @Profile("worker")
    public IntegrationFlow secondWorkerOutboundFlow() {
        return queueConfig.getOutboundAdapter(false, secondReplies());
    }
    
    @Bean("firstManagerInBoundFlow")
    @Profile("manager")
    public IntegrationFlow firstManagerInBoundFlow() {
        return queueConfig.getInboundAdapter(true, firstReplies());
    }

    @Bean("firstWorkerInBoundFlow")
    @Profile("worker")
    public IntegrationFlow firstWorkerInBoundFlow() {
        return queueConfig.getInboundAdapter(false, firstRequests());
    }

    @Bean("firstManagerOutboundFlow")
    @Profile("manager")
    public IntegrationFlow firstManagerOutboundFlow() {
        return queueConfig.getOutboundAdapter(true, firstRequests());
    }

    @Bean("firstWorkerOutboundFlow")
    @Profile("worker")
    public IntegrationFlow firstWorkerOutboundFlow() {
        return queueConfig.getOutboundAdapter(false, firstReplies());
    }
}

This is the implementation of the inbound and outbound adapters:

@Configuration
@ConditionalOnProperty("spring.rabbitmq.enabled")
@RequiredArgsConstructor
public class RabbitMqQueueConfig implements QueueConfig {

    private final ConnectionFactory connectionFactory;
    private final RabbitTemplate defaultRabbitTemplate;
    private final QueueConstants queueConstants;

    @Override
    public IntegrationFlow getInboundAdapter(boolean isManager, DirectChannel channel) {
        String queueName = isManager ? queueConstants.getConstantWithPrefix(QueueConstants.JOB_REPLIES_QUEUE)
                : queueConstants.getConstantWithPrefix(QueueConstants.JOB_REQUESTS_QUEUE);
        return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, queueName)).channel(channel).get();
    }

    @Override
    public IntegrationFlow getOutboundAdapter(boolean isManager, DirectChannel channel) {
        String queueName = isManager ? queueConstants.getConstantWithPrefix(QueueConstants.JOB_REQUESTS_QUEUE)
                : queueConstants.getConstantWithPrefix(QueueConstants.JOB_REPLIES_QUEUE);
        AmqpOutboundChannelAdapterSpec messageHandlerSpec = Amqp.outboundAdapter(defaultRabbitTemplate).routingKey(queueName);
        return IntegrationFlow.from(channel).handle(messageHandlerSpec).get();
    }
}

This is my JobManagerPartitionConfiguration:

@Configuration
@Profile("manager")
@EnableBatchIntegration
@AllArgsConstructor
public class JobManagerPartitionConfiguration {

    private final JobRepository jobRepository;
    private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
    private PlatformTransactionManager transactionManager;
    private DeleteDataTasklet deleteDataTasklet;
    private InstDataLoader instDataLoader;
    private final ApplicationProperties appProperties;
    private final DirectChannel firstRequests;
    private final DirectChannel firstReplies;
    private final DirectChannel secondRequests;
    private final DirectChannel secondReplies;
    private final ManagerJobListener managerJobListener;
    private final IdBoundaryPartitioner idBoundaryPartitioner;
    private final ContextService contextService;

    @Bean
    public Step firstJobManagerStep() {
        return managerStepBuilderFactory.get("firstJobManagerStep")
                .partitioner("remoteFirstJobAsStep", idBoundaryPartitioner)
                .gridSize(appProperties.getJobParameters().getGridSize())
                .outputChannel(firstRequests)
                .inputChannel(firstReplies)
                .listener(new SyncStepContextWithJob())
                .build();
    }

    @Bean
    public Step secondJobManagerStep() {
        return managerStepBuilderFactory.get("secondJobManagerStep")
                .partitioner("remoteSecondJobAsStep", idBoundaryPartitioner)
                .gridSize(appProperties.getJobParameters().getGridSize())
                .outputChannel(secondRequests)
                .inputChannel(secondReplies)
                .listener(new SyncStepContextWithJob())
                .build();
    }

    @Bean
    public Job secondJob(Step secondJobManagerStep) {
        return new JobBuilder("secondJob", jobRepository).incrementer(new RunIdIncrementer())
                .start(instDataLoaderStep())
                .next(deleteTable())
                .next(secondJobManagerStep)
                .listener(contextService)
                .listener(managerJobListener)
                .build();
    }

    @Bean
    public Job firstJob(Step firstJobManagerStep) {
        return new JobBuilder("firstJob", jobRepository).incrementer(new RunIdIncrementer())
                .start(instDataLoaderStep())
                .next(deleteTable())
                .next(firstJobManagerStep)
                .listener(contextService)
                .listener(managerJobListener)
                .build();
    }

This is my JobWorkerConfiguration:

@Configuration
@Profile("worker")
@EnableBatchIntegration
@AllArgsConstructor
@Slf4j
public class JobWorkerPartitionConfiguration {

    private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
    private JobRepository jobRepository;
    private JobCache<I9Data[]> cache;
    private CacheJobListener<I9Data[]> jobListener;
    private WorkerJobListener workerJobListener;
    private StepMonitoringListener monitoringListener;
    private ApplicationProperties appProperties;
    private InstDataLoader instDataLoader;
    private ContractLoader contractLoader;
    private CacheItemWriter cacheItemWriter;
    private PlatformTransactionManager transactionManager;
    private InstDataJpaCache instDataJpaCache;
    private ContextService contextService;
    private MonitorService monitorService;


    @Bean
    @StepScope
    Job remoteFirstJob(NamedParameterJdbcTemplate jdbcTemplate) {
        return new JobBuilder("remoteFirstJob", jobRepository).start(instDataLoaderStep())
                .next(contractLoaderTaskletStep())
                .next(calculateStep())
                .listener(contextService)
                .listener(jobListener)
                .listener(workerJobListener)
                .listener(monitoringListener)
                // .listener(new SyncStepContextWithJob(this.monitorService))
                .build();
    }

    

    @Bean
    public Step remoteFirstJobAsStep(
            DirectChannel firstRequests,
            DirectChannel firstReplies

    ) {
        return workerStepBuilderFactory.get("remoteFirstJobAsStep")
                .inputChannel(firstRequests)
                .outputChannel(firstReplies)
                .parametersExtractor(remoteJobParametersExtractor())
                .listener(new SyncStepContextWithJob())
                .build();
    }

What I don't understand is why this exception is thrown when I start the first job. The first job should not touch secondReplies at all: in JobManagerPartitionConfiguration, firstJobManagerStep is configured with inputChannel = firstReplies and outputChannel = firstRequests, so it should use those channels and not the ones configured for the second job.


Solution

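  • This happens because the first and second flows are bound to the same AMQP queues. getInboundAdapter() and getOutboundAdapter() pick the queue from the isManager flag alone, so both manager inbound flows consume from the same replies queue, and a reply meant for firstJob can be delivered to the second flow and sent on to secondReplies.

    Solution: create separate queues for the second job's requests and replies, and pass the queue name into getInboundAdapter() and getOutboundAdapter() so that each inbound and outbound IntegrationFlow bean uses its own queue.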

    Example:

        @Override
        public IntegrationFlow getInboundAdapter(String queueName, DirectChannel channel) {
            return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, queueName)).channel(channel).get();
        }
    
        @Override
        public IntegrationFlow getOutboundAdapter(String queueName, DirectChannel channel) {
            AmqpOutboundChannelAdapterSpec messageHandlerSpec = Amqp.outboundAdapter(defaultRabbitTemplate).routingKey(queueName);
            return IntegrationFlow.from(channel).handle(messageHandlerSpec).get();
        }
    

    And for FlowConfig:

        @Bean("secondWorkerInBoundFlow")
        @Profile("worker")
        public IntegrationFlow secondInBoundFlow() {
            // The worker consumes requests for the second job from its own queue.
            return queueConfig.getInboundAdapter("secondRequestQueue", secondRequests());
        }

        @Bean("secondManagerOutboundFlow")
        @Profile("manager")
        public IntegrationFlow secondManagerOutboundFlow() {
            // The manager publishes requests to the same queue the worker consumes from.
            return queueConfig.getOutboundAdapter("secondRequestQueue", secondRequests());
        }
    

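    Apply the same change to the remaining flow beans (the first job's flows, secondManagerInBoundFlow, and secondWorkerOutboundFlow), giving each request/reply pair its own queue.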
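
One way to keep the queue names consistent across all eight flow beans is to derive them from the job name, the role, and the direction of the flow. The following is only a minimal sketch and not part of the original code: the QueueNames class, its enums, and queueFor() are hypothetical helpers, and the "RequestQueue" / "RepliesQueue" suffixes simply follow the names used in the example above. It relies on the pairing already visible in RabbitMqQueueConfig, where the manager sends requests and receives replies while the worker does the reverse.

```java
public class QueueNames {

    // Hypothetical enums, introduced only for this sketch.
    public enum Role { MANAGER, WORKER }
    public enum Direction { INBOUND, OUTBOUND }

    /**
     * Returns the queue a flow should use for the given job.
     * Manager outbound and worker inbound share the requests queue;
     * worker outbound and manager inbound share the replies queue.
     */
    public static String queueFor(String job, Role role, Direction direction) {
        boolean usesRequestsQueue =
                (role == Role.MANAGER) == (direction == Direction.OUTBOUND);
        return job + (usesRequestsQueue ? "RequestQueue" : "RepliesQueue");
    }

    public static void main(String[] args) {
        // Print the queue chosen for every combination of job, role and direction.
        for (String job : new String[] {"first", "second"}) {
            for (Role role : Role.values()) {
                for (Direction direction : Direction.values()) {
                    System.out.println(job + " " + role + " " + direction
                            + " -> " + queueFor(job, role, direction));
                }
            }
        }
    }
}
```

With a helper like this, a flow bean could call, for example, queueConfig.getInboundAdapter(QueueNames.queueFor("second", Role.MANAGER, Direction.INBOUND), secondReplies()), so the two ends of each request or reply path always agree on the queue and the first and second jobs never share one.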