Tags: java, file, spring-integration, dsl, splitter

Spring Integration Java DSL flow Splitter/Aggregator delete file after processing all lines


Using Spring Integration Java DSL, I have constructed a flow where I'm processing files synchronously with a FileSplitter. I've been able to use the setDeleteFiles flag on an AbstractFilePayloadTransformer to delete the file after converting each line in the file to a Message for subsequent processing, like so:

@Bean
protected IntegrationFlow s3ChannelFlow() {
    // do not exhaust filesystem w/ files downloaded from S3
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
    transformer.setDeleteFiles(true);

    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
        .from(s3Channel())
        .channel(StatsUtil.createRunStatsChannel(runStatsRepository))
        .transform(transformer)
        .split(new FileSplitter())
        .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
        .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
        .get();
    // @formatter:on
}

This works fine, but it is slow. So I attempted to add an ExecutorChannel after the .split above, like so:

.channel(c -> c.executor(Executors.newFixedThreadPool(10)))

But then the aforementioned delete flag causes files to be removed before they have been completely read, so the flow no longer completes successfully.

If I remove the flag I have the potential to exhaust the local file system where files were synchronized from S3.

What could I introduce above to a) process each file completely and b) delete the file from the local filesystem once done? In other words, is there a way to know exactly when a file is completely processed (when its lines have been processed asynchronously via threads in a pool)?
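
For reference, one direction (which the update below ends up probing) is to have the FileSplitter emit START/END marker messages so that something downstream can tell when the last line of a file has gone by. A minimal, purely illustrative sketch of such a marker-aware handler follows (the class name and deletion logic are only an assumption, not part of the flow above); note that once an ExecutorChannel is in play the END marker can still arrive before all lines have finished processing:

import java.io.File;

import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;

// Illustrative only: FileSplitter(true, true) emits a FileMarker START message before a
// file's lines and a FileMarker END message after them; the END marker carries the file
// path (and line count), so a subscriber like this could delete the local copy.
public class EndMarkerFileCleanupHandler implements MessageHandler {

    @Override
    public void handleMessage(Message<?> message) {
        Object payload = message.getPayload();
        if (payload instanceof FileSplitter.FileMarker) {
            FileSplitter.FileMarker marker = (FileSplitter.FileMarker) payload;
            if (marker.getMark() == FileSplitter.FileMarker.Mark.END) {
                // remove the local copy once the END marker for this file is seen
                new File(marker.getFilePath()).delete();
            }
        }
    }
}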

If you're curious, here's my implementation of FileToInputStreamTransformer:

public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

    // @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
    @Override
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
    }
}

UPDATE

So how does something in the downstream flow know what to ask for?

Incidentally, if I'm following your advice correctly, when I update the .split above to use new FileSplitter(true, true), I get

2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
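
That exception is the FileMarker payloads produced by FileSplitter(true, true) reaching the JSON transformer, which only accepts String, byte[], File, URL, InputStream, or Reader payloads. If the markers are kept, one rough sketch of keeping them away from the transformer (not what I ended up doing below) is a payload-type filter between the split and the transform:

.split(new FileSplitter(true, true))
.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
// sketch only: let real lines through to the JSON transformer and discard the
// FileSplitter.FileMarker payloads (they could instead be routed to a marker-handling flow)
.filter(payload -> !(payload instanceof FileSplitter.FileMarker))
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))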

Solution

  • Thanks Artem.

    I did manage to address the issue, but perhaps in a more heavy-weight manner.

    Introducing an ExecutorChannel caused quite a ripple of implementation adjustments, including: turning off the setDeleteFiles flag on the AbstractFilePayloadTransformer, and updating a JPA @Entity, RunStats, and its repository to capture file-processing status as well as the processing status for an entire run. Taken together, the processing-status updates let the flow know when to delete files from the local filesystem (i.e., when they're fully processed) and return a status from a /stats/{run} endpoint so a user can know when a run is complete.

    Here are snippets from my implementation (if anyone's curious)...

    class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

        private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

        // @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
        @Override
        protected InputStream transformFile(File payload) throws Exception {
            return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
        }
    }
    
    public class RunStatsHandler extends AbstractMessageHandler {

        private final SplunkSlf4jLogger log = new SplunkSlf4jLogger(LoggerFactory.getLogger(getClass()));

        private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

        private final RunStatsRepository runStatsRepository;

        public RunStatsHandler(RunStatsRepository runStatsRepository) {
            this.runStatsRepository = runStatsRepository;
        }

        // Memory-efficient routine, @see http://www.baeldung.com/java-read-lines-large-file
        @Override
        protected void handleMessageInternal(Message<?> message) throws Exception {
            RunStats runStats = message.getHeaders().get(RunStats.RUN, RunStats.class);
            String token = message.getHeaders().get(RunStats.FILE_TOKEN, String.class);
            if (runStats != null) {
                File compressedFile = (File) message.getPayload();
                String compressedFileName = compressedFile.getCanonicalPath();
                LongAdder lineCount = new LongAdder();
                // streams and Scanner implement java.lang.AutoCloseable, so let
                // try-with-resources close them even if a later construction fails
                try (InputStream fs = new FileInputStream(compressedFile);
                        InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE);
                        Scanner sc = new Scanner(gzfs, "UTF-8")) {
                    while (sc.hasNextLine()) {
                        sc.nextLine();
                        lineCount.increment();
                    }
                    // note that Scanner suppresses exceptions
                    if (sc.ioException() != null) {
                        log.warn("file.lineCount", ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName,
                                "exception", sc.ioException().getMessage()));
                        throw sc.ioException();
                    }
                    runStats.addFile(compressedFileName, token, lineCount.longValue());
                    runStatsRepository.updateRunStats(runStats);
                    log.info("file.lineCount",
                            ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, "lineCount", lineCount.intValue()));
                }
            }
        }
    }
    

    Updated flow

    @Bean
    protected IntegrationFlow s3ChannelFlow() {
        // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
        // @formatter:off
        return IntegrationFlows
            .from(s3Channel())
            .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
            .channel(runStatsChannel())
            .channel(c -> c.executor(Executors.newFixedThreadPool(persistencePoolSize)))
            .transform(new FileToInputStreamTransformer())
            .split(new FileSplitter())
            .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
            .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
            .get();
        // @formatter:on
    }
    
    @Bean
    public MessageChannel runStatsChannel() {
        DirectChannel wiretapChannel = new DirectChannel();
        wiretapChannel.subscribe(new RunStatsHandler(runStatsRepository));
        DirectChannel loggingChannel = new DirectChannel();
        loggingChannel.addInterceptor(new WireTap(wiretapChannel));
        return loggingChannel;
    }
    

    Unfortunately, I can't share the RunStats and repo implementations.
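
    The general shape of RunStats, though, inferred only from its usage above (the RUN / FILE_TOKEN header names, getRun(), and addFile(...)), looks roughly like the following hypothetical sketch; it is not the real entity:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical sketch only -- constant values, field names, and the inner class are
    // assumptions inferred from RunStatsHandler and the flow above, not the actual JPA @Entity.
    public class RunStats {

        public static final String RUN = "runStats";          // assumed header name
        public static final String FILE_TOKEN = "fileToken";  // assumed header name

        private final String run;

        // one entry per file, keyed by the token generated in the flow's header enricher
        private final Map<String, FileEntry> files = new ConcurrentHashMap<>();

        public RunStats(String run) {
            this.run = run;
        }

        public String getRun() {
            return run;
        }

        public void addFile(String fileName, String token, long lineCount) {
            this.files.put(token, new FileEntry(fileName, lineCount));
        }

        // the real entity also tracks per-file and per-run processing status, which is what
        // drives local-file deletion and the /stats/{run} endpoint

        static class FileEntry {

            final String fileName;
            final long lineCount;

            FileEntry(String fileName, long lineCount) {
                this.fileName = fileName;
                this.lineCount = lineCount;
            }
        }
    }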