I wanted to disable the embedded header in a message, when using dynamically generated destination, similar to example from here (say output topics = dyntopic1,dyntopic2,...).
I've set properties like the below, but I'm still getting the header, any suggestions if I've missed something?
spring.cloud.stream.bindings.output.group=test-ogroup
spring.cloud.stream.bindings.output.binder=kafka
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.output.content-type=text/plain
Kafka = v0.10
spring-cloud-dependencies.version = Edgware.M1
Setting producer properties for dynamic destinations is not available with Spring Cloud Stream 1.3 or lower.
If you know the properties ahead of time, you can set them in properties...
spring.cloud.stream.bindings.dyntopic1.producer.headerMode=raw
The feature has been added to master and will be available in the 2.0 release.
EDIT
It can be done with Edgware as long as you don't mind using reflection to reset the flag. You have to replace the channel resolver bean.
I tested this with Edgware.SR1 - you really shouldn't be using M1 any more, that was a pre-release milestone.
I can't guarantee that this will work with newer versions because it is messing with framework internals.
@SpringBootApplication
@EnableBinding(Sink.class)
public class So48543143Application {
public static void main(String[] args) {
SpringApplication.run(So48543143Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel routeChannel) {
return args -> {
routeChannel.send(new GenericMessage<>("foo"));
};
}
@ServiceActivator(inputChannel = "routeChannel")
@Bean
public AbstractMappingMessageRouter router(MyBinderAwareChannelResolver resolver) {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new LiteralExpression("foo"));
router.setDefaultOutputChannelName("default");
router.setChannelResolver(resolver);
return router;
}
@Bean
public MyBinderAwareChannelResolver binderAwareChannelResolver(BindingService bindingService,
AbstractBindingTargetFactory<? extends MessageChannel> bindingTargetFactory,
DynamicDestinationsBindable dynamicDestinationsBindable) {
return new MyBinderAwareChannelResolver(bindingService, bindingTargetFactory, dynamicDestinationsBindable);
}
public static class MyBinderAwareChannelResolver extends BinderAwareChannelResolver {
public MyBinderAwareChannelResolver(BindingService bindingService,
AbstractBindingTargetFactory<? extends MessageChannel> bindingTargetFactory,
DynamicDestinationsBindable dynamicDestinationsBindable) {
super(bindingService, bindingTargetFactory, dynamicDestinationsBindable);
}
@Override
public MessageChannel resolveDestination(String channelName) {
MessageChannel channel = super.resolveDestination(channelName);
DirectFieldAccessor dfa = new DirectFieldAccessor(channel);
AbstractDispatcher dispatcher = (AbstractDispatcher) dfa.getPropertyValue("dispatcher");
dfa = new DirectFieldAccessor(dispatcher);
@SuppressWarnings("unchecked")
Set<MessageHandler> handlers = (Set<MessageHandler>) dfa.getPropertyValue("handlers");
// there should be exactly one handler
MessageHandler handler = handlers.iterator().next();
dfa = new DirectFieldAccessor(handler);
dfa.setPropertyValue("embedHeaders", false);
return channel;
}
}
}