Search code examples
springspring-integrationspring-cloudspring-cloud-streamspring-cloud-function

consumption of events stopped after the consumer throw an exception in spring cloud stream?


I have an aggregation function that aggregates events published into output-channel. I have subscribed to the flux generated by the function like below:

@Component
public class EventsAggregator {
    @Autowired
    private Sinks.Many<Message<?>> eventsPublisher; // Used to publish events from different places in the code
    private final Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction;


    @PostConstruct
    public void aggregate() {
        Flux<Message<?>> output = aggregatorFunction.apply(eventsPublisher.asFlux());
        output.subscribe(this::aggregate);
    }

    public void aggregate(Message<?> aggregatedEventsMessage) {

        if (...) {
           //some code
        } else {
            throw new RuntimeException();
        }
    }
}

If the RuntimeException is thrown, the aggregation function does not work, and I get this message The [bean 'outputChannel'; defined in: 'class path resource [org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.class]'; from source: 'org.springframework.cloud.fn.aggregator.AggregatorFunctionConfiguration.outputChannel()'] doesn't have subscribers to accept messages at org.springframework.util.Assert.state(Assert.java:97)

Is there any way to subscribe to the flux generated by the aggregation function in a safe way?


Solution

  • That's correct. That's how Reactive Streams work: if an exception is thrown, the subscriber is cancelled and no new data can be send to that subscriber anymore.

    Consider to not throw that exception up to the stream.

    See more in docs: https://docs.spring.io/spring-cloud-stream/docs/4.0.0-SNAPSHOT/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling