Search code examples
springspring-bootrabbitmqspring-batchspring-integration

Do not run the IntegrationFlow when the Spring Batch remote chunking worker application starts


I am new to Spring Integration and Spring Batch, and I want to develop a remote chunking batch application with a master and a worker. I used Spring Integration and RabbitMQ for the message queue. The application runs fine, but the worker's ItemProcessor starts automatically; however, I need to control when it starts.

/**
 * Worker-side configuration for remote chunking: wires an AMQP inbound flow
 * and an AMQP outbound flow around a worker flow assembled with
 * {@link RemoteChunkingWorkerBuilder}.
 *
 * <p>Message path as declared below:
 * AMQP "requests" -> {@code requestsChannel} -> worker flow
 * -> {@code repliesChannel} -> AMQP routing key "replies".
 *
 * <p>NOTE(review): nothing in this class sets {@code autoStartup(false)} on the
 * inbound adapter, so it presumably starts together with the application
 * context - confirm against the Spring Integration defaults.
 */
@Configuration
@EnableBatchProcessing
@EnableBatchIntegration
@EnableIntegration
public class WorkerConfig {


    // Builder injected by Spring; used in workerIntegrationFlow() to assemble
    // the processor/writer flow for Integer items.
    @Autowired
    private RemoteChunkingWorkerBuilder<Integer, Integer> remoteChunkingWorkerBuilder;

    /** Channel that feeds incoming requests into the worker flow. */
    @Bean
    public DirectChannel requestsChannel() {
        return new DirectChannel();
    }

    /**
     * Flow that forwards messages from the AMQP inbound adapter configured
     * with "requests" to {@link #requestsChannel()}.
     */
    @Bean
    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory,"requests"))
                .channel(requestsChannel())
                .get();
    }

    /** Channel that carries the worker flow's output towards the outbound flow. */
    @Bean
    public DirectChannel repliesChannel() {
        // Debug trace printed when this bean method runs.
        System.out.println("repliesChannel 3 ");
        return new DirectChannel();
    }

    /**
     * Flow that sends everything arriving on {@link #repliesChannel()} to the
     * AMQP outbound adapter using the routing key "replies".
     */
    @Bean
    public IntegrationFlow outboundFlow(AmqpTemplate amqpTemplate) {
        return IntegrationFlows
                .from(repliesChannel())
                .handle(Amqp.outboundAdapter(amqpTemplate).routingKey("replies"))
                .get();
    }

    /** Processor applied by the worker flow; body elided in the original post. */
    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
       ....
    }

    /** Writer applied by the worker flow; body elided in the original post. */
    @Bean
    public ItemWriter<Integer> itemWriter() {
       ...
    }

    /**
     * Worker flow built from the injected builder: takes input from
     * {@link #requestsChannel()}, uses {@link #itemProcessor()} and
     * {@link #itemWriter()}, and emits output on {@link #repliesChannel()}.
     */
    @Bean
    public IntegrationFlow workerIntegrationFlow() {
        return this.remoteChunkingWorkerBuilder
                .itemProcessor(itemProcessor())
                .itemWriter(itemWriter())
                .inputChannel(requestsChannel())
                .outputChannel(repliesChannel())
                .build();
    }


}

So what can I do to manually start the worker part?


Solution

  • Give the adapter an id and set auto startup to false.

    /**
     * Inbound flow whose AMQP adapter is given an explicit id and has
     * auto-startup turned off, so it can be injected and started on demand
     * instead of starting by itself.
     */
    @Bean
    public IntegrationFlow inboundFlow(ConnectionFactory connectionFactory) {
        return IntegrationFlows
                .from(Amqp.inboundAdapter(connectionFactory,"requests")
                    // Explicit id so the adapter can be looked up / injected later.
                    .id("inbound")
                    // Do not start with the application; start() is called manually.
                    .autoStartup(false))
                .channel(requestsChannel())
                .get();
    }
    

    Then inject the adapter with @Autowired and start it when needed...

    // The inbound adapter that was given the id "inbound" and autoStartup(false).
    @Autowired
    AmqpInboundChannelAdapter inbound;
    
    ...
        // Start the adapter explicitly at the point where the worker should begin.
        inbound.start();