How to make Serdes work with a multi-step Kafka Streams topology


I am new to Kafka and I'm building a starter project using the Twitter API as a data source. I have created a producer that queries the Twitter API and sends the data to my Kafka topic, with a string serializer for both key and value. My Kafka Streams application reads this data and does a word count, grouped by word and by the date of the tweet. This part is done through a KTable called wordCounts to make use of its upsert functionality. The structure of this KTable is:

Key: {word: exampleWord, date: exampleDate}, Value: numberOfOccurences

I then attempt to restructure the data in the KTable into a flat structure so I can later send it to a database. You can see this in the wordCountsStructured KStream object, which produces the structure below. The value is initially a JsonObject, but I convert it to a string to match the Serdes I set.

Key: null, Value: {word: exampleWord, date: exampleDate, Counts: numberOfOccurences}

However, when I try to send this to my second Kafka topic, I get the error below.

A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: com.google.gson.JsonObject / value type: com.google.gson.JsonObject). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

I'm confused by this since the KStream I am sending to the topic is of type <String, String>. Does anyone know how I might fix this?

public class TwitterWordCounter {

private final JsonParser jsonParser = new JsonParser();

public Topology createTopology(){
    StreamsBuilder builder = new StreamsBuilder();


    KStream<String, String> textLines = builder.stream("test-topic2");
    KTable<JsonObject, Long> wordCounts = textLines
            //parse each tweet as a tweet object
            .mapValues(tweetString -> new Gson().fromJson(jsonParser.parse(tweetString).getAsJsonObject(), Tweet.class))
            //map each tweet object to a list of json objects, each of which containing a word from the tweet and the date of the tweet
            .flatMapValues(TwitterWordCounter::tweetWordDateMapper)
            //update the key so it matches the word-date combination so we can do a groupBy and count instances
            .selectKey((key, wordDate) -> wordDate)
            .groupByKey()
            .count(Materialized.as("Counts"));

    /*
        In order to structure the data so that it can be ingested into SQL, the value of each item in the stream must be straightforward: property, value
        so we have to:
         1. take the columns which include the dimensional data and put this into the value of the stream.
         2. label the count with 'count' as the column name
     */
    KStream<String, String> wordCountsStructured = wordCounts.toStream()
            .map((key, value) -> new KeyValue<>(null, MapValuesToIncludeColumnData(key, value).toString()));

    KStream<String, String> wordCountsPeek = wordCountsStructured.peek(
            (key, value) -> System.out.println("key: " + key + "value:" + value)
    );

    wordCountsStructured.to("test-output2", Produced.with(Serdes.String(), Serdes.String()));

    return builder.build();
}

public static void main(String[] args) {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application1111");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "myIPAddress");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    TwitterWordCounter wordCountApp = new TwitterWordCounter();

    KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
    streams.start();

    // shutdown hook to correctly close the streams application
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

}

//this method is used for taking a tweet and transforming it to a representation of the words in it plus the date
public static List<JsonObject> tweetWordDateMapper(Tweet tweet) {
    try{

        List<String> words = Arrays.asList(tweet.tweetText.split("\\W+"));
        List<JsonObject> tweetsJson = new ArrayList<JsonObject>();
        for(String word: words) {
            JsonObject tweetJson = new JsonObject();
            tweetJson.add("date", new JsonPrimitive(tweet.formattedDate().toString()));
            tweetJson.add("word", new JsonPrimitive(word));
            tweetsJson.add(tweetJson);
        }

        return tweetsJson;
    }
    catch (Exception e) {
        System.out.println(e);
        System.out.println(tweet.serialize().toString());
        return new ArrayList<JsonObject>();
    }

}

public JsonObject MapValuesToIncludeColumnData(JsonObject key, Long countOfWord) {
    // add the count to the word/date object so each record becomes one flat row
    key.addProperty("count", countOfWord);
    return key;
}

}

Solution

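  • Because you perform a key-changing operation (selectKey()) before groupByKey(), Kafka Streams creates a repartition topic. No serdes are specified for that step, so the repartition topic falls back to the default key and value serdes, which you have set to the String serde. At that point, however, both the key and the value are JsonObject, which is exactly what the error message reports. The failure therefore comes from this internal topic, not from the final to() call.

    You can pass the serdes explicitly by changing the groupByKey() call to groupByKey(Grouped.with(jsonSerde, jsonSerde)), where jsonSerde is a Serde<JsonObject> that you supply yourself (Kafka does not ship one for Gson's JsonObject). This should help.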
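
    As a rough sketch of that change, the topology could look like the code below. It assumes Kafka 2.3 or later (so that Serializer and Deserializer can be written as lambdas) and Gson on the classpath. The class name GroupedSerdeSketch and the helper jsonObjectSerde() are made up for illustration and are not part of Kafka or Gson. The tweet parsing from the question is replaced by a stand-in that treats each input value as a ready-made {date, word} JSON object.

```java
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class GroupedSerdeSketch {

    // Hypothetical helper (not part of Kafka or Gson):
    // stores a JsonObject as its UTF-8 encoded JSON text.
    static Serde<JsonObject> jsonObjectSerde() {
        Serializer<JsonObject> serializer = (topic, json) ->
                json == null ? null : json.toString().getBytes(StandardCharsets.UTF_8);
        Deserializer<JsonObject> deserializer = (topic, bytes) ->
                bytes == null ? null
                        : new JsonParser()
                                .parse(new String(bytes, StandardCharsets.UTF_8))
                                .getAsJsonObject();
        return Serdes.serdeFrom(serializer, deserializer);
    }

    static Topology createTopology() {
        Serde<JsonObject> jsonSerde = jsonObjectSerde();
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> textLines =
                builder.stream("test-topic2", Consumed.with(Serdes.String(), Serdes.String()));

        KTable<JsonObject, Long> wordCounts = textLines
                // Stand-in for the question's tweet parsing: each value is assumed
                // to already be a JSON object such as {"date": "...", "word": "..."}.
                .mapValues(line -> new JsonParser().parse(line).getAsJsonObject())
                .selectKey((key, wordDate) -> wordDate)
                // Key and value are both JsonObject here, so the repartition
                // topic is given JSON serdes instead of the String defaults.
                .groupByKey(Grouped.with(jsonSerde, jsonSerde))
                // The state store serdes are set explicitly as well.
                .count(Materialized.<JsonObject, Long, KeyValueStore<Bytes, byte[]>>as("Counts")
                        .withKeySerde(jsonSerde)
                        .withValueSerde(Serdes.Long()));

        // Flatten to <null, String> as in the question, so String serdes fit the sink.
        KStream<String, String> wordCountsStructured = wordCounts.toStream()
                .map((key, count) -> {
                    key.addProperty("count", count);
                    return new KeyValue<String, String>(null, key.toString());
                });

        wordCountsStructured.to("test-output2", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

    One design point to keep in mind: the serialized bytes of the key decide both the partition and which records count as the same group, so the JSON text of the key has to be stable. JsonObject keeps its members in insertion order, so the mapper should always add the fields in the same order, as tweetWordDateMapper in the question already does.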