Tags: apache-kafka-streams, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream generates value as string containing JSON instead of just JSON


In a stream processing application using Spring Cloud Stream, I take an input stream (keyed on an integer) and call selectKey on it to produce a new topic with the same values but a different key (a string). The input topic contains records in proper JSON format, e.g.:

"key": {
  "id": 1
},
"value": {
  "id": 1,
  "public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...

The problem is that the topic created by the stream processing application has the value as a string containing JSON rather than as proper JSON, i.e.:

"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"

The code is as follows:

@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
         stream.selectKey { _, value -> value.publicId }

The function above consumes the input stream and produces an output stream (sent to output). The output stream has the same values as the input stream but a different key; in this case the key comes from the value's publicId property.

The application.yml is as follows:

spring.cloud.stream:
  bindings:
    input:
      destination: input-topic
    output:
      destination: output-output
  kafka:
    streams:
      binder:
        application-id: test-app-id-1
      bindings:
        input:
          consumer:
            keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
        output:
          producer:
            keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde

Is there something I'm missing? Is this actually a problem, or is it OK for the JSON to be stored as a string in the messages produced by Spring Cloud Stream?

Other things I've tried which haven't made a difference:

  • Using native decoding/encoding
  • Setting spring.cloud.stream.bindings.output.content-type to application/json
  • Using map instead of selectKey

Solution

  • The output implies that you are sending the value (publicId: "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a") as a String rather than as a POJO.

    If that is what you are sending, you should use a StringSerde, not a JsonSerde.

    EDIT

    I just tested it with a Java app and it works as expected...

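    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.SendTo;

    import com.fasterxml.jackson.databind.ObjectMapper;

    @SpringBootApplication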
    @EnableBinding(KafkaStreamsProcessor.class)
    public class So58538297Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So58538297Application.class, args);
        }
    
        @StreamListener(Processor.INPUT)
        @SendTo(Processor.OUTPUT)
        public KStream<String, Foo> process(@Input(Processor.INPUT) KStream<String, Foo> stream) {
            return stream.selectKey((key, value) -> value.getBar());
        }
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            ObjectMapper mapper = new ObjectMapper();
            return args -> {
                template.send(Processor.INPUT, mapper.writeValueAsString(new Foo("baz")));
            };
        }
    
        @KafkaListener(id = "outputGroup", topics = Processor.OUTPUT)
        public void out(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
            System.out.println("out:" + in + ", key:" + key);
        }
    
        @KafkaListener(id = "copyOfInput", topics = Processor.INPUT)
        public void in(String in) {
            System.out.println("in:" + in);
        }
    
        public static class Foo {
    
            private String bar;
    
            public Foo() {
                super();
            }
    
            public Foo(String bar) {
                this.bar = bar;
            }
    
            public String getBar() {
                return this.bar;
            }
    
            public void setBar(String bar) {
                this.bar = bar;
            }
    
        }
    
    }
    

    with these properties:

    spring.application.name=so58538297
    
    spring.kafka.consumer.auto-offset-reset=earliest
    

    which prints:

    in:{"bar":"baz"}
    out:{"bar":"baz"}, key:baz
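
To illustrate the String-versus-POJO point, here is a minimal, dependency-free sketch of what happens when text that is already JSON gets serialized a second time as a JSON string. The helpers encodeString and encodeObject are hypothetical stand-ins for a JSON serializer, not Spring or Jackson APIs, and whether double encoding is the actual cause in the question's application is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DoubleEncodingDemo {

    // Hypothetical stand-in for how a JSON serializer writes a String value:
    // wrap it in quotes and escape embedded quotes and backslashes.
    static String encodeString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\');
            }
            sb.append(c);
        }
        return sb.append('"').toString();
    }

    // Hypothetical stand-in for serializing an object whose fields are all strings.
    static String encodeObject(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(encodeString(e.getKey()))
              .append(':')
              .append(encodeString(e.getValue()));
            first = false;
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        Map<String, String> foo = new LinkedHashMap<>();
        foo.put("bar", "baz");

        String once = encodeObject(foo);    // the object serialized once
        String twice = encodeString(once);  // that text serialized again, as a String

        System.out.println(once);
        System.out.println(twice);
    }
}
```

The first line printed is {"bar":"baz"} and the second is "{\"bar\":\"baz\"}", which has the same shape as the escaped value shown in the question. If the value reaching the serializer is already JSON text, a String serde writes it unchanged, while a JSON serde quotes and escapes it again.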