apache-kafka spring-kafka

Create a KStream out of a string passed


I have a string:

    String input = "{ {"header" : "K"},
    {"body" : "Sghd"}}"

I sent this input to a Kafka topic, and now I want to consume it as a KStream<String, JsonNode> instead of as a String.

I tried:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, JsonNode> inputStream = builder.stream(topic);

but it is not working. Can anyone help me?


Solution

  • You have to provide a Serde for both the key type and the value type that you are using. A Serde already exists for String, but a custom Serde needs to be created for JsonNode. After that, you can use this code:

    KStream<String, JsonNode> inputStream = streamsBuilder
            .stream(topic, Consumed.with(Serdes.String(), CustomSerdes.JsonNode()));
    

    The code that you can use for the serdes is:

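    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    import org.springframework.kafka.support.serializer.JsonSerializer;

    public class CustomSerdes {
      private CustomSerdes() {}

      // Assumes JsonSerializer/JsonDeserializer are the spring-kafka classes imported above.
      public static Serde<JsonNode> JsonNode() {
        JsonSerializer<JsonNode> serializer = new JsonSerializer<>();
        JsonDeserializer<JsonNode> deserializer = new JsonDeserializer<>(JsonNode.class);
        return Serdes.serdeFrom(serializer, deserializer);
      }
    }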
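
If spring-kafka's JSON classes are not on the classpath, the same idea can be sketched with Jackson alone. The following is a minimal sketch under several assumptions, none of which come from the original answer: jackson-databind and kafka-streams are available, the Kafka client version is recent enough that Serializer and Deserializer can be written as lambdas (their other methods have default implementations), and the class name JsonNodeSerdeSketch and the topic name "input-topic" are hypothetical. It also assumes the message is a flat JSON object, because the string in the question is not valid JSON as written.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

// Hypothetical sketch class; its name and layout are not from the original answer.
final class JsonNodeSerdeSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private JsonNodeSerdeSketch() {}

    // Builds a Serde<JsonNode> from two lambdas backed by Jackson.
    static Serde<JsonNode> jsonNodeSerde() {
        Serializer<JsonNode> serializer = (topic, node) -> {
            if (node == null) {
                return null; // null values pass through unchanged
            }
            try {
                return MAPPER.writeValueAsBytes(node);
            } catch (IOException e) {
                throw new SerializationException("Cannot serialize JSON for topic " + topic, e);
            }
        };
        Deserializer<JsonNode> deserializer = (topic, bytes) -> {
            if (bytes == null) {
                return null;
            }
            try {
                return MAPPER.readTree(bytes);
            } catch (IOException e) {
                throw new SerializationException("Cannot parse JSON from topic " + topic, e);
            }
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }

    // Passes both Serdes explicitly, so the stream does not fall back to the configured defaults.
    static KStream<String, JsonNode> jsonStream(StreamsBuilder builder, String topic) {
        return builder.stream(topic, Consumed.with(Serdes.String(), jsonNodeSerde()));
    }

    public static void main(String[] args) {
        // Deserialize one assumed message locally; no broker is needed for this step.
        byte[] raw = "{\"header\":\"K\",\"body\":\"Sghd\"}".getBytes(StandardCharsets.UTF_8);
        JsonNode node = jsonNodeSerde().deserializer().deserialize("input-topic", raw);
        System.out.println(node.get("header").asText());

        // Building and describing the topology does not connect to Kafka either.
        StreamsBuilder builder = new StreamsBuilder();
        jsonStream(builder, "input-topic");
        System.out.println(builder.build().describe());
    }
}
```

The original builder.stream(topic) call passes no Serdes, so Kafka Streams falls back to whatever default Serdes are configured. Passing Consumed.with(...) for the topic, as above, avoids depending on those defaults.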