Search code examples
javaspringspring-batchpolling

Spring batch integration


I am looking for a guidance/solution with Spring batch integration. I have a directory to which external application will send xml files. My application should read the file content and move the file to another directory.

The application should be able to process the files in parallel.

Thanks in advance.


Solution

  • You can use Spring Integration ftp / sftp combined with Spring Batch:

    1.Spring Integration Ftp Configuration :

    <bean id="ftpClientFactory"
        class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
        <property name="host" value="${host.name}" />
        <property name="port" value="${host.port}" />
        <property name="username" value="${host.username}" />
        <property name="password" value="${host.password}" />
        <property name="bufferSize" value="100000"/>
    </bean>
    <int:channel id="ftpChannel" />
    <int-ftp:outbound-channel-adapter id="ftpOutbound"
        channel="ftpChannel" remote-directory="/yourremotedirectory/" session-factory="ftpClientFactory" use-temporary-file-name="false" />
    

    2.Create your reader and autowire a service to provide your items if needed :

     @Scope("step")
     public class MajorItemReader implements InitializingBean{
    
            private List<YourItem> yourItems= null;
    
            @Autowired
            private MyService provider;
    
    
            public YourItem read() {
                if ((yourItems!= null) && (yourItems.size() != 0)) {
                    return yourItems.remove(0);
                }
                return null;
            }
    
            //Reading Items from Service
            private void reloadItems() {
    
            this.yourItems= new ArrayList<YourItem>();
            // use the service to provide your Items
           if (yourItems.isEmpty()) {
                    yourItems= null;
                }
            }
            public MyService getProvider() {
                return provider;
            }
            public void setProvider(MyService provider) {
                this.provider = provider;
            }
            @Override
            public void afterPropertiesSet() throws Exception {
                reloadItems();
            }
    }
    

    3. Create Your Own Item Processor

         public class MyProcessor implements
        ItemProcessor<YourItem, YourItem> {
        @Override
        public YourItem process(YourItem arg0) throws Exception {
        // Apply any logic to your Item before transferring it to the writer
        return arg0;
        }
        }
    

    4. Create Your Own Writer :

       public class MyWriter{
       @Autowired
       @Qualifier("ftpChannel")
       private MessageChannel messageChannel;
       public void write(YourItem pack) throws IOException {
       //create your file and from your Item 
       File file = new File("the_created_file");
       // Sending the file via Spring Integration Ftp Channel
       Message<File> message = MessageBuilder.withPayload(file).build();
       messageChannel.send(message);
       }
    

    5.Batch Configuration :

    <bean id="dataSourcee"
        class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="" />
        <property name="url" value="" />
        <property name="username" value="" />
        <property name="password" value="" />
    </bean>
    <bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <property name="dataSource" ref="dataSourcee" />
        <property name="transactionManager" ref="transactionManagerrr" />
        <property name="databaseType" value="" />
    </bean>
    <bean id="transactionManagerrr"
        class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
    

    6.Another ApplicationContext file to Configure Your Job :

    <context:annotation-config />
    <bean id="provider" class="mypackage.MyService" />
    <context:component-scan base-package="mypackage" />
    <bean id="myReader" class="mypackage.MyReader"
        <property name="provider" ref="provider" />
     </bean>
    <bean id="myWriter" class="mypackage.MyWriter" />
    <bean id="myProcessor" class="mypackage.MyProcessor" />
       <bean id="mReader"
        class="org.springframework.batch.item.adapter.ItemReaderAdapter">
        <property name="targetObject" ref="myReader" />
        <property name="targetMethod" value="read" />
    </bean>
    <bean id="mProcessor"
        class="org.springframework.batch.item.adapter.ItemProcessorAdapter">
        <property name="targetObject" ref="myProcessor" />
        <property name="targetMethod" value="process" />
    </bean>
    <bean id="mWriter"
        class="org.springframework.batch.item.adapter.ItemWriterAdapter">
        <property name="targetObject" ref="myWriter" />
        <property name="targetMethod" value="write" />
    </bean>
    <batch:job id="myJob">
        <batch:step id="step01">
            <batch:tasklet>
                <batch:chunk reader="mReader" writer="mWriter"
                    processor="mProcessor" commit-interval="1">
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>
    <bean id="myRunScheduler" class="mypackage.MyJobLauncher" />
    <task:scheduled-tasks>
        <task:scheduled ref="myJobLauncher" method="run"
            cron="0 0/5 * * * ?" />
        <!-- this will maker the job runs every 5 minutes -->
    </task:scheduled-tasks>
    

    7.Finally Configure A launcher to launch your job :

    public class MyJobLauncher {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    @Qualifier("myJob")
    private Job job;
    public void run() {
        try {
            String dateParam = new Date().toString();
            JobParameters param = new JobParametersBuilder().addString("date",
                    dateParam).toJobParameters();
            JobExecution execution = jobLauncher.run(job, param);
            execution.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }