I am new to Spring Reactor and am trying to stream data using Spring Cloud Stream (with RabbitMQ). I need to add some custom headers before the message is sent to the queue.
My Spring Cloud Stream configuration is:
spring:
  cloud:
    stream:
      default:
        producer:
          errorChannelEnabled: true
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders
      binders:
        rabbitInput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5672
                host: localhost
        rabbitOutput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5670
                host: localhost
Producer reference:
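import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {

    public static void main(String[] args) {
        SpringApplication.run(MessageProcessor.class, args);
    }

    // Maps every incoming payload through match(...).
    @Bean
    Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
        return data -> data.map(d -> match(d, students));
    }

    // Note: String.valueOf on an Optional yields "Optional[name]" or "Optional.empty".
    private String match(String message, List<String> students) {
        return Objects.isNull(message) || message.isBlank()
                ? message
                : String.valueOf(matchStudentName(message, students));
    }

    private Optional<String> matchStudentName(String message, List<String> students) {
        return students.stream()
                .filter(name -> name.equals(message))
                .findFirst();
    }

    // Wraps every payload in a Message carrying the custom headers "a" and "b".
    @Bean
    Function<Flux<String>, Flux<Message<String>>> addHeaders() {
        return data -> data.map(d -> MessageBuilder
                .withPayload(d)
                .setHeader("a", 1)
                .setHeader("b", "999")
                .build());
    }
}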
The headers are added to the Message successfully, but they are overridden somewhere along the way and are not propagated to the consumer.
Could someone please share their thoughts on how to add custom headers to a Message using Spring Cloud Stream?
Thanks in advance!
Please upgrade to Hoxton.SR2, which brings in spring-cloud-stream 3.0.2.RELEASE. There were some updates in that release; in short, the Message you are producing and the headers in it should be preserved.
Side note:
Also, due to the added support for functions with multiple input/output arguments, we had to update the binding name convention for functions. You can read more about it in the documentation, but what it means for you is that your configuration needs a quick update: input and output are no longer used by default, so you should use names derived from the function name:
spring:
  cloud:
    stream:
      bindings:
        processMessageaddHeaders-in-0:
          binder: rabbitInput
          destination: inputDestination
        processMessageaddHeaders-out-0:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders
. . . or you can map the derived binding names to something more descriptive (e.g., input, output, etc.) and use those names instead:
spring:
  cloud:
    stream:
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders
        bindings:
          processMessageaddHeaders-in-0: input
          processMessageaddHeaders-out-0: output
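
As a rough illustration of the naming convention used in the configurations above, here is a small plain-Java sketch. BindingNames and its methods are hypothetical helpers written only for this example; they are not part of Spring Cloud Stream. The sketch assumes what the configuration above shows: a composed definition such as processMessage|addHeaders drops the pipe, and each binding name is the resulting function name followed by -in- or -out- and the argument index.

```java
// Hypothetical helper for illustration only; NOT a Spring Cloud Stream class.
final class BindingNames {

    private BindingNames() {
    }

    // A composed definition like "processMessage|addHeaders" loses the pipe.
    static String functionName(String definition) {
        return definition.replace("|", "");
    }

    // Name of the input binding for the given argument index.
    static String input(String definition, int index) {
        return functionName(definition) + "-in-" + index;
    }

    // Name of the output binding for the given argument index.
    static String output(String definition, int index) {
        return functionName(definition) + "-out-" + index;
    }

    public static void main(String[] args) {
        String definition = "processMessage|addHeaders";
        System.out.println(input(definition, 0));  // processMessageaddHeaders-in-0
        System.out.println(output(definition, 0)); // processMessageaddHeaders-out-0
    }
}
```

The two printed names are the keys to use under spring.cloud.stream.bindings, or to map to friendlier names under spring.cloud.stream.function.bindings.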