Search code examples
javaspring-integrationdslcancellation

How to Interrupt or Cancel a Spring Integration Java DSL flow?


Contemplating implementation of a feature wherein a JdbcRepositoryHandler (implemnting MessageHandler) might listen for an outside event (e.g., CancelRunEvent).

I think I'd use Spring' ApplicationEvent support to publish an event via a REST controller endpoint. And I would guess I'd have the aforementioned handler implement ApplicationListener to listen for a particular event?

The question is: if the handler is saturated with messages it needs to process, how would I signal termination of all subsequent messages that may have emanated upstream e.g., from a FileSplitter?

While I could easily construct a condition to be checked before calling a method responsible e.g., for a persistence operation (based on some state received from the CancelRunEvent), how could I interrupt the flow entirely?

For illustrative purposes, imagine a flow like:

@Bean
protected IntegrationFlow s3ChannelFlow() {
    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
        .from(s3Channel())
        .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
        .channel(runStatsChannel())
        .transform(new FileToInputStreamTransformer())
        .split(new FileSplitter())
        .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
        .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
        .get();
    // @formatter:on
}

@Bean
protected IntegrationFlow persistenceSubFlow() {
    // @formatter:off
    return f -> f
            // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
            .aggregate(a -> a
                    .correlationStrategy(new HeaderAttributeCorrelationStrategy(RunStats.FILE_TOKEN))
                    .releaseStrategy(new MessageCountReleaseStrategy(persistenceBatchSize))
                    .sendPartialResultOnExpiry(true)
                    .expireGroupsUponCompletion(true)
                    .groupTimeoutExpression(persistenceBatchReleaseTimeoutMillis)
            )
            .handle(new JdbcRepositoryHandler(typeSupport, metricRegistry, runStatsRepository));
    // @formatter:on
}

Solution

  • It's not entirely clear what you mean or why you would need the JdbcRepositoryHandler to manage this rather than some other ApplicationListener.

    Your flow is running on some thread upstream of s3Channel(). Depending on what that is, you could stop() the message source and no new messages will be emitted after the current one (or ones if it's multi-threaded).

    However, you may (likely will) end up with a partial aggregation sitting in memory until the group times out.