Search code examples
spring-bootspring-integrationspring-cloud-streamspring-rabbitspring-reactor

Add Custom Headers to Spring Cloud Stream (with Spring Reactor)


Being new to Spring Reactor, I am trying to stream the data using Spring cloud stream(using rabbitMQ). I need to add some custom headers before the message is sent to the queue.

My spring-cloud-stream's configuration is:

spring:
  cloud:
    stream:
      default:
        producer:
          errorChannelEnabled: true
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

      binders:
        rabbitInput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5672
                host: localhost

        rabbitOutput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5670
                host: localhost 

Producer reference:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {

    public static void main(String[] args) {
        SpringApplication.run(MessageProcessor.class, args);
    }

    @Bean
    Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
        return data -> data.map(d -> match(d, students));

    }
    private String match(String message, List<String> students){
        return Objects.isNull(message) || message.isBlank()
            ? message
            : String.valueOf(matchStudentName(message, students));
    }

    private Optional<String> matchStudentName(String message, List<String> students){
        return students.stream()
        .filter(name -> name.equals(message)).findFirst();
    }
    @Bean
    Function<Flux<String>, Flux<Message<String>>> addHeaders() {
        return data-> data.map(d-> MessageBuilder
            .withPayload( d )
            .setHeader("a", 1)
            .setHeader("b", "999")
            .build());
    }
}

Headers are being added to the Message successfully, but it's getting overridden somewhere and not getting propagated to the consumer.

Could someone please share their thoughts on how we can add custom headers to a Message using Spring Cloud Stream.

Thanks in advance!


Solution

  • Please upgrade to Hoxton.SR2 which will bring spring-cloud-stream 3.0.2.RELEASE. There were some updates but in short the Message you're producing and the header in it should be preserved.

    Side note: Also, due to added support for multiple in/out function arguments we had to update the binding name convention for functions. You can read more about it here, but what it means to you is that your configuration needs a quick update since input and output are no longer used by default so you should use names derived from function name

    spring:
      cloud:
        stream:
          bindings:
            processMessageaddHeaders-in-0:
              binder: rabbitInput
              destination: inputDestination
            processMessageaddHeaders-out-0:
              binder: rabbitOutput
              destination: outputDestination
          function:
            definition: processMessage|addHeaders
    

    . . . or you can map the derived binding names to something more descriptive (e.g., input, output etc) and use that name instead

    spring:
      cloud:
        stream:
          bindings:
            input:
              binder: rabbitInput
              destination: inputDestination
            output:
              binder: rabbitOutput
              destination: outputDestination
          function:
            definition: processMessage|addHeaders
            bindings: 
              processMessageaddHeaders-in-0: input  
              processMessageaddHeaders-out-0: output