I am trying a demo file transfer program using Spring Boot and Apache Camel file component. I have written a scheduler using Spring Boot that will run every 1 minute which is calling an Apache Camel route and it is doing the file transfer. I have three files in the directory C:\CamelDemo\inputFolder namely input1.txt, input2.txt and input3.txt. My Spring Boot scheduler is as below:
package com.example.demo.scheduler;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class Scheduler {
@Autowired private ProducerTemplate producerTemplate;
@Scheduled(fixedRate=60000)
public void fixedRateSch() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
Date now = new Date();
String strDate = sdf.format(now);
System.out.println("Fixed Rate scheduler:: " + strDate);
producerTemplate.sendBody("direct:transferFile", null);
System.out.println("Success");
}
}
My Route is as below:
package com.example.demo.route;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class FileTransferRoute extends RouteBuilder {
@SuppressWarnings("deprecation")
@Override
public void configure() {
errorHandler(defaultErrorHandler()
.maximumRedeliveries(3)
.redeliverDelay(1000)
.retryAttemptedLogLevel(LoggingLevel.WARN));
from("direct:transferFile")
.log("Route reached")
.pollEnrich("file:C:\\CamelDemo\\inputFolder?noop=true")
.to("file:C:\\CamelDemo\\outputFolder?autoCreate=false")
.end();
}
}
When I run the program, the scheduler is running 3 times transferring the three files one by one. After that the scheduler is not running anymore. Then when I am trying to stop the Spring Boot application by shutting the embedded Tomcal, it is giving the below errors:
2019-09-11 15:51:14.711 INFO 10408 --- [n(15)-127.0.0.1] inMXBeanRegistrar$SpringApplicationAdmin : Application shutdown requested.
2019-09-11 15:51:14.714 INFO 10408 --- [n(15)-127.0.0.1] o.a.camel.spring.SpringCamelContext : Apache Camel 2.24.0 (CamelContext: camel-1) is shutting down
2019-09-11 15:51:14.714 INFO 10408 --- [n(15)-127.0.0.1] o.a.camel.impl.DefaultShutdownStrategy : Starting to graceful shutdown 1 routes (timeout 300 seconds)
2019-09-11 15:51:14.717 INFO 10408 --- [ - ShutdownTask] o.a.camel.impl.DefaultShutdownStrategy : Waiting as there are still 1 inflight and pending exchanges to complete, timeout in 300 seconds. Inflights per route: [route1 = 1]
2019-09-11 15:51:14.718 INFO 10408 --- [ - ShutdownTask] o.a.camel.impl.DefaultShutdownStrategy : There are 1 inflight exchanges:
InflightExchange: [exchangeId=ID-PCIN467166-1568196927146-0-10, fromRouteId=route1, routeId=route1, nodeId=pollEnrich1, elapsed=0, duration=167106]
So have the below questions: 1. How can I make the scheduler run forever so that it will continuing polling the file location, when a new file comes in the folder it will transfer the file to the output directory. 2. How can I properly shutdown my Spring Boot application hen I want to and why is the error thrown during shutdown? 3. How can I transfer all the three files simultaneously in the first run itself and not one by one?
For question #3 : I think it's not possible, see comments in your other related question
For questions #1 and #2: the key point to understand this behaviors is that you are using the pollEnrich
in receive mode, as you did not set any timeout: see good explanation about timeout in the PollEnrich documentation, specially:
Good practice to use timeout value === By default Camel will use the receive. Which may block until there is a message available. It is therefore recommended to always provide a timeout value, to make this clear that we may wait for a message, until the timeout is hit. ===
To answer your questions: here is what happens with your current route setup:
FileTransferRoute
by sending a dummy message with "null" body to the "direct:transferFile" channel,FileTransferRoute
detect and consumes this message: an Exchange has been created, having the message created at step #1 as input message, pollEnrich
component in FileTransferRoute
blocks until a new file is detected in input directory( as no timeout is defined)
=> this will also block the Spring scheduler because your are using synchronous method producerTemplate.sendBody()
and a direct:
channel (direct
component works with synchronous invocation)
=> this answers your first question: the Spring scheduler is actually still running, but it's thread is blocked in the sendBody()
method invocation, because of pollEnrich
component polling for new incoming files. A next incoming file in the input directory will resume the polling; the scheduled method should then complete, and a new invocation should be triggered 60 seconds later. There is no reason for the scheduler to stop after the first files have been processed.
when you stop the Springboot application, the pollEnrich
component is still blocked, polling for new incoming file: this means there is still a pending Exchange (created at step 2 above) which has not yet been completed
=> this answer your second question: the "inflight exchange" is actually the latest Exchange created by the latest Scheduler invocation. Camel default shutdown strategy will wait X seconds before shutdown, in case of pending (inflight) exchanges.
I recommend you enable DEBUG level for Camel logger, you will understand better what happens under the hood.
You could get rid of the inflight exchange issue and improve your route simply by using a timeout in the pollEnrich component. Since you are implementing the scheduler logic in a dedicated Spring scheduler, I think you should use receiveNoWait
mode (0s timeout).
from("direct:transferFile").autoStartup(true)
.pollEnrich("file:C:\\var\\CamelDemo\\inputFolder?noop=true&delay=2000", 0)
.choice()
.when(exchange -> exchange.getIn().getBody() != null)
.to("file:C:\\var\\CamelDemo\\outputFolder?autoCreate=false")
// handle case where no file is found
.otherwise()
.log(" skip, no file found. ")
.end();
Hope this helps.