There aren't any documentation examples of adding or manipulating a header in the spring cloud streams documentation, only accessing the headers.
There are examples online that show usage of the ProcessorContext. However, using this results in inconsistent header application to messages.
This is the current implementation:
public class EventHeaderTransformer implements Transformer<String, RequestEvent, KeyValue<String, RequestEvent>>
{
private static final String EVENT_HEADER_NAME = "event";
ProcessorContext context;
public EventHeaderTransformer() { }
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, RequestEvent> transform(String key, RequestEvent value) {
context.headers().add(EVENT_HEADER_NAME, value.getEventName().getBytes());
return new KeyValue<>(key, value);
}
@Override
public void close() {
// nothing here
}
}
public Function<KStream<String, Request>, KStream<String, RequestEvent>> streamRequests() {
return input -> input
.transform(() -> unrelatedTransformer)
.filter(unrelatedFilter)
// The transformer in question
.transform(() -> eventHeaderTransformer);
// Debug output after this transformer show inconsistencies
}
streamRequests-in-0:
destination: queue.unmanaged.requests
group: streamRequests
consumer:
partitioned: true
concurrency: 3
streamRequests-out-0:
destination: queue.core.requests
For example, the code above results in the following message layout across 9 messages:
(p = partition) ([N] = offset)
Printing out debug messages shows unexpected results, where sometimes a header won't list as added, or headers may be empty, etc.
How does one within Spring Cloud Streams simply add or manipulate a header in a message passing through a transformer.
.transform(() -> eventHeaderTransformer);
Transformers are stateful; you must return a new instance each time; certainly with concurrency; with the newer org.apache.kafka.streams.processor.api.ContextualProcessor
(which replaces Transformer
in 3.3), this is enforced, regardless of concurrency.