Search code examples
spring-integrationspring-cloud-streamspring-kafka

How to catch errors from the Spring Integration error channel inside Spring Cloud Stream?


I'm trying to create a application-level error handler for failures during the processing inside SCS application using Kafka as a message broker. I know that SCS already provides the DLQ functionality, but in my case I want to wrap failed messages with a custom wrapper type (providing the failure context (source, cause etc.))

In https://github.com/qabbasi/Spring-Cloud-Stream-DLQ-Error-Handling you can see two approaches for this scenario: one is using SCS and the other one directly Spring Integration. (both are atm not working)

According to the current reference (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_producing_and_consuming_messages) SCS will allow to publish error messages received from the Spring Integration error channel, but unfortunately this not the case, at least for me. Although the application logs the following upon startup

o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 2 subscriber(s).


Solution

  • You shouldn't use @StreamListener("errorChannel") - that is consuming from a binder destination; to capture messages sent to the errorChannel use @ServiceActivator(inputChannel = "errorChannel").

    EDIT

    There were several problems with your app...

    1. The new error handling code was added in version 1.3
    2. The autoCommitOnError is a kafka binder property
    3. You needed an @EnableBinding(CustomDlqMessageChannel.class)
    4. You don't really need @EnableIntegration - boot does that for you

    See my commit here.

    and...

    $ kafka-console-producer --broker-list localhost:9092 --topic testIn
    >foo
    

    and...

    $ kafka-console-consumer --bootstrap-server localhost:9092 --topic customDlqTopic --from-beginning
    ?
    contentType>"application/x-java-object;type=com.example.demo.ErrorWrapper"