Using the Spring Integration Java DSL, I've constructed a flow where I'm processing files synchronously with a FileSplitter. I've been able to use the setDeleteFiles flag on an AbstractFilePayloadTransformer to delete the file after converting each line of the File to a Message for subsequent processing, like so:
@Bean
protected IntegrationFlow s3ChannelFlow() {
    // do not exhaust filesystem w/ files downloaded from S3
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
    transformer.setDeleteFiles(true);
    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
            .from(s3Channel())
            .channel(StatsUtil.createRunStatsChannel(runStatsRepository))
            .transform(transformer)
            .split(new FileSplitter())
            .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
            .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
            .get();
    // @formatter:on
}
This works fine, but is slow. So I attempted to add an ExecutorChannel after the .split above, like so:
.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
But with that in place, the delete flag causes files to be deleted before they have been completely read, so the flow no longer completes successfully.
If I remove the flag, I have the potential to exhaust the local filesystem where files were synchronized from S3.
What could I introduce above to a) process each file completely and b) delete the file from the local filesystem once done? In other words, is there a way to know exactly when a file is completely processed (when its lines have been processed asynchronously via threads in a pool)?
If you're curious, here's my implementation of FileToInputStreamTransformer:
public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

    // @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
    @Override
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
    }
}
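The core of that transformer can be exercised outside Spring Integration with plain JDK classes. The sketch below (hypothetical file names, not from the actual flow) writes a small gzip file and reads it back through the same buffered GZIPInputStream construction:

```java
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipTransformDemo {

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB, matches the transformer

    // Same construction the transformer uses: a buffered gzip stream over the file
    static InputStream transformFile(File payload) throws IOException {
        return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
    }

    public static void main(String[] args) throws IOException {
        // Write a small .gz file to a temp location (stand-in for an S3-synced file)
        Path tmp = Files.createTempFile("demo", ".gz");
        try (OutputStream out = new GZIPOutputStream(Files.newOutputStream(tmp))) {
            out.write("line1\nline2\n".getBytes("UTF-8"));
        }
        // Read it back through the transformer-style stream
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(transformFile(tmp.toFile()), "UTF-8"))) {
            System.out.println(reader.readLine()); // line1
            System.out.println(reader.readLine()); // line2
        }
        Files.delete(tmp);
    }
}
```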
UPDATE
So how does something in the downstream flow know what to ask for?
Incidentally, if I'm following your advice correctly, when I update the .split with new FileSplitter(true, true) above, I get:
2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
Thanks Artem.
I did manage to address the issue, but perhaps in a more heavy-weight manner.
Introducing an ExecutorChannel caused quite the ripple of implementation adjustments, including: turning off the setDeleteFiles flag on the AbstractFilePayloadTransformer, and updating a JPA @Entity, RunStats, and its repository to capture file processing status as well as processing status for an entire run. Taken together, the processing status updates let the flow know when to delete files from the local filesystem (i.e., when they're fully processed) and let a /stats/{run} endpoint return a status so a user can know when a run is complete.
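Conceptually, the bookkeeping works like this: record each file's expected line count up front, have the worker threads count lines as they are persisted, and treat the file as safe to delete once the two match. A minimal plain-Java sketch of that idea (hypothetical names and API, not the actual RunStats implementation):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of the completion-tracking idea (not the real RunStats):
// register the expected line count per file, count processed lines from the
// worker threads, and report completion once the counts match.
public class FileCompletionTracker {

    private final ConcurrentMap<String, Long> expected = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, LongAdder> processed = new ConcurrentHashMap<>();

    // Called once per file, before splitting (e.g., from a pre-counting handler)
    public void registerFile(String fileToken, long totalLines) {
        expected.put(fileToken, totalLines);
        processed.put(fileToken, new LongAdder());
    }

    // Called by each worker thread after persisting one line; returns true
    // once the file is fully processed and safe to delete
    public boolean lineProcessed(String fileToken) {
        LongAdder adder = processed.get(fileToken);
        adder.increment();
        return adder.sum() >= expected.get(fileToken);
    }

    public static void main(String[] args) {
        FileCompletionTracker tracker = new FileCompletionTracker();
        tracker.registerFile("file-1", 3);
        System.out.println(tracker.lineProcessed("file-1")); // false
        System.out.println(tracker.lineProcessed("file-1")); // false
        System.out.println(tracker.lineProcessed("file-1")); // true
    }
}
```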
Here are snippets from my implementation (if anyone's curious)...
class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

    // @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
    @Override
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
    }
}
public class RunStatsHandler extends AbstractMessageHandler {

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

    private final SplunkSlf4jLogger log = new SplunkSlf4jLogger(LoggerFactory.getLogger(getClass()));

    private final RunStatsRepository runStatsRepository;

    public RunStatsHandler(RunStatsRepository runStatsRepository) {
        this.runStatsRepository = runStatsRepository;
    }

    // Memory-efficient routine, @see http://www.baeldung.com/java-read-lines-large-file
    @Override
    protected void handleMessageInternal(Message<?> message) throws Exception {
        RunStats runStats = message.getHeaders().get(RunStats.RUN, RunStats.class);
        String token = message.getHeaders().get(RunStats.FILE_TOKEN, String.class);
        if (runStats != null) {
            File compressedFile = (File) message.getPayload();
            String compressedFileName = compressedFile.getCanonicalPath();
            LongAdder lineCount = new LongAdder();
            // streams and Scanner implement java.lang.AutoCloseable
            try (InputStream fs = new FileInputStream(compressedFile);
                    InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE);
                    Scanner sc = new Scanner(gzfs, "UTF-8")) {
                while (sc.hasNextLine()) {
                    sc.nextLine();
                    lineCount.increment();
                }
                // note that Scanner suppresses exceptions
                if (sc.ioException() != null) {
                    log.warn("file.lineCount", ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName,
                            "exception", sc.ioException().getMessage()));
                    throw sc.ioException();
                }
                runStats.addFile(compressedFileName, token, lineCount.longValue());
                runStatsRepository.updateRunStats(runStats);
                log.info("file.lineCount",
                        ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, "lineCount", lineCount.longValue()));
            }
        }
    }
}
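The line-counting routine at the heart of that handler can be tried in isolation with only JDK classes. The sketch below (hypothetical file name, no Spring dependencies) applies the same Scanner-over-GZIPInputStream loop, including the explicit ioException() check, to a small gzip file:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Scanner;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class LineCountDemo {

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

    // Same memory-efficient routine as RunStatsHandler: stream the gzip file line by line
    static long countLines(File compressedFile) throws IOException {
        long lineCount = 0;
        try (InputStream fs = new FileInputStream(compressedFile);
                InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE);
                Scanner sc = new Scanner(gzfs, "UTF-8")) {
            while (sc.hasNextLine()) {
                sc.nextLine();
                lineCount++;
            }
            // Scanner suppresses IOExceptions; surface them explicitly
            if (sc.ioException() != null) {
                throw sc.ioException();
            }
        }
        return lineCount;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("lines", ".gz");
        try (OutputStream out = new GZIPOutputStream(Files.newOutputStream(tmp))) {
            out.write("a\nb\nc\n".getBytes("UTF-8"));
        }
        System.out.println(countLines(tmp.toFile())); // 3
        Files.delete(tmp);
    }
}
```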
Updated flow
@Bean
protected IntegrationFlow s3ChannelFlow() {
    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
            .from(s3Channel())
            .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
            .channel(runStatsChannel())
            .channel(c -> c.executor(Executors.newFixedThreadPool(persistencePoolSize)))
            .transform(new FileToInputStreamTransformer())
            .split(new FileSplitter())
            .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
            .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
            .get();
    // @formatter:on
}
@Bean
public MessageChannel runStatsChannel() {
    DirectChannel wiretapChannel = new DirectChannel();
    wiretapChannel.subscribe(new RunStatsHandler(runStatsRepository));
    DirectChannel loggingChannel = new DirectChannel();
    loggingChannel.addInterceptor(new WireTap(wiretapChannel));
    return loggingChannel;
}
Unfortunately, I can't share the RunStats and repository implementations.