Tags: java, spring, kafka-consumer-api, spring-kafka, spring-cloud-stream

Spring Cloud Stream (Kafka): parameterize the error channel {destination}.{group}.errors


I would like to know whether the error channel passed to @ServiceActivator can be parameterized from a value in the YAML configuration, instead of hardcoding the actual destination and consumer group in the code.

    @ServiceActivator(
        // I do not want to hardcode the destination and consumer group here
        inputChannel = "stream-test-topic.my-consumer-group.errors"
    )
    public void handleError(ErrorMessage errorMessage) {
        // The payload of an ErrorMessage is the exception that was thrown
        Throwable errorMessagePayload = errorMessage.getPayload();
        log.error("exception occurred", errorMessagePayload);

        // The original (failed) message, if it is available
        Message<?> originalMessage = errorMessage.getOriginalMessage();
        if (originalMessage != null) {
            log.error("Message Body: {}", originalMessage.getPayload());
        } else {
            log.error("The original message is not available");
        }
    }

Solution

  • You can't do that with @ServiceActivator; use the Java DSL instead:

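    // Channel name injected from configuration instead of being hardcoded
    @Value("${error.channel}")
    String errors;

    @Bean
    public IntegrationFlow flow() {
        // Subscribe to the channel whose name was resolved from the property
        // (on Spring Integration 6+, IntegrationFlow.from(...) replaces the
        // deprecated IntegrationFlows factory)
        return IntegrationFlows.from(this.errors)
                .handle(msg -> {
                    System.out.println(msg);
                })
                .get();
    }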

    And set the property in the YAML configuration:

    error:
      channel: stream-test-topic.my-consumer-group.errors
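
The channel name follows the {destination}.{group}.errors pattern from the question title, so it could also be assembled from the destination and group values rather than kept as a separate property. The following is a minimal sketch in plain Java, assuming both values are already available as strings. The class ErrorChannelNames and its method errorChannel are hypothetical names introduced for illustration and are not part of Spring Cloud Stream.

```java
import java.util.Objects;

// Hypothetical helper (not a Spring Cloud Stream API): builds an error channel
// name following the {destination}.{group}.errors pattern.
final class ErrorChannelNames {

    private ErrorChannelNames() {
    }

    // Returns "<destination>.<group>.errors" after trimming both parts.
    static String errorChannel(String destination, String group) {
        Objects.requireNonNull(destination, "destination");
        Objects.requireNonNull(group, "group");
        String d = destination.trim();
        String g = group.trim();
        if (d.isEmpty() || g.isEmpty()) {
            throw new IllegalArgumentException("destination and group must not be blank");
        }
        return d + "." + g + ".errors";
    }

    public static void main(String[] args) {
        // Values taken from the question
        System.out.println(errorChannel("stream-test-topic", "my-consumer-group"));
    }
}
```

In the flow bean above, the result of errorChannel(...) could be passed to IntegrationFlows.from(...) in place of this.errors, with the destination and group injected through @Value from whichever binding properties the application already defines. The exact property keys depend on the binding name, which is not shown in the question.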