java spring spring-integration spring-cloud-dataflow

Spring Cloud Data Flow: send buffered messages to output channel


I have a filter which buffers rejected messages. I would like to send these buffered messages to the output channel once my predicate is satisfied:

public class MyFilter implements MessageSelector {

    private Buffer buff;

    ...

    @Override
    public boolean accept(final Message<?> message) {
        if (isAcceptable(message)) {
            for (final Message<?> msg : buff.getBuffered()) {
                // accept these as well
            }
            return true;
        } else {
            buff.put(message);
            return false;
        }
    }
}

How can I accept the buffered messages? Is there a better approach?


Solution

  • I would suggest an aggregator with a custom ReleaseStrategy. The aggregator holds incoming messages in a group until the release strategy decides the group can be released. At that point, if the output processor (the @Aggregator method, when configuring that way) returns a collection of Message<?>, they are sent on as individual messages.

    When using Java configuration, you might find it simpler to use the Java DSL .aggregate(...) method than to declare the @Beans yourself.

    There is currently a pull request for an aggregator app starter.
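
The approach above could look roughly like the following with the Java DSL. This is only a sketch under several assumptions: Spring Integration 5.x (where IntegrationFlows is the DSL entry point), channels named "input" and "output", a single constant correlation key so that every message lands in the same group, and a hypothetical isAcceptable(...) standing in for the predicate from the question.

```java
import java.util.ArrayList;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;

@Configuration
public class BufferingFlowConfig {

    // Hypothetical placeholder for the question's isAcceptable(...) predicate.
    private boolean isAcceptable(Message<?> message) {
        return "release".equals(message.getPayload());
    }

    @Bean
    public IntegrationFlow bufferingFlow() {
        return IntegrationFlows.from("input") // assumed input channel name
                .aggregate(a -> a
                        // Constant key (an assumption): all messages share one group.
                        .correlationStrategy(message -> "buffer")
                        // Release once any message in the group satisfies the predicate.
                        .releaseStrategy(group ->
                                group.getMessages().stream().anyMatch(this::isAcceptable))
                        // Returning a collection of Message<?> makes the aggregator
                        // send each buffered message individually.
                        .outputProcessor(group -> new ArrayList<>(group.getMessages()))
                        // Remove the completed group so later messages start a new one.
                        .expireGroupsUponCompletion(true))
                .channel("output") // assumed output channel name
                .get();
    }
}
```

The release strategy is evaluated each time a message is added to the group, so the message that satisfies the predicate is released together with everything buffered before it. Without expireGroupsUponCompletion(true), messages arriving for the already completed group would be discarded rather than buffered again.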