I'm contemplating implementing a feature wherein a JdbcRepositoryHandler (implementing MessageHandler) might listen for an outside event (e.g., CancelRunEvent).
I think I'd use Spring's ApplicationEvent support to publish an event via a REST controller endpoint. And I would guess I'd have the aforementioned handler implement ApplicationListener to listen for a particular event?
The question is: if the handler is saturated with messages it needs to process, how would I signal termination of all subsequent messages that may have emanated upstream, e.g., from a FileSplitter?
While I could easily construct a condition to be checked before calling a method responsible, e.g., for a persistence operation (based on some state received from the CancelRunEvent), how could I interrupt the flow entirely?
For illustrative purposes, imagine a flow like:
    @Bean
    protected IntegrationFlow s3ChannelFlow() {
        // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
        // @formatter:off
        return IntegrationFlows
            .from(s3Channel())
            .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
            .channel(runStatsChannel())
            .transform(new FileToInputStreamTransformer())
            .split(new FileSplitter())
            .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
            .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
            .get();
        // @formatter:on
    }

    @Bean
    protected IntegrationFlow persistenceSubFlow() {
        // @formatter:off
        return f -> f
            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a -> a
                .correlationStrategy(new HeaderAttributeCorrelationStrategy(RunStats.FILE_TOKEN))
                .releaseStrategy(new MessageCountReleaseStrategy(persistenceBatchSize))
                .sendPartialResultOnExpiry(true)
                .expireGroupsUponCompletion(true)
                .groupTimeoutExpression(persistenceBatchReleaseTimeoutMillis)
            )
            .handle(new JdbcRepositoryHandler(typeSupport, metricRegistry, runStatsRepository));
        // @formatter:on
    }
It's not entirely clear what you mean or why you would need the JdbcRepositoryHandler to manage this rather than some other ApplicationListener.
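To illustrate keeping the cancel state outside the handler, here is a minimal, framework-free sketch in plain Java. CancelledRuns and GuardedBatchHandler are hypothetical names, not Spring or Spring Integration classes; in a real application the cancel(...) call would presumably be made from whatever ApplicationListener receives the CancelRunEvent, and the delegate would be the actual persistence call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical holder for cancelled run tokens (thread-safe set).
// Assumption: a listener elsewhere calls cancel(...) when the event arrives.
class CancelledRuns {
    private final Set<String> tokens = ConcurrentHashMap.newKeySet();

    void cancel(String fileToken) {
        tokens.add(fileToken);
    }

    boolean isCancelled(String fileToken) {
        return tokens.contains(fileToken);
    }
}

// Hypothetical stand-in for the persistence handler: it only consults the
// shared state and skips the delegate for runs that have been cancelled.
class GuardedBatchHandler {
    private final CancelledRuns cancelledRuns;
    private final Consumer<List<String>> delegate;

    GuardedBatchHandler(CancelledRuns cancelledRuns, Consumer<List<String>> delegate) {
        this.cancelledRuns = cancelledRuns;
        this.delegate = delegate;
    }

    // Returns true if the batch was passed to the delegate, false if skipped.
    boolean handle(String fileToken, List<String> batch) {
        if (cancelledRuns.isCancelled(fileToken)) {
            return false;
        }
        delegate.accept(batch);
        return true;
    }
}

class CancelGuardDemo {
    public static void main(String[] args) {
        CancelledRuns runs = new CancelledRuns();
        List<String> persisted = new ArrayList<>();
        GuardedBatchHandler handler = new GuardedBatchHandler(runs, persisted::addAll);

        handler.handle("run-1", Arrays.asList("a", "b")); // persisted
        runs.cancel("run-1");
        handler.handle("run-1", Arrays.asList("c"));      // skipped
        System.out.println(persisted);                    // prints [a, b]
    }
}
```

Note that this only drops work at the handler; messages already emitted upstream still travel through the flow until they reach the guard.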
Your flow is running on some thread upstream of s3Channel(). Depending on what that is, you could stop() the message source and no new messages will be emitted after the current one (or ones, if it's multi-threaded).
However, you may (likely will) end up with a partial aggregation sitting in memory until the group times out.
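The effect of stopping the source while a group is still open can be modelled with a small single-threaded sketch in plain Java. StoppableSource and BatchBuffer are hypothetical stand-ins, not the Spring Integration message source or aggregator; expire() here simply plays the role of the group timeout with sendPartialResultOnExpiry enabled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical pollable source: once stopped, it emits nothing further.
class StoppableSource {
    private final Queue<String> pending;
    private final AtomicBoolean running = new AtomicBoolean(true);

    StoppableSource(List<String> items) {
        this.pending = new ArrayDeque<>(items);
    }

    void stop() {
        running.set(false);
    }

    // Returns the next item, or empty if the source is stopped or exhausted.
    Optional<String> poll() {
        if (!running.get()) {
            return Optional.empty();
        }
        return Optional.ofNullable(pending.poll());
    }
}

// Hypothetical aggregator model: releases a batch once it reaches batchSize.
class BatchBuffer {
    private final int batchSize;
    private final List<String> group = new ArrayList<>();
    private final List<List<String>> released = new ArrayList<>();

    BatchBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    void add(String item) {
        group.add(item);
        if (group.size() == batchSize) {
            release();
        }
    }

    // Models the group timeout: a non-empty partial group is released as-is.
    void expire() {
        if (!group.isEmpty()) {
            release();
        }
    }

    private void release() {
        released.add(new ArrayList<>(group));
        group.clear();
    }

    int pendingSize() {
        return group.size();
    }

    List<List<String>> released() {
        return released;
    }
}

class StopSourceDemo {
    public static void main(String[] args) {
        StoppableSource source = new StoppableSource(Arrays.asList("1", "2", "3", "4", "5", "6"));
        BatchBuffer buffer = new BatchBuffer(3);

        for (int i = 0; i < 4; i++) {
            source.poll().ifPresent(buffer::add);
        }
        source.stop();
        source.poll().ifPresent(buffer::add); // nothing is emitted after stop()

        System.out.println(buffer.released() + " pending=" + buffer.pendingSize()); // prints [[1, 2, 3]] pending=1
        buffer.expire();
        System.out.println(buffer.released() + " pending=" + buffer.pendingSize()); // prints [[1, 2, 3], [4]] pending=0
    }
}
```

In this model the fourth item sits in the open group after stop() and is only released when the timeout fires, which is the partial aggregation described above.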