I have a string\
String input= "{ {"header" : "K"},
{"body" : "Sghd"}}"\
I sent this input to a kafka topic and now I want to consume it as a KStream<String, JsonNode> instead of a String ?\
I tried
StreamsBuilder builder = new StreamsBuilder();
KStream<String, JsonNode> inputStream = builder.stream(topic);\
but it is not working... Can anyone help me?\
You have to include the Serdes for both the classes that you are using. There is a Serdes present for String but an Custom Serdes need to be created for JsonNode. After that you can use this code.
KStream<String, Record> inputStream = streamsBuilder
.stream(topic, Consumed.with(Serdes.String(), CustomSerdes.JsonNode()));
The code that you can use for the serdes is:
public class CustomSerdes {
private CustomSerdes() {}
public static Serde<JsonNode> JsonNode() {
JsonSerializer<JsonNode> serializer = new JsonSerializer<>();
JsonDeserializer<JsonNode> deserializer = new JsonDeserializer<>(JsonNode.class);
return Serdes.serdeFrom(serializer, deserializer);
}