
How to send only custom headers using Spring Cloud Stream


I have to set a custom header on a Kafka message. My Kafka cluster natively supports headers (1.x.x), and I am currently using springCloudVersion=Finchley.RELEASE.

When I set the property

default:
    producer:
      headerMode: none

none of the headers appear in the output.

On the other hand, if I set

default:
    producer:
      headerMode: headers

many headers, including contentType and spring_json_header_types, are added to the message, which greatly affects throughput. Since Kafka is a language- and framework-agnostic message delivery mechanism, I feel Spring should provide a way to include only user-provided headers. Is there any workaround to send only user-set headers to the Kafka topic while suppressing all the Spring Cloud related headers?


Solution

  • I already answered this on GitHub. It's a waste of your time and ours to ask the same question in multiple places. As I said there, Stack Overflow is the preferred place to ask questions.

    Here's the answer again:

    Please use Stack Overflow to ask questions; GitHub issues are for reporting bugs or asking for new features.

    You can provide a custom header mapper in the binder configuration.

    spring.cloud.stream.kafka.binder.headerMapperBeanName

    The bean name of a KafkaHeaderMapper used for mapping spring-messaging headers to and from Kafka headers. Use this, for example, if you wish to customize the trusted packages in a DefaultKafkaHeaderMapper that uses JSON deserialization for the headers.

    Default: none.

    Implement your own KafkaHeaderMapper, add it as a @Bean, and set spring.cloud.stream.kafka.binder.headerMapperBeanName to that bean's name so the binder uses it.
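
    The core of such a mapper is an allow-list filter over the outbound headers. The following is only a minimal sketch of that filtering logic, written without Spring dependencies so it can run on its own. The class name UserHeaderFilter, its filter method, and the header names in the usage note are hypothetical and not part of Spring or Kafka. In a real application, this logic would presumably sit inside the fromHeaders method of your KafkaHeaderMapper implementation and add each surviving entry to the Kafka Headers target.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical helper (not a Spring or Kafka class): keeps only allow-listed headers.
    final class UserHeaderFilter {

        private UserHeaderFilter() {
        }

        /**
         * Returns the headers whose names are in {@code allowed}, encoded as bytes.
         * String values are UTF-8 encoded and byte[] values pass through unchanged.
         * Values of any other type are skipped, which is an assumption made here so
         * that no extra type-hint header is needed to decode them.
         */
        static Map<String, byte[]> filter(Map<String, Object> headers, Set<String> allowed) {
            Map<String, byte[]> out = new LinkedHashMap<>();
            for (Map.Entry<String, Object> entry : headers.entrySet()) {
                if (!allowed.contains(entry.getKey())) {
                    continue; // drops contentType and any other header not on the list
                }
                Object value = entry.getValue();
                if (value instanceof String) {
                    out.put(entry.getKey(), ((String) value).getBytes(StandardCharsets.UTF_8));
                } else if (value instanceof byte[]) {
                    out.put(entry.getKey(), (byte[]) value);
                }
            }
            return out;
        }
    }
    ```

    For example, filtering a map that contains contentType and a user header tenantId against an allow-list holding only tenantId leaves just the tenantId entry. Non-String consumers can then read the value as plain UTF-8 bytes.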

    EDIT

    The DefaultKafkaHeaderMapper has a property to enable converting byte[] headers to String.

    /**
     * Set the headers to not perform any conversion on (except {@code String} to
     * {@code byte[]} for outbound). Inbound headers that match will be mapped as
     * {@code byte[]} unless the corresponding boolean in the map value is true,
     * in which case it will be mapped as a String.
     * @param rawMappedHeaders the header names to not convert and
     * @since 2.2.5
     * @see #setCharset(Charset)
     * @see #setMapAllStringsOut(boolean)
     */
    public void setRawMappedHeaders(Map<String, Boolean> rawMappedHeaders) {