Search code examples
javaspring-integrationspring-integration-dsl

Free queue channel in spring integration with java DSL


i have a channel that stores messages. When new messages arrive, if the server has not yet processed all the messages (that still in the queue), i need to clear the queue (for example, by rerouting all data into another channel). For this, I used a router. But the problem is when a new messages arrives, then not only old but also new ones rerouting into another channel. New messages must remain in the queue. How can I solve this problem? This is my code:

    @Bean
    public IntegrationFlow integerFlow() {
        return IntegrationFlows.from("input")
                .bridge(e -> e.poller(Pollers.fixedDelay(500, TimeUnit.MILLISECONDS, 1000).maxMessagesPerPoll(1)))
                .route(r -> {
                    if (flag) {
                        return "mainChannel";
                    } else {
                        return "garbageChannel";
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow outFlow() {
        return IntegrationFlows.from("mainChannel")
                .handle(m -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(m.getPayload() + "\tmainFlow");
                })
                .get();
    }

    @Bean
    public IntegrationFlow outGarbage() {
        return IntegrationFlows.from("garbageChannel")
                .handle(m -> System.out.println(m.getPayload() + "\tgarbage"))
                .get();
    }

Flag value changes through @GateWay by pressing "q" and "e" keys.


Solution

  • I would suggest you to take a look into a purge API of the QueueChannel:

    /**
     * Remove any {@link Message Messages} that are not accepted by the provided selector.
     * @param selector The message selector.
     * @return The list of messages that were purged.
     */
    List<Message<?>> purge(@Nullable MessageSelector selector);
    

    This way with a custom MessageSelector you will be able to remove from the queue old messages. See a timestamp message header to consult. With the result of this method you can do whatever you need to do with old messages.