Search code examples
javaspringspring-integrationspring-integration-dsl

How can I do integration flow to be invoked when I pass File dir to messageChannel and InboundFileAdapter reading files from it?


I have integration flow that reads files from specific dir, transform it to pojo and save in list.

Config class:

@Configuration
@ComponentScan
@EnableIntegration
@IntegrationComponentScan
public class IntegrationConfig {

    @Bean
    public MessageChannel fileChannel(){
        return new DirectChannel();
    }
    @Bean
    public MessageSource<File> fileMessageSource(){
        FileReadingMessageSource readingMessageSource = new FileReadingMessageSource();
        CompositeFileListFilter<File> compositeFileListFilter= new CompositeFileListFilter<>();
        compositeFileListFilter.addFilter(new SimplePatternFileListFilter("*.csv"));
        compositeFileListFilter.addFilter(new AcceptOnceFileListFilter<>());
        readingMessageSource.setFilter(compositeFileListFilter);
        readingMessageSource.setDirectory(new File("myFiles"));
        return readingMessageSource;
    }

    @Bean
    public CSVToOrderTransformer csvToOrderTransformer(){
        return new CSVToOrderTransformer();
    }
    @Bean
    public IntegrationFlow convert(){
        return IntegrationFlows.from(fileMessageSource(),source -> source.poller(Pollers.fixedDelay(500)))
                .channel(fileChannel())
                .transform(csvToOrderTransformer())
                .handle("loggerOrderList","processOrders")
                .channel(MessageChannels.queue())
                .get();
    }
}

Transformer:

public class CSVToOrderTransformer {
    @Transformer
    public List<Order> transform(File file){
        List<Order> orders = new ArrayList<>();
        Pattern pattern = Pattern.compile("(?m)^(\\d*);(WAITING_FOR_PAYMENT|PAYMENT_COMPLETED);(\\d*)$");
        Matcher matcher = null;
        try {
            matcher = pattern.matcher(new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
        while (!matcher.hitEnd()){
            if(matcher.find()){
                Order order = new Order();
                order.setOrderId(Integer.parseInt(matcher.group(1)));
                order.setOrderState(matcher.group(2).equals("WAITING_FOR_PAYMENT")? OrderState.WAITING_FOR_PAYMENT:OrderState.PAYMENT_COMPLETED);
                order.setOrderCost(Integer.parseInt(matcher.group(3)));
                orders.add(order);
            }
        }
        return orders;
    }
}

OrderState enum :

public enum OrderState {
    CANCELED,
    WAITING_FOR_PAYMENT,
    PAYMENT_COMPLETED
}

Order :

public class Order {
    private int orderId;
    private OrderState orderState;
    private int orderCost;
}

LoggerOrderList service:

@Service
public class LoggerOrderList {
    private static final Logger LOGGER = LogManager.getLogger(LoggerOrderList.class);
    public List<Order> processOrders(List<Order> orderList){
        orderList.forEach(LOGGER::info);
        return orderList;
    }
}

1)How can I do that flow starts when I pass invoke gateway method? 2)How can I read passed message in inbound-channel-adapter(in my case is FileReadingMessageSource)?


Solution

  • The FileReadingMessageSource is based on the polling the provided in the configuration directory. This is the beginning of the flow and it cannot be used in the middle of some logic.

    You didn’t explain what is your gateway is, but probably you would like to have similar logic to get content if the dir passed as a payload of sent message. However such a logic doesn’t look like a fit for that message source anyway. It’s goal is to poll the dir all the time for new content. If you want something similar for several dirs, you may consider to have dynamically registered flows for provided dirs: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-runtime-flows.

    Otherwise you need to consider to have a plain services activator which would call listFiles() on the provided dir. just because without “wait for new content “ feature it does not make sense to abuse FileReeadingMessageSource