I have a filter which buffers rejected messages. I would like to send these buffered messages to the output channel once my predicate is satisfied:
public class MyFilter implements MessageSelector {

    private Buffer buff;
    ...

    @Override
    public boolean accept(final Message<?> message) {
        if (isAcceptable(message)) {
            for (final Message<?> msg : buff.getBuffered()) {
                // accept these as well
            }
            return true;
        } else {
            buff.put(message);
            return false;
        }
    }
}
How can I accept the buffered messages? Is there a better approach?
I would suggest that an aggregator with a custom ReleaseStrategy would be a good approach. When the release strategy releases the group, if the output processor (the @Aggregator method, when configuring that way) returns a collection of Message<?>, they will be released as individual messages.
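To make the intended semantics concrete, here is a minimal, library-free sketch of "hold messages until the predicate is satisfied, then release the whole group as individual items". It is not Spring Integration code: the class name `BufferUntilAccepted` and its `offer` method are made up for illustration. In a real aggregator, the predicate check would live in the release strategy and the copy of the group would be what the output processor returns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of buffer-until-release behaviour; not a Spring Integration class. */
public class BufferUntilAccepted<T> {

    private final Predicate<T> acceptable;
    private final List<T> group = new ArrayList<>();

    public BufferUntilAccepted(Predicate<T> acceptable) {
        this.acceptable = acceptable;
    }

    /**
     * Adds the item to the current group. Returns every held item (including
     * this one) when the predicate is satisfied, otherwise an empty list.
     */
    public List<T> offer(T item) {
        group.add(item);
        if (!acceptable.test(item)) {
            return Collections.emptyList(); // keep holding
        }
        List<T> released = new ArrayList<>(group); // copy before clearing
        group.clear();                             // start a fresh group
        return released;
    }

    public static void main(String[] args) {
        BufferUntilAccepted<Integer> agg = new BufferUntilAccepted<>(n -> n % 2 == 0);
        System.out.println(agg.offer(1)); // []
        System.out.println(agg.offer(3)); // []
        System.out.println(agg.offer(4)); // [1, 3, 4]
        System.out.println(agg.offer(5)); // []
    }
}
```

Note that the buffered items come out ahead of the item that triggered the release; if the order matters for your flow, sort or reorder in the output processor.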
When using Java configuration, you might find it simpler to use the Java DSL .aggregate(...) method than to create @Beans.
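A rough sketch of what that DSL configuration could look like, assuming Spring Integration 5.x (in 6.x the entry point is, as far as I know, `IntegrationFlow.from(...)` rather than `IntegrationFlows.from(...)`). The channel names, the single constant correlation key, and the body of `isAcceptable` are placeholders, not anything from the question; substitute your own predicate.

```java
import java.util.ArrayList;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.Message;

@Configuration
public class BufferingFlowConfig {

    @Bean
    public IntegrationFlow bufferingFlow() {
        return IntegrationFlows.from("inputChannel") // hypothetical channel name
                .aggregate(a -> a
                        // Assumption: all messages belong to one logical group.
                        .correlationStrategy(m -> "buffer")
                        // Release once any message in the group satisfies the predicate.
                        .releaseStrategy(group -> group.getMessages().stream()
                                .anyMatch(BufferingFlowConfig::isAcceptable))
                        // Returning a collection of Message<?> emits them individually.
                        .outputProcessor(group -> new ArrayList<>(group.getMessages()))
                        // Remove the completed group so later messages start a new one.
                        .expireGroupsUponCompletion(true))
                .channel("outputChannel") // hypothetical channel name
                .get();
    }

    // Placeholder predicate; replace with the real isAcceptable(...) logic.
    private static boolean isAcceptable(Message<?> message) {
        return Boolean.TRUE.equals(message.getHeaders().get("release"));
    }
}
```

Without `expireGroupsUponCompletion(true)`, messages arriving for an already completed group are discarded by default, which is usually not what you want for a repeating buffer.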
There is currently a pull request for an aggregator app starter.