Tags: apache-kafka, spring-cloud, apache-kafka-streams, spring-kafka, spring-cloud-stream

How to add a header in Spring Cloud Streams


The Spring Cloud Streams documentation has no examples of adding or manipulating a header, only of accessing headers.

There are examples online that use the ProcessorContext. However, with that approach headers are applied to messages inconsistently.

This is the current implementation:

public class EventHeaderTransformer implements Transformer<String, RequestEvent, KeyValue<String, RequestEvent>>
{
    private static final String EVENT_HEADER_NAME = "event";
    ProcessorContext context;

    public EventHeaderTransformer() { }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, RequestEvent> transform(String key, RequestEvent value) {
        context.headers().add(EVENT_HEADER_NAME, value.getEventName().getBytes());
        return new KeyValue<>(key, value);
    }

    @Override
    public void close() {
        // nothing here
    }

}

public Function<KStream<String, Request>, KStream<String, RequestEvent>> streamRequests() {
    return input -> input
        .transform(() -> unrelatedTransformer)
        .filter(unrelatedFilter)
        // The transformer in question
        .transform(() -> eventHeaderTransformer);
        // Debug output after this transformer shows inconsistencies
}

        streamRequests-in-0:
          destination: queue.unmanaged.requests
          group: streamRequests
          consumer:
            partitioned: true
            concurrency: 3
        streamRequests-out-0:
          destination: queue.core.requests

For example, the code above results in the following message layout across 9 messages:

(p = partition) ([N] = offset)

  • p0[0] = message without header
  • p1[0] = message without header
  • p2[0] = message with header
  • p0[0] = message without header
  • p1[0] = message without header
  • p2[0] = message without header
  • p0[0] = message without header
  • p1[0] = message without header
  • p2[0] = message with header

Printing debug messages shows unexpected results: sometimes a header is not listed as added, sometimes the headers are empty, and so on.

How does one, within Spring Cloud Streams, simply add or manipulate a header on a message passing through a transformer?


Solution

  • .transform(() -> eventHeaderTransformer);

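    Transformers are stateful (each one keeps the ProcessorContext passed to init()), so the supplier must return a new instance on every call, for example .transform(EventHeaderTransformer::new). This matters most with concurrency, where a single shared instance ends up initialised by more than one task. With the newer org.apache.kafka.streams.processor.api.ContextualProcessor (which replaces Transformer in Kafka Streams 3.3), this is enforced regardless of concurrency.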
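To see why a shared instance loses headers, here is a minimal model in plain Java with no Kafka dependency. FakeContext and HeaderAdder are hypothetical stand-ins for ProcessorContext and the transformer, and the sequential calls in run() are only an assumed simplification of what happens when several tasks initialise the same object; it is a sketch of the failure mode, not of Kafka Streams internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SupplierDemo {

    /** Hypothetical stand-in for a per-task ProcessorContext; it only records headers. */
    static final class FakeContext {
        final String task;
        final List<String> headers = new ArrayList<>();
        FakeContext(String task) { this.task = task; }
    }

    /** Hypothetical stand-in for a Transformer that keeps the context it got in init(). */
    static final class HeaderAdder {
        private FakeContext context;
        void init(FakeContext context) { this.context = context; }
        void transform(String eventName) { context.headers.add("event=" + eventName); }
    }

    /**
     * Asks the supplier for one HeaderAdder per task, initialises each with its
     * own context, then lets task 0 handle a single record.
     * Returns the number of headers seen by task 0 and task 1.
     */
    static int[] run(Supplier<HeaderAdder> supplier) {
        FakeContext task0 = new FakeContext("0_0");
        FakeContext task1 = new FakeContext("0_1");

        HeaderAdder forTask0 = supplier.get();
        forTask0.init(task0);
        HeaderAdder forTask1 = supplier.get();
        forTask1.init(task1); // with a shared instance this overwrites task 0's context

        forTask0.transform("created"); // a record arrives on task 0
        return new int[] { task0.headers.size(), task1.headers.size() };
    }

    public static void main(String[] args) {
        HeaderAdder shared = new HeaderAdder();
        int[] broken = run(() -> shared);    // same object every time, as in the question
        int[] fixed = run(HeaderAdder::new); // fresh object per call, as in the answer

        System.out.println("shared instance: task0=" + broken[0] + ", task1=" + broken[1]);
        System.out.println("new instance:    task0=" + fixed[0] + ", task1=" + fixed[1]);
    }
}
```

With the shared instance, the header written while handling task 0's record lands in task 1's context, which matches the "message without header" entries in the question. With a supplier that creates a new object per call, each task writes to its own context.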