In a stream processing application using Spring Cloud Stream, I am taking an input stream (keyed on an integer) and calling selectKey on it to create a new topic with the same values but with a different key (a string). The input topic contains records in proper JSON format, e.g.:
"key": {
"id": 1
},
"value": {
"id": 1,
"public_id": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a", ...
The problem is that the topic created by the stream processing application has the value
as a string containing JSON rather than as proper JSON, i.e.:
"key": "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a",
"value": "{\"id\":1,\"publicId\":\"4273b60f-6fe6-40be-8602-d0b3ed2ecf2a\"}"
The code is as follows:
@StreamListener
@SendTo("output")
fun process(@Input("input") stream: KStream<Int, MyObj>): KStream<String, MyObj> =
    stream.selectKey { _, value -> value.publicId }
The function above consumes the input stream and produces an output stream (sent to the output binding). That output stream has the same values as the input stream, but a different key, which in this case comes from the value's publicId property.
The application.yml is as follows:
spring.cloud.stream:
  bindings:
    input:
      destination: input-topic
    output:
      destination: output-output
  kafka:
    streams:
      binder:
        application-id: test-app-id-1
      bindings:
        input:
          consumer:
            keySerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
        output:
          producer:
            keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
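Note that the config above only sets key Serdes; the values are left to the binder's default handling. If the intent is for MyObj to be (de)serialized natively as JSON by Kafka Streams, one option to check is configuring value Serdes explicitly. A sketch, assuming Spring Kafka's JsonSerde is on the classpath and using the Kafka Streams binder's valueSerde binding property:

```yaml
spring.cloud.stream:
  kafka:
    streams:
      bindings:
        input:
          consumer:
            valueSerde: org.springframework.kafka.support.serializer.JsonSerde
        output:
          producer:
            valueSerde: org.springframework.kafka.support.serializer.JsonSerde
```

Whether this is needed depends on whether native decoding/encoding is enabled for the bindings; with framework conversion active, the content-type converter handles the value instead.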
Is there something I'm missing? Is this actually a problem, or is it OK for the JSON to be stored as a string in the messages produced by Spring Cloud Stream?
Other things I've tried which haven't made a difference:
- Setting spring.cloud.stream.bindings.output.content-type to application/json
- Using map instead of selectKey
It implies you are sending publicId: "4273b60f-6fe6-40be-8602-d0b3ed2ecf2a" as a String instead of a POJO. If that's what you are sending, you should use a StringSerde, not a JsonSerde.
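The distinction matters because a String serializer writes the string's raw bytes, while a JSON serializer treats a Java String as a JSON string value and re-encodes it, quoting it and escaping its inner quotes. That re-encoding is exactly the escaped "value" shown in the question. A minimal conceptual sketch of the two behaviors (hand-rolled for illustration; these are not the actual Serde classes):

```java
import java.nio.charset.StandardCharsets;

public class SerdeDemo {

    // StringSerde-style: the raw UTF-8 bytes of the string, no re-encoding.
    static byte[] asStringSerde(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // JsonSerde-style (conceptual): a Java String is a JSON *string value*,
    // so it is wrapped in quotes and its special characters are escaped.
    static String asJsonSerde(String value) {
        return '"' + value.replace("\\", "\\\\").replace("\"", "\\\"") + '"';
    }

    public static void main(String[] args) {
        String payload = "{\"id\":1}";
        // Prints the JSON unchanged: {"id":1}
        System.out.println(new String(asStringSerde(payload), StandardCharsets.UTF_8));
        // Prints the double-encoded form: "{\"id\":1}"
        System.out.println(asJsonSerde(payload));
    }
}
```

So if the value reaching the producer is already a JSON String, serializing it again as JSON produces the string-containing-JSON symptom; a StringSerde would pass it through untouched.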
EDIT
I just tested it with a Java app and it works as expected...
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class So58538297Application {

    public static void main(String[] args) {
        SpringApplication.run(So58538297Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public KStream<String, Foo> process(@Input(Processor.INPUT) KStream<String, Foo> stream) {
        return stream.selectKey((key, value) -> value.getBar());
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        ObjectMapper mapper = new ObjectMapper();
        return args -> {
            template.send(Processor.INPUT, mapper.writeValueAsString(new Foo("baz")));
        };
    }

    @KafkaListener(id = "outputGroup", topics = Processor.OUTPUT)
    public void out(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        System.out.println("out:" + in + ", key:" + key);
    }

    @KafkaListener(id = "copyOfInput", topics = Processor.INPUT)
    public void in(String in) {
        System.out.println("in:" + in);
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }
    }
}
and the following application.properties:
spring.application.name=so58538297
spring.kafka.consumer.auto-offset-reset=earliest
and the resulting console output:
in:{"bar":"baz"}
out:{"bar":"baz"}, key:baz