I have a Spring Batch application that runs its jobs successfully, but I get an exception once I declare multiple DirectChannels.
The exception is thrown when I start "firstJob". Here is the exception:
`Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'IFRSGoodBookService-1.secondReplies'.
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:499)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:354)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:283)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:247)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:299)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
at io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493)
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603)
at io.micrometer.observation.Observation.observe(Observation.java:492)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$200(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:397)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:360)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1663)
... 14 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 50 common frames omitted`
This is my FlowConfig class which declares the channels:
@Configuration
@RequiredArgsConstructor
class FlowConfig {
private final QueueConfig queueConfig;
@Bean
public DirectChannel firstRequests() {
return new DirectChannel();
}
@Bean
public DirectChannel firstReplies() {
return new DirectChannel();
}
@Bean
public DirectChannel secondRequests() {
return new DirectChannel();
}
@Bean
public DirectChannel secondReplies() {
return new DirectChannel();
}
@Bean("secondManagerInBoundFlow")
@Profile("manager")
public IntegrationFlow secondManagerInBoundFlow() {
return queueConfig.getInboundAdapter(true, secondReplies());
}
@Bean("secondWorkerInBoundFlow")
@Profile("worker")
public IntegrationFlow secondInBoundFlow() {
return queueConfig.getInboundAdapter(false, secondRequests());
}
@Bean("secondManagerOutboundFlow")
@Profile("manager")
public IntegrationFlow secondManagerOutboundFlow() {
return queueConfig.getOutboundAdapter(true, secondRequests());
}
@Bean("secondWorkerOutboundFlow")
@Profile("worker")
public IntegrationFlow secondWorkerOutboundFlow() {
return queueConfig.getOutboundAdapter(false, secondReplies());
}
@Bean("firstManagerInBoundFlow")
@Profile("manager")
public IntegrationFlow firstManagerInBoundFlow() {
return queueConfig.getInboundAdapter(true, firstReplies());
}
@Bean("firstWorkerInBoundFlow")
@Profile("worker")
public IntegrationFlow firstWorkerInBoundFlow() {
return queueConfig.getInboundAdapter(false, firstRequests());
}
@Bean("firstManagerOutboundFlow")
@Profile("manager")
public IntegrationFlow firstManagerOutboundFlow() {
return queueConfig.getOutboundAdapter(true, firstRequests());
}
@Bean("firstWorkerOutboundFlow")
@Profile("worker")
public IntegrationFlow firstWorkerOutboundFlow() {
return queueConfig.getOutboundAdapter(false, firstReplies());
}
}
This is the implementation of the inbound and outbound adapters:
@Configuration
@ConditionalOnProperty("spring.rabbitmq.enabled")
@RequiredArgsConstructor
public class RabbitMqQueueConfig implements QueueConfig {
private final ConnectionFactory connectionFactory;
private final RabbitTemplate defaultRabbitTemplate;
private final QueueConstants queueConstants;
@Override
public IntegrationFlow getInboundAdapter(boolean isManager, DirectChannel channel) {
String queueName = isManager ? queueConstants.getConstantWithPrefix(QueueConstants.JOB_REPLIES_QUEUE)
: queueConstants.getConstantWithPrefix(QueueConstants.JOB_REQUESTS_QUEUE);
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, queueName)).channel(channel).get();
}
@Override
public IntegrationFlow getOutboundAdapter(boolean isManager, DirectChannel channel) {
String queueName = isManager ? queueConstants.getConstantWithPrefix(QueueConstants.JOB_REQUESTS_QUEUE)
: queueConstants.getConstantWithPrefix(QueueConstants.JOB_REPLIES_QUEUE);
AmqpOutboundChannelAdapterSpec messageHandlerSpec = Amqp.outboundAdapter(defaultRabbitTemplate).routingKey(queueName);
return IntegrationFlow.from(channel).handle(messageHandlerSpec).get();
}
}
This is my JobManagerPartitionConfiguration:
@Configuration
@Profile("manager")
@EnableBatchIntegration
@AllArgsConstructor
public class JobManagerPartitionConfiguration {
private final JobRepository jobRepository;
private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
private PlatformTransactionManager transactionManager;
private DeleteDataTasklet deleteDataTasklet;
private InstDataLoader instDataLoader;
private final ApplicationProperties appProperties;
private final DirectChannel firstRequests;
private final DirectChannel firstReplies;
private final DirectChannel secondRequests;
private final DirectChannel secondReplies;
private final ManagerJobListener managerJobListener;
private final IdBoundaryPartitioner idBoundaryPartitioner;
private final ContextService contextService;
@Bean
public Step firstJobManagerStep() {
return managerStepBuilderFactory.get("firstJobManagerStep")
.partitioner("remotefirstJobAsStep", idBoundaryPartitioner)
.gridSize(appProperties.getJobParameters().getGridSize())
.outputChannel(firstRequests)
.inputChannel(firstReplies)
.listener(new SyncStepContextWithJob())
.build();
}
@Bean
public Step secondJobManagerStep() {
return managerStepBuilderFactory.get("secondJobManagerStep")
.partitioner("remoteSecondJobAsStep", idBoundaryPartitioner)
.gridSize(appProperties.getJobParameters().getGridSize())
.outputChannel(secondRequests)
.inputChannel(secondReplies)
.listener(new SyncStepContextWithJob())
.build();
}
@Bean
public Job secondJob(Step secondJobManagerStep) {
return new JobBuilder("secondJob", jobRepository).incrementer(new RunIdIncrementer())
.start(instDataLoaderStep())
.next(deleteTable())
.next(secondJobManagerStep)
.listener(contextService)
.listener(managerJobListener)
.build();
}
@Bean
public Job firstJob(Step firstJobManagerStep) {
return new JobBuilder("firstJob", jobRepository).incrementer(new RunIdIncrementer())
.start(instDataLoaderStep())
.next(deleteTable())
.next(firstJobManagerStep)
.listener(contextService)
.listener(managerJobListener)
.build();
}
This is my JobWorkerPartitionConfiguration:
@Configuration
@Profile("worker")
@EnableBatchIntegration
@AllArgsConstructor
@Slf4j
public class JobWorkerPartitionConfiguration {
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
private JobRepository jobRepository;
private JobCache<I9Data[]> cache;
private CacheJobListener<I9Data[]> jobListener;
private WorkerJobListener workerJobListener;
private StepMonitoringListener monitoringListener;
private ApplicationProperties appProperties;
private InstDataLoader instDataLoader;
private ContractLoader contractLoader;
private CacheItemWriter cacheItemWriter;
private PlatformTransactionManager transactionManager;
private InstDataJpaCache instDataJpaCache;
private ContextService contextService;
private MonitorService monitorService;
@Bean
@StepScope
Job remoteFirstJob(NamedParameterJdbcTemplate jdbcTemplate) {
return new JobBuilder("remoteFirstJob", jobRepository).start(instDataLoaderStep())
.next(contractLoaderTaskletStep())
.next(calculateStep())
.listener(contextService)
.listener(jobListener)
.listener(workerJobListener)
.listener(monitoringListener)
// .listener(new SyncStepContextWithJob(this.monitorService))
.build();
}
@Bean
public Step remoteFirstJobAsStep(
DirectChannel firstRequests,
DirectChannel firstReplies
) {
return workerStepBuilderFactory.get("remoteFirstJobAsStep")
.inputChannel(firstRequests)
.outputChannel(firstReplies)
.parametersExtractor(remoteJobParametersExtractor())
.listener(new SyncStepContextWithJob())
.build();
}
Why do I get this exception when I start the first job? It should not touch secondReplies at all: in JobManagerPartitionConfiguration, the manager step for firstJob is configured with inputChannel = firstReplies and outputChannel = firstRequests, so I would expect it to use only those channels and not the second pair.
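This happens because both pairs of DirectChannels are bound to the same AMQP queues: getInboundAdapter() and getOutboundAdapter() pick the queue from the isManager flag alone, so the first and second flows all use JOB_REQUESTS_QUEUE and JOB_REPLIES_QUEUE. A message that belongs to firstJob can therefore be consumed by the inbound flow that feeds secondReplies, which has no subscriber at that point.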
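The shared queue can be seen in a stripped-down model of the selection logic from getInboundAdapter(). This is only a sketch in plain Java with no Spring involved: the class name SharedQueueDemo and the two queue name strings are placeholders invented for illustration, not the real values from QueueConstants.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for the queue selection in RabbitMqQueueConfig.getInboundAdapter().
// The queue name strings below are hypothetical placeholders.
final class SharedQueueDemo {
    static final String JOB_REQUESTS_QUEUE = "job.requests"; // hypothetical value
    static final String JOB_REPLIES_QUEUE = "job.replies";   // hypothetical value

    // Mirrors the original ternary: only the role (manager or worker) decides the queue.
    static String inboundQueue(boolean isManager) {
        return isManager ? JOB_REPLIES_QUEUE : JOB_REQUESTS_QUEUE;
    }

    // Queue each manager-profile inbound flow listens on, keyed by its target channel.
    static Map<String, String> managerInboundQueues() {
        Map<String, String> queues = new LinkedHashMap<>();
        queues.put("firstReplies", inboundQueue(true));
        queues.put("secondReplies", inboundQueue(true));
        return queues;
    }

    public static void main(String[] args) {
        // Both channels resolve to the same queue, so their adapters compete for messages.
        managerInboundQueues().forEach((channel, queue) ->
                System.out.println(channel + " <- " + queue));
    }
}
```

Because both entries map to the same queue, the broker is free to hand a firstJob reply to either consumer, which is consistent with the exception naming secondReplies.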
Solution:
Create separate queues for the second pair of inbound and outbound IntegrationFlow beans (one for secondRequests, one for secondReplies) and pass the queue name to the getInboundAdapter() and getOutboundAdapter() methods.
Example:
@Override
public IntegrationFlow getInboundAdapter(String queueName, DirectChannel channel) {
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, queueName)).channel(channel).get();
}
@Override
public IntegrationFlow getOutboundAdapter(String queueName, DirectChannel channel) {
AmqpOutboundChannelAdapterSpec messageHandlerSpec = Amqp.outboundAdapter(defaultRabbitTemplate).routingKey(queueName);
return IntegrationFlow.from(channel).handle(messageHandlerSpec).get();
}
And for FlowConfig:
@Bean("secondWorkerInBoundFlow")
@Profile("worker")
public IntegrationFlow secondInBoundFlow() {
return queueConfig.getInboundAdapter("secondRequestQueue", secondRequests());
}
@Bean("secondManagerOutboundFlow")
@Profile("manager")
public IntegrationFlow secondManagerOutboundFlow() {
return queueConfig.getOutboundAdapter("secondRequestQueue", secondRequests());
}
Apply the same change to the remaining flow beans.
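When wiring the remaining beans it is easy to mix up which queue each flow needs: the manager sends on the requests queue and listens on the replies queue, and the worker does the opposite. The following is a minimal sketch of a helper that derives the name for each of the four flows of a job. The JobQueues class and its methods are hypothetical and not part of the original code; the name pattern simply follows the "secondRequestQueue" and "secondRepliesQueue" strings used above, and the queues are assumed to be declared on the broker.

```java
// Hypothetical helper: derives a dedicated pair of queue names for one job.
final class JobQueues {
    private final String jobPrefix;

    JobQueues(String jobPrefix) {
        this.jobPrefix = jobPrefix;
    }

    String requests() {
        return jobPrefix + "RequestQueue";
    }

    String replies() {
        return jobPrefix + "RepliesQueue";
    }

    // Queue a flow should use, given its profile and direction.
    // Manager: outbound -> requests, inbound -> replies.
    // Worker:  inbound -> requests, outbound -> replies.
    String queueFor(boolean isManager, boolean inbound) {
        boolean usesReplies = (isManager == inbound);
        return usesReplies ? replies() : requests();
    }

    public static void main(String[] args) {
        JobQueues second = new JobQueues("second");
        System.out.println("manager outbound: " + second.queueFor(true, false));
        System.out.println("worker inbound:   " + second.queueFor(false, true));
        System.out.println("manager inbound:  " + second.queueFor(true, true));
        System.out.println("worker outbound:  " + second.queueFor(false, false));
    }
}
```

With such a helper, a bean like secondManagerInBoundFlow could pass second.queueFor(true, true) as the queue name, and the first job would use its own JobQueues("first") instance so that the two jobs never share a queue.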