Tags: spring, spring-integration, spring-integration-dsl

Retrieve all files that match a filter once


I'm trying to get the count of files matched by my filter from my streaming inbound FTP adapter, so that after I process all files I can launch a remote shell. Or is there any other way to know that the adapter has finished sending messages?

I already tried a CompositeFileListFilter, overriding the public List&lt;F&gt; filterFiles(F[] files) method, but it never gets called.

For now I'm using a fixed file count, but it should be dynamic.

I overrode this method on the CompositeFileListFilter:

@Override
public List<F> filterFiles(F[] files) {
    log.info("received {} files", files.length);
    return super.filterFiles(files);
}

I have the following integration flow, using an atomic counter hard-coded to fire at 3 (the expected file count for this test), but it should be dynamic:

AtomicInteger messageCounter = new AtomicInteger(0);
return IntegrationFlows.from(Ftp.inboundStreamingAdapter(goldv5template())
            .remoteDirectory("/inputFolder")
            .filter(new CompositeFileListFilterWithCount<>() {{
                addFilter(new FtpSimplePatternFileListFilter("pattern1.*"));
                addFilter(new FtpSimplePatternFileListFilter("pattern2.*"));
                addFilter(new FtpSimplePatternFileListFilter("pattern3.*"));
            }})
        , pollerConfiguration)
        .transform(Transformers.fromStream(StandardCharsets.UTF_8.toString()))
        .log(message -> "process file " + message.getHeaders().get(FileHeaders.REMOTE_FILE))
        .handle(message -> {
            int numericValue = messageCounter.incrementAndGet();
            log.info("numeric value: {}", numericValue);
            if (numericValue == 3) {
                messageCounter.set(0);
                log.info("launch remote shell here now");
            }
        }, e -> e.advice(after()))
        .get();

If I don't use the counter, I get a remote shell call for every file, and I only need it to be called once, when the flow has finished. The flow is scheduled by a cron job, so I want to call it exactly once at the end.

I'm using a 1 s interval for testing, but in production it would only run three times a day, and I have to fetch all the files on every scheduled run.

This is my pollerConfiguration for testing:

sourcePollingChannelAdapterSpec -> sourcePollingChannelAdapterSpec.poller(pollerFactory -> pollerFactory.fixedRate(1000L))

UPDATE

I tried what Artem suggested, but I'm seeing weird behavior. I'm trying to fetch all files in a certain FTP folder in one poll, so, reading the docs:

if the max-messages-per-poll is set to 1 (the default), it processes only one file at a time with intervals as defined by your trigger, essentially working as “one-poll === one-file”.

For typical file-transfer use cases, you most likely want the opposite behavior: to process all the files you can for each poll and only then wait for the next poll. If that is the case, set max-messages-per-poll to -1. Then, on each poll, the adapter tries to generate as many messages as it possibly can...

So I have set max-messages-per-poll to -1 so that every poll gives me every file. I added a filter to take only .xml files and, to prevent duplicates, an AcceptOnceFileListFilter, but the FTP streaming adapter keeps giving me the same files over and over, which doesn't make sense. For this test I used a fixed delay of 10 s.

2019-07-23 10:32:04.308  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process2 file sample1.xml
2019-07-23 10:32:04.312  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample1.xml
2019-07-23 10:32:04.313  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.313  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.315  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample2.xml
2019-07-23 10:32:04.324  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample2.xml
2019-07-23 10:32:04.324  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.324  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.326  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample3.xml
2019-07-23 10:32:04.330  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample3.xml
2019-07-23 10:32:04.331  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.331  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.333  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample4.xml
2019-07-23 10:32:04.337  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample4.xml
2019-07-23 10:32:04.338  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.338  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.341  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample1.xml
2019-07-23 10:32:04.345  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample1.xml
2019-07-23 10:32:04.346  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.346  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.347  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample2.xml
2019-07-23 10:32:04.351  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample2.xml
2019-07-23 10:32:04.351  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.351  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.353  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample3.xml
2019-07-23 10:32:04.356  INFO 9008 --- [   scheduling-1] o.s.integration.ftp.session.FtpSession   : File has been successfully transferred to: /output/sample3.xml
2019-07-23 10:32:04.356  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : Advice after handle.
2019-07-23 10:32:04.357  INFO 9008 --- [   scheduling-1] i.d.e.v.job.factory.TestFlowFactory      : ________________________________
2019-07-23 10:32:04.358  INFO 9008 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : process file sample4.xml
...............................
return IntegrationFlows
            .from(Ftp.inboundStreamingAdapter(testFlowTemplate())
                    .remoteDirectory("/inputTestFlow")
                    .filter(new CompositeFileListFilter<>() {{
                        addFilter(new AcceptOnceFileListFilter<>());
                        addFilter(new FtpSimplePatternFileListFilter("*.xml"));
                    }})
                , sourcePollingChannelAdapterSpec -> sourcePollingChannelAdapterSpec.poller(pollerConfiguration.maxMessagesPerPoll(-1)))
            .transform(Transformers.fromStream(StandardCharsets.UTF_8.toString()))
            .log(message -> {
                execution.setStartDate(new Date());
                return "process file " + message.getHeaders().get(FileHeaders.REMOTE_FILE);
            })
            .handle(Ftp.outboundAdapter(FTPServers.PC_LOCAL.getFactory(), FileExistsMode.REPLACE)
                    .useTemporaryFileName(false)
                    .fileNameExpression("headers['" + FileHeaders.REMOTE_FILE + "']")
                    .remoteDirectory("/output/")
                , e -> e.advice(testFlowAfter())
            )
            .get();

Update 2

I achieved what I needed by creating this custom filter:

.filter(new FileListFilter<>() {
    private final Set<String> seenSet = new HashSet<>();
    private Date lastExecution;

    @Override
    public List<FTPFile> filterFiles(FTPFile[] files) {
        return Arrays.stream(files).filter(ftpFile -> {
            if (lastExecution != null
                    && TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - lastExecution.getTime()) >= 10L) {
                this.seenSet.clear();
            }
            lastExecution = new Date();
            if (ftpFile.getName().endsWith(".xml")) {
                return this.seenSet.add(ftpFile.getRawListing());
            }
            return false;
        }).collect(Collectors.toList());
    }
})

However, I used a hand-made 10-second interval, which is okay for my needs. Is there any smarter way to make this code better, depending on the trigger?


Solution

  • I think a cron trigger is not the right solution here, since you really want a single process for all the fetched files.

    I think your logic in filterFiles() is wrong. You really want to set the counter to the number of files the flow is actually going to process, not the original amount:

    @Override
    public List<F> filterFiles(F[] files) {
        List<F> filteredFiles = super.filterFiles(files);
        log.info("received {} files", filteredFiles.size());
        return filteredFiles;
    }
    

    And here you can indeed set a value into that messageCounter.
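    For illustration, the counting filter and the handler could be tied together roughly like this. This is only a sketch under assumptions: the constructor-injected counter field and the launchRemoteShell() callback are hypothetical names, not code from the question or the answer.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.integration.file.filters.CompositeFileListFilter;

// Sketch: remember how many messages the current poll will actually emit,
// so the handler can count down and fire its action exactly once at zero.
// Assumes Spring Integration 5.x on the classpath.
public class CompositeFileListFilterWithCount<F> extends CompositeFileListFilter<F> {

    private final AtomicInteger messageCounter;

    public CompositeFileListFilterWithCount(AtomicInteger messageCounter) {
        this.messageCounter = messageCounter;
    }

    @Override
    public List<F> filterFiles(F[] files) {
        List<F> filteredFiles = super.filterFiles(files);
        // store the number of files this poll will emit as messages
        this.messageCounter.set(filteredFiles.size());
        return filteredFiles;
    }
}
```

    The handler would then decrement instead of incrementing, e.g. `if (messageCounter.decrementAndGet() == 0) { launchRemoteShell(); }`, so the action runs once per poll regardless of how many files were matched.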

    UPDATE

    There is this functionality on filter:

    /**
     * Indicates that this filter supports filtering a single file.
     * Filters that return true <b>must</b> override {@link #accept(Object)}.
     * Default false.
     * @return true to allow external calls to {@link #accept(Object)}.
     * @since 5.2
     * @see #accept(Object)
     */
    default boolean supportsSingleFileFiltering() {
        return false;
    }
    

    I think when you override it to return an explicit false in your CompositeFileListFilterWithCount, you should be good. Otherwise you are indeed right: by default only a plain accept() is called for each file, because each of your FtpSimplePatternFileListFilter instances returns true for this method by default, and they all contribute to a true at the composite level.
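    A minimal sketch of that override on the custom composite from the question (assuming Spring Integration 5.2+, where supportsSingleFileFiltering() exists):

```java
import java.util.List;

import org.springframework.integration.file.filters.CompositeFileListFilter;

public class CompositeFileListFilterWithCount<F> extends CompositeFileListFilter<F> {

    @Override
    public boolean supportsSingleFileFiltering() {
        // force the batch filterFiles(F[]) path instead of per-file accept() calls
        return false;
    }
}
```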

    Nevertheless, all of that tells us that you are already using Spring Integration 5.2 :-)...

    UPDATE 2

    Try a ChainFileListFilter instead. Place an AcceptOnceFileListFilter at the end of the chain. Although it might be better to use an FtpPersistentAcceptOnceFileListFilter instead: it takes the file's last-modified timestamp into account. Also consider including in the chain some LastModifiedFileListFilter variant for the FTPFile. You have something similar in your custom filter, but it is better done as a separate filter.
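    A sketch of such a chain (a SimpleMetadataStore is used here only for illustration; a persistent ConcurrentMetadataStore implementation would let the accept-once state survive restarts):

```java
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter;
import org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter;
import org.springframework.integration.metadata.SimpleMetadataStore;

// chain: pattern match first, then accept-once semantics that also
// track the file's last-modified timestamp via the metadata store
ChainFileListFilter<FTPFile> chain = new ChainFileListFilter<>();
chain.addFilter(new FtpSimplePatternFileListFilter("*.xml"));
chain.addFilter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "ftp-"));
```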

    I'm not sure, though, what you mean about making it depend on the trigger. There is simply no relationship between the filter and the trigger. You may, of course, have a common interval property and feed it into the last-modified filter's value.
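    A framework-free sketch of that shared-interval idea (the class and method names here are illustrative only, not Spring Integration API): the same interval value the trigger is configured with is reused as the minimum age a file must reach before it is accepted.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: keep entries whose age is at least one polling
// interval, using the same interval value the trigger is configured with.
class IntervalAgeFilter {

    private final long intervalMillis;

    IntervalAgeFilter(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // lastModifiedTimes are epoch millis; 'now' is injected for testability
    List<Long> filter(List<Long> lastModifiedTimes, long now) {
        return lastModifiedTimes.stream()
                .filter(ts -> now - ts >= this.intervalMillis)
                .collect(Collectors.toList());
    }
}
```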

    By the way, this story has drifted far from the original fetch-all-at-once request. An inbound channel adapter is really about one file per message, so you definitely can't have a list of files in one message, as is possible with the FtpOutboundGateway and its LS or MGET commands, as I mentioned in the comments below.